zentol commented on code in PR #20555:
URL: https://github.com/apache/flink/pull/20555#discussion_r943508794


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java:
##########
@@ -443,7 +443,7 @@ static boolean isChainableSource(ExecNode<?> node, ProcessorContext context) {
             // this is OK because sources do not have any input so the transformation will never
             // change.
             Transformation<?> transformation =
-                    node.translateToPlan(Preconditions.checkNotNull(context).getPlanner());
+                    node.translateToPlan(Preconditions.checkNotNull(context).getPlanner(), false);

Review Comment:
   Why is this false?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
##########
@@ -108,10 +122,28 @@ public void testLegacyBatchValues() {
     }
 
     @Test
-    public void testLegacyUid() {
-        final TableEnvironment env =
-                TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration());
-        env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+    public void testUidGeneration() {
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY), true, false);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, ALWAYS), true, true);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, DISABLED), false, false);
+        checkUids(
+                c -> {
+                    c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY);
+                    c.set(TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+                },
+                false,
+                false);
+    }
+
+    private static void checkUids(
+            Consumer<TableConfig> config,
+            boolean uidWithCompilation,
+            boolean uidWithoutCompilation) {

Review Comment:
   ```suggestion
               boolean expectUidWithCompilation,
               boolean expectUidWithoutCompilation) {
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -488,18 +488,59 @@ public class ExecutionConfigOptions {
                                             + "all changes to downstream just like when the mini-batch is "
                                             + "not enabled.");
 
+    /** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     @Deprecated
     public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
             key("table.exec.legacy-transformation-uids")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "In Flink 1.15 Transformation UIDs are generated deterministically starting from the metadata available after the planning phase. "
-                                    + "This new behaviour allows a safe restore of persisted plan, remapping the plan execution graph to the correct operators state. "
-                                    + "Setting this flag to true enables the previous \"legacy\" behavior, which is generating uids from the Transformation graph topology. "
-                                    + "We strongly suggest to keep this flag disabled, as this flag is going to be removed in the next releases. "
-                                    + "If you have a pipeline relying on the old behavior, please create a new pipeline and regenerate the operators state.");
+                            "This flag has been replaced by table.exec.uid.generation. Use the enum "
+                                    + "value DISABLED to restore legacy behavior. However, the new "
+                                    + "default value should be sufficient for most use cases as "
+                                    + "only pipelines from compiled plans get UIDs assigned.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
+            key("table.exec.uid.generation")
+                    .enumType(UidGeneration.class)
+                    .defaultValue(UidGeneration.PLAN_ONLY)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "In order to remap state to operators during a restore, "
+                                                    + "it is required that the pipeline's streaming "
+                                                    + "transformations get a UID assigned.")
+                                    .linebreak()
+                                    .text(
+                                            "The planner can generate and assign explicit UIDs. If no "
+                                                    + "UIDs have been set by the planner, the UIDs will "
+                                                    + "be auto-generated by lower layers that can take "
+                                                    + "the complete topology into account for uniqueness "
+                                                    + "of the IDs. See the DataStream API for more information.")
+                                    .linebreak()
+                                    .text(
+                                            "This configuration option is for experts only and the default "
+                                                    + "should be sufficient for most use cases. By default, "
+                                                    + "only pipelines created from a persisted compiled plan will "
+                                                    + "get UIDs assigned explicitly. Thus, these pipelines can "
+                                                    + "be arbitrarily moved around within the same topology without "
+                                                    + "affecting the stable UIDs.")
+                                    .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT =
+            key("table.exec.uid.format")
+                    .stringType()
+                    .defaultValue("%1$s_%4$s")

Review Comment:
   How about a more expressive format instead of mysterious indices, similar to scope formats? That should also make it easier to remove parameters in the future.
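
To make the suggestion concrete, here is a minimal sketch of what a named-placeholder format could look like, loosely modelled on the `<variable>` syntax of the metric scope formats. Everything in it is an assumption for illustration: the class name `UidFormatSketch`, the placeholder names `<id>` and `<transformation>`, and the sample values are hypothetical and not taken from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a named-placeholder UID format; not the PR's implementation. */
final class UidFormatSketch {

    private UidFormatSketch() {}

    /**
     * Replaces every {@code <name>} placeholder in the format with its value.
     * Placeholders without a matching variable are left untouched. Replacement is
     * sequential, so a value that itself contains a placeholder would be expanded
     * again; a real implementation may want to guard against that.
     */
    static String format(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            result = result.replace("<" + entry.getKey() + ">", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed variable names and values, for illustration only.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("id", "1");
        variables.put("transformation", "calc");

        System.out.println(format("<id>_<transformation>", variables));
    }
}
```

With names instead of positions, dropping a variable later does not shift the meaning of the remaining ones, which is not the case for `%1$s_%4$s`.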



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java:
##########
@@ -88,7 +87,7 @@ public JoinCondition newInstance(ClassLoader classLoader) {
             sortMergeJoinFunction =
                     SorMergeJoinOperatorUtil.getSortMergeJoinFunction(
                             Thread.currentThread().getContextClassLoader(),
-                            new ExecNodeConfig(TableConfig.getDefault(), new Configuration()),
+                            ExecNodeConfig.ofNodeConfig(new Configuration(), false),

Review Comment:
   Why are these false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
