zentol commented on code in PR #20555:
URL: https://github.com/apache/flink/pull/20555#discussion_r943508794
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java:
##########
@@ -443,7 +443,7 @@ static boolean isChainableSource(ExecNode<?> node,
ProcessorContext context) {
// this is OK because sources do not have any input so the
transformation will never
// change.
Transformation<?> transformation =
-
node.translateToPlan(Preconditions.checkNotNull(context).getPlanner());
+
node.translateToPlan(Preconditions.checkNotNull(context).getPlanner(), false);
Review Comment:
why is this false?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
##########
@@ -108,10 +122,28 @@ public void testLegacyBatchValues() {
}
@Test
- public void testLegacyUid() {
- final TableEnvironment env =
-
TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration());
-
env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS,
true);
+ public void testUidGeneration() {
+ checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY), true,
false);
+ checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, ALWAYS), true, true);
+ checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, DISABLED), false,
false);
+ checkUids(
+ c -> {
+ c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY);
+ c.set(TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+ },
+ false,
+ false);
+ }
+
+ private static void checkUids(
+ Consumer<TableConfig> config,
+ boolean uidWithCompilation,
+ boolean uidWithoutCompilation) {
Review Comment:
```suggestion
boolean expectUidWithCompilation,
boolean expectUidWithoutCompilation) {
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -488,18 +488,59 @@ public class ExecutionConfigOptions {
+ "all changes to downstream just
like when the mini-batch is "
+ "not enabled.");
+ /** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
@Deprecated
public static final ConfigOption<Boolean>
TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
key("table.exec.legacy-transformation-uids")
.booleanType()
.defaultValue(false)
.withDescription(
- "In Flink 1.15 Transformation UIDs are generated
deterministically starting from the metadata available after the planning
phase. "
- + "This new behaviour allows a safe
restore of persisted plan, remapping the plan execution graph to the correct
operators state. "
- + "Setting this flag to true enables the
previous \"legacy\" behavior, which is generating uids from the Transformation
graph topology. "
- + "We strongly suggest to keep this flag
disabled, as this flag is going to be removed in the next releases. "
- + "If you have a pipeline relying on the
old behavior, please create a new pipeline and regenerate the operators
state.");
+ "This flag has been replaced by
table.exec.uid.generation. Use the enum "
+ + "value DISABLED to restore legacy
behavior. However, the new "
+ + "default value should be sufficient for
most use cases as "
+ + "only pipelines from compiled plans get
UIDs assigned.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
+ key("table.exec.uid.generation")
+ .enumType(UidGeneration.class)
+ .defaultValue(UidGeneration.PLAN_ONLY)
+ .withDescription(
+ Description.builder()
+ .text(
+ "In order to remap state to
operators during a restore, "
+ + "it is required that the
pipeline's streaming "
+ + "transformations get a
UID assigned.")
+ .linebreak()
+ .text(
+ "The planner can generate and
assign explicit UIDs. If no "
+ + "UIDs have been set by
the planner, the UIDs will "
+ + "be auto-generated by
lower layers that can take "
+ + "the complete topology
into account for uniqueness "
+ + "of the IDs. See the
DataStream API for more information.")
+ .linebreak()
+ .text(
+ "This configuration option is for
experts only and the default "
+ + "should be sufficient
for most use cases. By default, "
+ + "only pipelines created
from a persisted compiled plan will "
+ + "get UIDs assigned
explicitly. Thus, these pipelines can "
+ + "be arbitrarily moved
around within the same topology without "
+ + "affecting the stable
UIDs.")
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT =
+ key("table.exec.uid.format")
+ .stringType()
+ .defaultValue("%1$s_%4$s")
Review Comment:
How about a more expressive format instead of mysterious indices, similar to
scope formats? That should also make it easier to remove parameters in the
future.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java:
##########
@@ -88,7 +87,7 @@ public JoinCondition newInstance(ClassLoader classLoader) {
sortMergeJoinFunction =
SorMergeJoinOperatorUtil.getSortMergeJoinFunction(
Thread.currentThread().getContextClassLoader(),
- new ExecNodeConfig(TableConfig.getDefault(), new
Configuration()),
+ ExecNodeConfig.ofNodeConfig(new Configuration(),
false),
Review Comment:
why are these false?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]