twalthr commented on a change in pull request #19232:
URL: https://github.com/apache/flink/pull/19232#discussion_r834423232



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -176,4 +187,46 @@ public String toString() {
         }
         return new ExecNodeContext(metadata.name(), metadata.version());
     }
+
+    /**
+     * Create a configuration for the {@link ExecNode}, ready to be persisted to a JSON plan.
+     *
+     * @param execNodeClass The {@link ExecNode} class.
+     * @param plannerConfig The planner configuration (including the {@link TableConfig}).
+     * @return The {@link ExecNode} configuration, which contains the consumed options for the node,
+     *     defined by {@link ExecNodeMetadata#consumedOptions()}, along with their values.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(
+            Class<T> execNodeClass, ReadableConfig plannerConfig) {
+        Map<String, ConfigOption<?>> configOptions =
+                Stream.concat(
+                                ExecNodeMetadataUtil.TABLE_CONFIG_OPTIONS.stream(),
+                                ExecNodeMetadataUtil.EXECUTION_CONFIG_OPTIONS.stream())
+                        .collect(Collectors.toMap(ConfigOption::key, o -> o));
+
+        Configuration nodeConfiguration = new Configuration();

Review comment:
       nit: call this variable `persistedConfig` as well?
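       A minimal sketch of the suggested rename (assuming the rest of the method body stays unchanged):

```java
// Renamed from `nodeConfiguration` to match the method name `newPersistedConfig`.
Configuration persistedConfig = new Configuration();
```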

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -176,4 +187,46 @@ public String toString() {
         }
         return new ExecNodeContext(metadata.name(), metadata.version());
     }
+
+    /**
+     * Create a configuration for the {@link ExecNode}, ready to be persisted to a JSON plan.
+     *
+     * @param execNodeClass The {@link ExecNode} class.
+     * @param plannerConfig The planner configuration (including the {@link TableConfig}).
+     * @return The {@link ExecNode} configuration, which contains the consumed options for the node,
+     *     defined by {@link ExecNodeMetadata#consumedOptions()}, along with their values.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(
+            Class<T> execNodeClass, ReadableConfig plannerConfig) {

Review comment:
       call this `tableConfig` now
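       For illustration, the signature after the rename could read (a sketch of the suggestion, not the final code):

```java
public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(
        Class<T> execNodeClass, ReadableConfig tableConfig) {
```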

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
##########
@@ -451,31 +453,35 @@ static boolean isChainableSource(ExecNode<?> node, ProcessorContext context) {
     // Multiple Input Nodes Creating
     // --------------------------------------------------------------------------------
 
-    private List<ExecNode<?>> createMultipleInputNodes(List<ExecNodeWrapper> rootWrappers) {
+    private List<ExecNode<?>> createMultipleInputNodes(
+            ReadableConfig plannerConfig, List<ExecNodeWrapper> rootWrappers) {

Review comment:
       `tableConfig` everywhere in this class

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
##########
@@ -218,7 +331,11 @@ public void testStreamExecNodeJsonSerdeCoverage() {
         @ExecNodeMetadata(
                 name = "dummy-node",
                 version = 3,
-                consumedOptions = {"table.exec.state.ttl", "table.exec.sink.not-null-enforcer"},
+                consumedOptions = {
+                    "table.exec.state.ttl",
+                    "table.exec.sink.not-null-enforcer",
+                    "table.exec.deduplicate.mini-batch.compact-changes.enabled" // deprecated key

Review comment:
       We should not add a dependency on a real deprecated key; otherwise, someone needs to touch this class when removing the key. Instead, we should create an artificial deprecated key in this test.
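       For example, the test could declare its own option with an artificial deprecated key (the option names here are made up purely for illustration):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// An option local to the test; no production code depends on these keys,
// so removing a real deprecated option never forces changes to this class.
public static final ConfigOption<String> TEST_OPTION_WITH_DEPRECATED_KEY =
        ConfigOptions.key("test.exec.artificial-option")
                .stringType()
                .defaultValue("default")
                .withDeprecatedKeys("test.exec.artificial-option-deprecated");
```

       The `consumedOptions` of the dummy node could then list `test.exec.artificial-option-deprecated` to cover the deprecated-key case.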

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
##########
@@ -338,11 +455,88 @@ protected DummyNodeNoAnnotation(
         @JsonCreator
         protected DummyNodeBothAnnotations(
                 ExecNodeContext context,
-                ReadableConfig config,
+                ReadableConfig persistedConfig,
+                List<InputProperty> properties,
+                LogicalType outputType,
+                String description) {
+            super(10, context, persistedConfig, properties, outputType, description);
+        }
+
+        @Override
+        protected Transformation<RowData> translateToPlanInternal(
+                PlannerBase planner, ExecNodeConfig config) {
+            return null;
+        }
+    }
+
+    @ExecNodeMetadata(
+            name = "dummy-node-duplicate-consumedOptions",
+            version = 3,
+            consumedOptions = {"option1", "option2", "option3", "option2"},
+            minPlanVersion = FlinkVersion.v1_15,
+            minStateVersion = FlinkVersion.v1_15)
+    private static class DummyNodeDuplicateConsumedOptions extends ExecNodeBase<RowData> {
+
+        @JsonCreator
+        protected DummyNodeDuplicateConsumedOptions(
+                ExecNodeContext context,
+                ReadableConfig persistedConfig,
+                List<InputProperty> properties,
+                LogicalType outputType,
+                String description) {
+            super(10, context, persistedConfig, properties, outputType, description);
+        }
+
+        @Override
+        protected Transformation<RowData> translateToPlanInternal(
+                PlannerBase planner, ExecNodeConfig config) {
+            return null;
+        }
+    }
+
+    @ExecNodeMetadata(
+            name = "dummy-node-duplicate-deprecated-keys-consumedOptions",
+            version = 3,
+            consumedOptions = {"option1", "option2", "option3", "option11"},
+            minPlanVersion = FlinkVersion.v1_15,
+            minStateVersion = FlinkVersion.v1_15)
+    private static class DummyNodeDuplicateDeprecatedKeysConsumedOptions
+            extends ExecNodeBase<RowData> {
+
+        @JsonCreator
+        protected DummyNodeDuplicateDeprecatedKeysConsumedOptions(
+                ExecNodeContext context,
+                ReadableConfig persistedConfig,
+                List<InputProperty> properties,
+                LogicalType outputType,
+                String description) {
+            super(10, context, persistedConfig, properties, outputType, description);
+        }
+
+        @Override
+        protected Transformation<RowData> translateToPlanInternal(

Review comment:
       Create a base class for all these dummies, so that if someone adds a method to `ExecNodeBase`, they only need to update one location.
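       A possible shape for such a base class, derived from the constructors quoted above (the name `TestDummyNodeBase` is hypothetical):

```java
// Shared base class so the super(...) call and the no-op translation
// live in exactly one place for all dummy nodes in this test.
private abstract static class TestDummyNodeBase extends ExecNodeBase<RowData> {

    protected TestDummyNodeBase(
            ExecNodeContext context,
            ReadableConfig persistedConfig,
            List<InputProperty> properties,
            LogicalType outputType,
            String description) {
        super(10, context, persistedConfig, properties, outputType, description);
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        return null;
    }
}
```

       Each dummy would keep its own `@ExecNodeMetadata` annotation and `@JsonCreator` constructor, but delegate everything else to the base class.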

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
##########
@@ -67,6 +68,7 @@
     private final boolean isFinal;
 
     public BatchExecHashWindowAggregate(
+            ReadableConfig plannerConfig,

Review comment:
       `tableConfig` in all constructors

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
##########
@@ -154,7 +161,10 @@ protected void visitNode(ExecNode<?> node) {
 
     // TODO This implementation should be updated once FLINK-21224 is finished.
     private ExecEdge addExchangeAndReconnectEdge(
-            ExecEdge edge, InputProperty inputProperty, boolean strict) {
+            ReadableConfig plannerConfig,

Review comment:
       `tableConfig` everywhere in this class

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
##########
@@ -315,6 +318,46 @@ public void testExplainPlan() throws IOException {
                 .isEqualTo(expected);
     }
 
+    @Test
+    public void testPersistedConfigOption() throws Exception {
+        Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+        FileUtils.createParentDirectories(planPath.toFile());
+
+        List<String> data =
+                Stream.concat(
+                                DATA.stream(),
+                                Stream.of(
+                                        "4,2,This string is long",
+                                        "5,3,This is an even longer string"))
+                        .collect(Collectors.toList());
+        String[] sinkColumnDefinitions = new String[] {"a bigint", "b int", "c varchar(11)"};
+
+        createTestCsvSourceTable("src", data, COLUMNS_DEFINITION);
+        File sinkPath = createTestCsvSinkTable("sink", sinkColumnDefinitions);
+
+        // Set config option to trim the strings, so it's persisted in the json plan
+        tableEnv.getConfig()
+                .getConfiguration()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
+                        ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD);
+        CompiledPlan plan = tableEnv.compilePlanSql("insert into sink select * from src");

Review comment:
       nit: can we always write SQL keywords in upper case, like `INSERT INTO sink SELECT * FROM src`? Also for the data types above.
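       Applied to the statements quoted above, that would look like:

```java
String[] sinkColumnDefinitions = new String[] {"a BIGINT", "b INT", "c VARCHAR(11)"};
// ...
CompiledPlan plan = tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src");
```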

##########
File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
##########
@@ -338,6 +343,12 @@
   }, {
     "id" : 7,
     "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",

Review comment:
       I forgot the discussion, but why did we decide on this default? Was it before the decision to disable legacy casting?



