twalthr commented on a change in pull request #19187:
URL: https://github.com/apache/flink/pull/19187#discussion_r831116796



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -156,7 +156,9 @@ public boolean isBounded() {
                                             
HiveSourceFileEnumerator.createInputSplits(
                                                             0,
                                                             
hivePartitionsToRead,
-                                                            flinkConf,
+                                                            flinkConf.get(
+                                                                    HiveOptions

Review comment:
       very nit: introduce a local variable for it? do we actually still need 
the entire flinkConf in this class?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -484,10 +484,10 @@ abstract class PlannerBase(
     // Add query start time to TableConfig, these config are used internally,
     // these configs will be used by temporal functions like 
CURRENT_TIMESTAMP,LOCALTIMESTAMP.
     val epochTime :JLong = System.currentTimeMillis()
-    configuration.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)

Review comment:
       move the declaration of the local variable `configuration` downwards and 
maybe give a better name. also replace `getTableConfig.getConfiguration` below.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.RowData
 import org.apache.flink.table.factories.{DynamicTableFactory, 
DynamicTableSourceFactory}
 import org.apache.flink.table.planner.utils.{TableTestBase, 
TestingTableEnvironment}
 
-import org.junit.Test
+import org.junit.{Ignore, Test}

Review comment:
       unused imports

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
##########
@@ -200,8 +200,7 @@ object FlinkRelOptUtil {
 
   /** Get max cnf node limit by context of rel */
   def getMaxCnfNodeCount(rel: RelNode): Int = {
-    val tableConfig = getTableConfigFromContext(rel)
-    
tableConfig.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
+    
getTableConfigFromContext(rel).get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)

Review comment:
       use `ShortcutUtils` or introduce a method there

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -489,9 +487,7 @@ abstract class PlannerBase(
       TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
     tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
 
-    getExecEnv.configure(
-      configuration,
-      Thread.currentThread().getContextClassLoader)
+    getExecEnv.configure(tableConfig.getConfiguration, 
Thread.currentThread().getContextClassLoader)

Review comment:
       add a comment that we intentionally don't want to repropagate the 
rootConfigutation here and in the `DefaultExecutor`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
##########
@@ -20,44 +20,34 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.delegation.PlannerConfig;
 
 import java.time.ZoneId;
 import java.util.Optional;
 
 /**
- * Configuration view which is used combine the {@link PlannerConfig} with the 
{@link
+ * Configuration view which is used combine the {@link 
PlannerBase#getTableConfig()} with the {@link
  * ExecNodeBase#getNodeConfig()} configuration. The persisted configuration of 
the {@link ExecNode}
- * which is deserialized from the JSON plan has precedence over the {@link 
PlannerConfig}.
+ * which is deserialized from the JSON plan has precedence over the {@link
+ * PlannerBase#getTableConfig()}.
  */
 @Internal
 public final class ExecNodeConfig implements ReadableConfig {
 
-    private final ReadableConfig plannerConfig;
-
     // See https://issues.apache.org/jira/browse/FLINK-26190
     // Used only for the deprecated getMaxIdleStateRetentionTime to also 
satisfy tests which
     // manipulate maxIdleStateRetentionTime, like OverAggregateHarnessTest.
-    private final TableConfig originalTableConfig;
-    // See https://issues.apache.org/jira/browse/FLINK-26190
-    private final TableConfig tableConfig;
+    private final TableConfig plannerConfig;

Review comment:
       call it `tableConfig` here and in the constructor?

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
##########
@@ -182,7 +182,8 @@ public void close() throws Exception {}
                         0L,
                         seenPartitionsSinceOffset,
                         tablePath,
-                        configuration,
+                        configuration.get(

Review comment:
       move this to the Hive commit

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
##########
@@ -97,18 +87,18 @@ public long getStateRetentionTime() {
 
     // See https://issues.apache.org/jira/browse/FLINK-26190
     /**
-     * Using {@link #originalTableConfig} to satisify tests like {@code 
OverAggregateHarnessTest},
-     * which use {@code HarnessTestBase#TestTableConfig} to individually 
manipulate the
+     * Using {@link #plannerConfig} to satisify tests like {@code 
OverAggregateHarnessTest}, which
+     * use {@code HarnessTestBase#TestTableConfig} to individually manipulate 
the
      * maxIdleStateRetentionTime. See {@link 
TableConfig#getMaxIdleStateRetentionTime()}.
      */
     @Deprecated
     public long getMaxIdleStateRetentionTime() {
-        return originalTableConfig.getMaxIdleStateRetentionTime();
+        return plannerConfig.getMaxIdleStateRetentionTime();
     }
 
     // See https://issues.apache.org/jira/browse/FLINK-26190
     /** See {@link TableConfig#getLocalTimeZone()}. */
     public ZoneId getLocalTimeZone() {
-        return tableConfig.getLocalTimeZone();
+        return plannerConfig.getLocalTimeZone();

Review comment:
       can't we use the `TableConfigUtils` now and include the `nodeConfig` 
with this as well?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
##########
@@ -88,7 +88,7 @@ public CommonExecPythonCorrelate(
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), 
config.getTableConfig());
+                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), 
config.getPlannerConfig());

Review comment:
       As mentioned before, I would strongly vote for still calling this 
`TableConfig`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -75,15 +74,15 @@ public void apply(DynamicTableSource tableSource, 
SourceAbilityContext context)
         if (tableSource instanceof SupportsWatermarkPushDown) {
             GeneratedWatermarkGenerator generatedWatermarkGenerator =
                     WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
-                            context.getTableConfig().getConfiguration(),
+                            context.getTableConfig(),
                             context.getSourceRowType(),
                             watermarkExpr,
                             Option.apply("context"));
-            Configuration configuration = 
context.getTableConfig().getConfiguration();
 
             WatermarkGeneratorSupplier<RowData> supplier =
                     new GeneratedWatermarkGeneratorSupplier(
-                            configuration, generatedWatermarkGenerator);
+                            context.getTableConfig().getConfiguration(),

Review comment:
       still `getConfiguration` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to