twalthr commented on a change in pull request #19187:
URL: https://github.com/apache/flink/pull/19187#discussion_r831116796
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -156,7 +156,9 @@ public boolean isBounded() {
HiveSourceFileEnumerator.createInputSplits(
0,
hivePartitionsToRead,
- flinkConf,
+ flinkConf.get(
+ HiveOptions
Review comment:
very nit: introduce a local variable for it? do we actually still need
the entire flinkConf in this class?
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -484,10 +484,10 @@ abstract class PlannerBase(
// Add query start time to TableConfig, these config are used internally,
// these configs will be used by temporal functions like
CURRENT_TIMESTAMP,LOCALTIMESTAMP.
val epochTime :JLong = System.currentTimeMillis()
- configuration.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
Review comment:
move the declaration of the local variable `configuration` downwards and
maybe give a better name. also replace `getTableConfig.getConfiguration` below.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.RowData
import org.apache.flink.table.factories.{DynamicTableFactory,
DynamicTableSourceFactory}
import org.apache.flink.table.planner.utils.{TableTestBase,
TestingTableEnvironment}
-import org.junit.Test
+import org.junit.{Ignore, Test}
Review comment:
unused imports
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
##########
@@ -200,8 +200,7 @@ object FlinkRelOptUtil {
/** Get max cnf node limit by context of rel */
def getMaxCnfNodeCount(rel: RelNode): Int = {
- val tableConfig = getTableConfigFromContext(rel)
-
tableConfig.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
+
getTableConfigFromContext(rel).get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
Review comment:
use `ShortcutUtils` or introduce a method there
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -489,9 +487,7 @@ abstract class PlannerBase(
TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
- getExecEnv.configure(
- configuration,
- Thread.currentThread().getContextClassLoader)
+ getExecEnv.configure(tableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
Review comment:
add a comment that we intentionally don't want to repropagate the
rootConfigutation here and in the `DefaultExecutor`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
##########
@@ -20,44 +20,34 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.delegation.PlannerConfig;
import java.time.ZoneId;
import java.util.Optional;
/**
- * Configuration view which is used combine the {@link PlannerConfig} with the
{@link
+ * Configuration view which is used combine the {@link
PlannerBase#getTableConfig()} with the {@link
* ExecNodeBase#getNodeConfig()} configuration. The persisted configuration of
the {@link ExecNode}
- * which is deserialized from the JSON plan has precedence over the {@link
PlannerConfig}.
+ * which is deserialized from the JSON plan has precedence over the {@link
+ * PlannerBase#getTableConfig()}.
*/
@Internal
public final class ExecNodeConfig implements ReadableConfig {
- private final ReadableConfig plannerConfig;
-
// See https://issues.apache.org/jira/browse/FLINK-26190
// Used only for the deprecated getMaxIdleStateRetentionTime to also
satisfy tests which
// manipulate maxIdleStateRetentionTime, like OverAggregateHarnessTest.
- private final TableConfig originalTableConfig;
- // See https://issues.apache.org/jira/browse/FLINK-26190
- private final TableConfig tableConfig;
+ private final TableConfig plannerConfig;
Review comment:
call it `tableConfig` here and in the constructor?
##########
File path:
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
##########
@@ -182,7 +182,8 @@ public void close() throws Exception {}
0L,
seenPartitionsSinceOffset,
tablePath,
- configuration,
+ configuration.get(
Review comment:
move this to the Hive commit
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
##########
@@ -97,18 +87,18 @@ public long getStateRetentionTime() {
// See https://issues.apache.org/jira/browse/FLINK-26190
/**
- * Using {@link #originalTableConfig} to satisify tests like {@code
OverAggregateHarnessTest},
- * which use {@code HarnessTestBase#TestTableConfig} to individually
manipulate the
+ * Using {@link #plannerConfig} to satisify tests like {@code
OverAggregateHarnessTest}, which
+ * use {@code HarnessTestBase#TestTableConfig} to individually manipulate
the
* maxIdleStateRetentionTime. See {@link
TableConfig#getMaxIdleStateRetentionTime()}.
*/
@Deprecated
public long getMaxIdleStateRetentionTime() {
- return originalTableConfig.getMaxIdleStateRetentionTime();
+ return plannerConfig.getMaxIdleStateRetentionTime();
}
// See https://issues.apache.org/jira/browse/FLINK-26190
/** See {@link TableConfig#getLocalTimeZone()}. */
public ZoneId getLocalTimeZone() {
- return tableConfig.getLocalTimeZone();
+ return plannerConfig.getLocalTimeZone();
Review comment:
can't we use the `TableConfigUtils` now and include the `nodeConfig`
with this as well?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
##########
@@ -88,7 +88,7 @@ public CommonExecPythonCorrelate(
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
final Configuration pythonConfig =
- CommonPythonUtil.getMergedConfig(planner.getExecEnv(),
config.getTableConfig());
+ CommonPythonUtil.getMergedConfig(planner.getExecEnv(),
config.getPlannerConfig());
Review comment:
As mentioned before, I would strongly vote for still calling this
`TableConfig`.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -75,15 +74,15 @@ public void apply(DynamicTableSource tableSource,
SourceAbilityContext context)
if (tableSource instanceof SupportsWatermarkPushDown) {
GeneratedWatermarkGenerator generatedWatermarkGenerator =
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
- context.getTableConfig().getConfiguration(),
+ context.getTableConfig(),
context.getSourceRowType(),
watermarkExpr,
Option.apply("context"));
- Configuration configuration =
context.getTableConfig().getConfiguration();
WatermarkGeneratorSupplier<RowData> supplier =
new GeneratedWatermarkGeneratorSupplier(
- configuration, generatedWatermarkGenerator);
+ context.getTableConfig().getConfiguration(),
Review comment:
still `getConfiguration` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]