This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 881b2bf046e510b1b6dddddf8c15af45926397f1 Author: Timo Walther <twal...@apache.org> AuthorDate: Thu Aug 11 11:18:46 2022 +0200 [FLINK-28861][table] Fix bug in UID format for future migrations and make it configurable Before this commit, the UID format was not future-proof for migrations. The ExecNode version should not be in the UID; otherwise, operator migration won't be possible once plan migration is executed. See the FLIP-190 example, which drops a version from the plan once operator migration has been performed. Given that the plan feature is marked as @Experimental, this change should still be possible without providing backwards compatibility. However, the config option table.exec.uid.format allows for restoring the old format and solves other UID-related issues along the way. This closes #20555. --- .../generated/execution_config_configuration.html | 6 ++ .../table/api/config/ExecutionConfigOptions.java | 13 +++++ .../planner/plan/nodes/exec/ExecNodeBase.java | 9 +-- .../planner/plan/nodes/exec/ExecNodeContext.java | 20 ++++++- .../plan/nodes/exec/common/CommonExecSink.java | 2 +- .../exec/common/CommonExecTableSourceScan.java | 2 +- .../nodes/exec/stream/StreamExecIntervalJoin.java | 11 ++-- .../plan/nodes/exec/TransformationsTest.java | 66 ++++++++++++++++++++++ .../flink/table/planner/utils/JsonTestUtils.java | 20 +++++++ 9 files changed, 136 insertions(+), 13 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index d6292edc1b4..d837fd4ab66 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -160,6 +160,12 @@ By default no operator is disabled.</td> <td>Duration</td> <td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. 
State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.</td> </tr> + <tr> + <td><h5>table.exec.uid.format</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">"<id>_<transformation>"</td> + <td>String</td> + <td>Defines the format pattern for generating the UID of an ExecNode streaming transformation. The pattern can be defined globally or per-ExecNode in the compiled plan. Supported arguments are: <id> (from static counter), <type> (e.g. 'stream-exec-sink'), <version>, and <transformation> (e.g. 'constraint-validator' for a sink). In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' which woul [...] + </tr> <tr> <td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">PLAN_ONLY</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 2f449dfcf6b..c14546c192f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -529,6 +529,19 @@ public class ExecutionConfigOptions { + "affecting the stable UIDs.") .build()); + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT = + key("table.exec.uid.format") + .stringType() + .defaultValue("<id>_<transformation>") + .withDescription( + "Defines the format pattern for 
generating the UID of an ExecNode streaming transformation. " + + "The pattern can be defined globally or per-ExecNode in the compiled plan. " + + "Supported arguments are: <id> (from static counter), <type> (e.g. 'stream-exec-sink'), " + + "<version>, and <transformation> (e.g. 'constraint-validator' for a sink). " + + "In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' " + + "which would prevent migrations in the future."); + // ------------------------------------------------------------------------------------------ // Enum option types // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java index 0e949aa666b..2c7a33042a0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java @@ -207,8 +207,8 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> { return getClass().getSimpleName().replace("StreamExec", "").replace("BatchExec", ""); } - protected String createTransformationUid(String operatorName) { - return context.generateUid(operatorName); + protected String createTransformationUid(String operatorName, ExecNodeConfig config) { + return context.generateUid(operatorName, config); } protected String createTransformationName(ReadableConfig config) { @@ -226,7 +226,7 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> { createTransformationName(config), createTransformationDescription(config)); } else { return new TransformationMetadata( - createTransformationUid(operatorName), + createTransformationUid(operatorName, config), 
createTransformationName(config), createTransformationDescription(config)); } @@ -239,7 +239,8 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> { if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) { return new TransformationMetadata(name, desc); } else { - return new TransformationMetadata(createTransformationUid(operatorName), name, desc); + return new TransformationMetadata( + createTransformationUid(operatorName, config), name, desc); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java index 0abb680b136..cc3436916c6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; import org.apache.flink.table.types.logical.LogicalType; @@ -30,6 +31,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext; +import org.apache.commons.lang3.StringUtils; + import javax.annotation.Nullable; import java.util.List; @@ -122,7 +125,7 @@ public final class ExecNodeContext { } /** Returns a new {@code uid} for transformations. 
*/ - public String generateUid(String transformationName) { + public String generateUid(String transformationName, ExecNodeConfig config) { if (!transformationNamePattern.matcher(transformationName).matches()) { throw new TableException( "Invalid transformation name '" @@ -130,7 +133,20 @@ public final class ExecNodeContext { + "'. " + "This is a bug, please file an issue."); } - return String.format("%s_%s_%s", getId(), getTypeAsString(), transformationName); + final String uidPattern = config.get(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT); + // Note: name and version are not included in the UID by default as they would prevent + // migration. + // No version because: An operator can change its state layout and bump up the ExecNode + // version, in this case the UID should still be able to map state even after plan + // migration to the new version. + // No name because: We might fuse operators in the future, and a new operator might + // subscribe to multiple old UIDs. + return StringUtils.replaceEach( + uidPattern, + new String[] {"<id>", "<type>", "<version>", "<transformation>"}, + new String[] { + String.valueOf(id), name, String.valueOf(version), transformationName + }); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 5831bd3ed76..71f8184df2c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -532,7 +532,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> private ProviderContext createProviderContext(ExecNodeConfig config) { return name -> { if (this instanceof StreamExecNode && config.shouldSetUid()) { - return 
Optional.of(createTransformationUid(name)); + return Optional.of(createTransformationUid(name, config)); } return Optional.empty(); }; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index baebd1af435..a008f881a56 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -158,7 +158,7 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> private ProviderContext createProviderContext(ExecNodeConfig config) { return name -> { if (this instanceof StreamExecNode && config.shouldSetUid()) { - return Optional.of(createTransformationUid(name)); + return Optional.of(createTransformationUid(name, config)); } return Optional.empty(); }; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java index 347203641e6..70288dd1ce6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java @@ -236,7 +236,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> int leftArity, int rightArity, InternalTypeInfo<RowData> returnTypeInfo, - ReadableConfig config) { + ExecNodeConfig config) { boolean shouldCreateUid = 
config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS); @@ -260,7 +260,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> returnTypeInfo, leftParallelism); if (shouldCreateUid) { - filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION)); + filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION, config)); } filterAllLeftStream.setDescription( createFormattedTransformationDescription( @@ -277,7 +277,8 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> returnTypeInfo, rightParallelism); if (shouldCreateUid) { - filterAllRightStream.setUid(createTransformationUid(FILTER_RIGHT_TRANSFORMATION)); + filterAllRightStream.setUid( + createTransformationUid(FILTER_RIGHT_TRANSFORMATION, config)); } filterAllRightStream.setDescription( createFormattedTransformationDescription( @@ -294,7 +295,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> returnTypeInfo, leftParallelism); if (shouldCreateUid) { - padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION)); + padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION, config)); } padLeftStream.setDescription( createFormattedTransformationDescription("pad left input transformation", config)); @@ -310,7 +311,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> returnTypeInfo, rightParallelism); if (shouldCreateUid) { - padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION)); + padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION, config)); } padRightStream.setDescription( createFormattedTransformationDescription("pad right input transformation", config)); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java index 20cf5080f90..4fa88b2345b 
100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java @@ -31,19 +31,26 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableDescriptor; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.CompiledPlanUtils; +import org.apache.flink.table.planner.utils.JsonTestUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import java.io.IOException; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION; import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS; import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.DISABLED; @@ -192,10 +199,69 @@ class TransformationsTest { } } + @Test + public void testUidDefaults() throws IOException { + checkUidModification( + config -> {}, json -> {}, "\\d+_sink", "\\d+_constraint-validator", "\\d+_values"); + } + + @Test + public void testUidFlink1_15() throws IOException { + checkUidModification( + config -> + config.set(TABLE_EXEC_UID_FORMAT, 
"<id>_<type>_<version>_<transformation>"), + json -> {}, + "\\d+_stream-exec-sink_1_sink", + "\\d+_stream-exec-sink_1_constraint-validator", + "\\d+_stream-exec-values_1_values"); + } + + @Test + public void testPerNodeCustomUid() throws IOException { + checkUidModification( + config -> {}, + json -> + JsonTestUtils.setExecNodeConfig( + json, + "stream-exec-sink_1", + TABLE_EXEC_UID_FORMAT.key(), + "my_custom_<transformation>_<id>"), + "my_custom_sink_\\d+", + "my_custom_constraint-validator_\\d+", + "\\d+_values"); + } + + private static void checkUidModification( + Consumer<TableConfig> configModifier, + Consumer<JsonNode> jsonModifier, + String... expectedUidPatterns) + throws IOException { + final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + configModifier.accept(env.getConfig()); + final String plan = minimalPlan(env).asJsonString(); + final JsonNode json = JsonTestUtils.readFromString(plan); + jsonModifier.accept(json); + final List<String> planUids = + CompiledPlanUtils.toTransformations( + env, env.loadPlan(PlanReference.fromJsonString(json.toString()))) + .get(0).getTransitivePredecessors().stream() + .map(Transformation::getUid) + .collect(Collectors.toList()); + assertThat(planUids).hasSize(expectedUidPatterns.length); + IntStream.range(0, expectedUidPatterns.length) + .forEach(i -> assertThat(planUids.get(i)).matches(expectedUidPatterns[i])); + } + // -------------------------------------------------------------------------------------------- // Helper methods // -------------------------------------------------------------------------------------------- + private static CompiledPlan minimalPlan(TableEnvironment env) { + return env.fromValues(1, 2, 3) + .insertInto(TableDescriptor.forConnector("blackhole").build()) + .compilePlan(); + } + private static LegacySourceTransformation<?> toLegacySourceTransformation( StreamTableEnvironment env, Table table) { Transformation<?> transform = 
env.toChangelogStream(table).getTransformation(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java index 6aa9f96c7d0..c4595a5ccd3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java @@ -39,8 +39,28 @@ public final class JsonTestUtils { return OBJECT_MAPPER_INSTANCE.readTree(JsonTestUtils.class.getResource(path)); } + public static JsonNode readFromString(String path) throws IOException { + return OBJECT_MAPPER_INSTANCE.readTree(path); + } + public static JsonNode setFlinkVersion(JsonNode target, FlinkVersion flinkVersion) { return ((ObjectNode) target) .set("flinkVersion", OBJECT_MAPPER_INSTANCE.valueToTree(flinkVersion.toString())); } + + public static JsonNode setExecNodeConfig( + JsonNode target, String type, String key, String value) { + target.get("nodes") + .elements() + .forEachRemaining( + n -> { + if (n.get("type").asText().equals(type)) { + final ObjectNode configNode = + OBJECT_MAPPER_INSTANCE.createObjectNode(); + configNode.put(key, value); + ((ObjectNode) n).set("configuration", configNode); + } + }); + return target; + } }