slinkydeveloper commented on a change in pull request #18756:
URL: https://github.com/apache/flink/pull/18756#discussion_r809040283



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ConfigurationJsonDeserializer.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Custom deserializer for {@link Configuration} used for {@link 
ExecNodeBase#getPersistedConfig}.
+ */
+@Internal
+public class ConfigurationJsonDeserializer extends 
StdDeserializer<Configuration> {

Review comment:
       No need for public?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfiguration.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.delegation.PlannerConfiguration;
+
+import java.time.ZoneId;
+import java.util.Optional;
+
+/**
+ * Configuration view that combines the {@link PlannerConfiguration} with the 
{@link
+ * ExecNodeBase#getPersistedConfig()} configuration. The persisted 
configuration of the {@link
+ * ExecNode} which is deserialized from the JSON plan has precedence over the 
{@link
+ * PlannerConfiguration}.
+ */
+@Internal
+public class ExecNodeConfiguration implements ReadableConfig {
+
+    private final ReadableConfig plannerConfig;
+
+    // See https://issues.apache.org/jira/browse/FLINK-26190
+    private final TableConfig tableConfig;
+
+    private final ReadableConfig persistedConfig;
+
+    ExecNodeConfiguration(
+            ReadableConfig plannerConfig, TableConfig tableConfig, 
ReadableConfig persistedConfig) {
+        this.plannerConfig = plannerConfig;
+        this.persistedConfig = persistedConfig;
+        this.tableConfig = TableConfig.getDefault();
+        this.tableConfig.setNullCheck(tableConfig.getNullCheck());
+        this.tableConfig.setDecimalContext(tableConfig.getDecimalContext());
+        this.tableConfig.addConfiguration(tableConfig.getConfiguration());
+        this.tableConfig.addConfiguration((Configuration) persistedConfig);
+    }
+
+    /**
+     * Return the merged {@link TableConfig} from {@link 
PlannerBase#getTableConfig()} and {@link
+     * ExecNodeBase#getPersistedConfig()}.
+     *
+     * @return the {@link TableConfig}.
+     * @deprecated This method is used only for {@link CodeGeneratorContext} 
and related methods,
+     *     which end up passing the {@link TableConfig} to the {@link 
CodeGeneratorContext}. It
+     *     should be removed once {@link CodeGeneratorContext#nullCheck()} is 
removed, since for all
+     *     other usages it's possible to use the {@link ReadableConfig}.
+     */
+    // See https://issues.apache.org/jira/browse/FLINK-26190
+    @Deprecated
+    public TableConfig getTableConfig() {
+        return tableConfig;
+    }
+
+    @Override
+    public <T> T get(ConfigOption<T> option) {
+        return persistedConfig.getOptional(option).orElseGet(() -> 
plannerConfig.get(option));
+    }
+
+    @Override
+    public <T> Optional<T> getOptional(ConfigOption<T> option) {
+        final Optional<T> tableValue = persistedConfig.getOptional(option);
+        if (tableValue.isPresent()) {
+            return tableValue;
+        }
+        return plannerConfig.getOptional(option);
+    }
+
+    /** @return The duration until state which was not updated will be 
retained. */
+    public long getIdleStateRetentionTime() {
+        return get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis();
+    }
+
+    // See https://issues.apache.org/jira/browse/FLINK-26190
+    /** See {@link TableConfig#getMaxGeneratedCodeLength()}. */
+    @Deprecated
+    public long getMaxIdleStateRetentionTime() {
+        return tableConfig.getMaxGeneratedCodeLength();
+    }
+
+    // See https://issues.apache.org/jira/browse/FLINK-26190
+    /** See {@link TableConfig#getLocalTimeZone()}. */
+    public ZoneId getLocalTimeZone() {
+        return tableConfig.getLocalTimeZone();

Review comment:
       Not something to do in this PR, but does it make sense for the config 
option builder to support the `ZoneId` type directly, so that you can remove 
the intermediate parsing code in `TableConfig`?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLegacyTableSourceScan.scala
##########
@@ -57,6 +58,7 @@ class StreamPhysicalLegacyTableSourceScan(
 
   override def translateToExecNode(): ExecNode[_] = {
     new StreamExecLegacyTableSourceScan(
+      ShortcutUtils.unwrapConfig(this),

Review comment:
       Perhaps here and in other places, use static imports for the methods 
from `ShortcutUtils`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to