twalthr commented on a change in pull request #18908:
URL: https://github.com/apache/flink/pull/18908#discussion_r814615122
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
##########
@@ -132,5 +135,25 @@
         return result;
     }
+    public static Set<ConfigOption<?>> getAllConfigOptions(Class<?> configOptionsClass)
+            throws IllegalStateException {
+        Set<ConfigOption<?>> options = new HashSet<>();
Review comment:
nit: I would recommend using `final` more aggressively. As you can see in the code above, this is actually quite common in Flink core.
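For illustration, the new helper with `final` applied to locals; a sketch only, since the method body is guessed from the signature in the diff, not taken from the PR:

```java
public static Set<ConfigOption<?>> getAllConfigOptions(Class<?> configOptionsClass)
        throws IllegalStateException {
    final Set<ConfigOption<?>> options = new HashSet<>();
    for (final Field field : configOptionsClass.getFields()) {
        // only collect the public static ConfigOption<?> constants
        if (Modifier.isStatic(field.getModifiers())
                && ConfigOption.class.isAssignableFrom(field.getType())) {
            try {
                options.add((ConfigOption<?>) field.get(null));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(
                        "Cannot access config option " + field.getName(), e);
            }
        }
    }
    return options;
}
```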
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
##########
@@ -105,6 +106,7 @@
     public PlannerContext(
             boolean isBatchMode,
+            ReadableConfig plannerConfig,
             TableConfig tableConfig,
Review comment:
Do we still need `TableConfig` in here? The nodes use `ExecNodeConfig` now, and the planner config actually includes the table config.
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
##########
@@ -51,20 +51,20 @@
     private final ReadableConfig nodeConfig;
     ExecNodeConfig(
-            ReadableConfig plannerConfig, TableConfig tableConfig, ReadableConfig nodeConfig) {
+            ReadableConfig plannerConfig, TableConfig tableConfig, ReadableConfig persistedConfig) {
Review comment:
As mentioned before, if we use `PlannerConfig` instead of `ReadableConfig`, we don't need to pass `tableConfig` here, because `tableConfig` is actually contained in `PlannerConfig`.
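i.e. something along these lines; a sketch under the assumption that the proposed `PlannerConfig` wraps the table config:

```java
// hypothetical: no separate tableConfig parameter, it lives inside plannerConfig
ExecNodeConfig(PlannerConfig plannerConfig, ReadableConfig persistedConfig) {
    this.plannerConfig = plannerConfig;
    this.persistedConfig = persistedConfig;
}
```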
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -176,4 +187,46 @@ public String toString() {
     }
         return new ExecNodeContext(metadata.name(), metadata.version());
     }
+
+    /**
+     * Create a configuration for the {@link ExecNode}, ready to be persisted to a JSON plan.
+     *
+     * @param execNodeClass The {@link ExecNode} class.
+     * @param plannerConfig The planner configuration (include the {@link TableConfig}).
+     * @return The {@link ExecNode} configuration, which contains the consumed options for the
+     *     node, defined by {@link ExecNodeMetadata#consumedOptions()}, along with their values.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(
+            Class<T> execNodeClass, ReadableConfig plannerConfig) {
+        Map<String, ConfigOption<?>> configOptions =
+                Stream.concat(
+                                ExecNodeMetadataUtil.TABLE_CONFIG_OPTIONS.stream(),
+                                ExecNodeMetadataUtil.EXECUTION_CONFIG_OPTIONS.stream())
+                        .collect(Collectors.toMap(ConfigOption::key, o -> o));
Review comment:
We should also consider deprecated and fallback keys here, and add a test for it: e.g. an `ExecNode` with two versions, where version 1 has a deprecated key and version 2 the new key, but both actually describe the same option.
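For the deprecated/fallback handling, something along these lines could work; a minimal sketch, assuming `ConfigOption#fallbackKeys()` (which exposes the `FallbackKey` entries, deprecated keys included) is usable from here:

```java
final Map<String, ConfigOption<?>> configOptions = new HashMap<>();
Stream.concat(
                ExecNodeMetadataUtil.TABLE_CONFIG_OPTIONS.stream(),
                ExecNodeMetadataUtil.EXECUTION_CONFIG_OPTIONS.stream())
        .forEach(
                option -> {
                    configOptions.put(option.key(), option);
                    // map deprecated/fallback keys to the same option, so plans
                    // persisted with an old key still resolve to the new option
                    option.fallbackKeys()
                            .forEach(fallback -> configOptions.put(fallback.getKey(), option));
                });
```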
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
##########
@@ -47,27 +46,28 @@
     private final FlinkPlannerImpl planner;
     private final ParserImpl parser;
     private final CatalogManager catalogManager;
+    private final FunctionCatalog functionCatalog;
     private final TableConfig tableConfig;
     private final PlannerContext plannerContext;
-    private PlannerMocks(TableConfig tableConfig) {
+    private PlannerMocks(boolean batchMode, TableConfig tableConfig) {
Review comment:
nit: rename to `isBatchMode` to be consistent with other locations
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
##########
@@ -77,9 +78,10 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
    * @return The optimized [[RelNode]] tree
    */
   private def optimizeTree(relNode: RelNode): RelNode = {
-    val config = planner.getTableConfig
-    val programs = TableConfigUtils.getCalciteConfig(config).getBatchProgram
-      .getOrElse(FlinkBatchProgram.buildProgram(config.getConfiguration))
+    val plannerConfig = planner.getConfiguration
+    val tableConfig = planner.getTableConfig
+    val programs = TableConfigUtils.getCalciteConfig(tableConfig).getBatchProgram
Review comment:
We can also let `planner.getConfiguration` return a `PlannerConfig` instance to access the `CalciteConfig`, without having both `TableConfig` and `PlannerConfig` in the `FlinkContext`.
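Hypothetical shape of such a `PlannerConfig`; names and structure are suggestions, not existing API:

```java
public interface PlannerConfig extends ReadableConfig {

    TableConfig getTableConfig();

    // single entry point for Calcite-specific settings, so callers no longer
    // need to juggle TableConfig and the planner config separately
    default CalciteConfig getCalciteConfig() {
        return TableConfigUtils.getCalciteConfig(getTableConfig());
    }
}
```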
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
##########
@@ -318,6 +321,46 @@ public void testExplainPlan() throws IOException {
                 .isEqualTo(expected);
     }
+    @Test
+    public void testPersistedConfigOption() throws Exception {
+        Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+        FileUtils.createParentDirectories(planPath.toFile());
+
+        List<String> data =
+                Stream.concat(
+                                DATA.stream(),
+                                Stream.of(
+                                        "4,2,This string is long",
+                                        "5,3,This is an even longer string"))
+                        .collect(Collectors.toList());
+        String[] sinkColumnDefinitions = new String[] {"a bigint", "b int", "c varchar(11)"};
+
+        createTestCsvSourceTable("src", data, COLUMNS_DEFINITION);
+        File sinkPath = createTestCsvSinkTable("sink", sinkColumnDefinitions);
+
+        // Set config option to trim the strings, so it's persisted in the json plan
+        tableEnv.getConfig()
+                .getConfiguration()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
+                        ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD);
+        CompiledPlan plan = tableEnv.compilePlanSql("insert into sink select * from src");
+        plan.writeToFile(planPath);
Review comment:
We can simplify the test a bit by just converting the plan to a string and back. We don't necessarily need to write to a file.
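A sketch of the string round trip, assuming `CompiledPlan#asJsonString()` and `PlanReference#fromJsonString()` are available on this branch:

```java
CompiledPlan plan = tableEnv.compilePlanSql("insert into sink select * from src");
String json = plan.asJsonString();

// the consumed option should be persisted in the serialized plan
assertThat(json)
        .contains(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key());

// round trip: restore the plan from the string, no file system involved
tableEnv.loadPlan(PlanReference.fromJsonString(json)).execute().await();
```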
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
##########
@@ -59,16 +58,11 @@
 /** Tests for {@link TemporalTableSourceSpec} serialization and deserialization. */
 @Execution(CONCURRENT)
 public class TemporalTableSourceSpecSerdeTest {
+
     private static final FlinkTypeFactory FACTORY = FlinkTypeFactory.INSTANCE();
     private static final FlinkContext FLINK_CONTEXT =
-            new FlinkContextImpl(
-                    false,
-                    TableConfig.getDefault(),
-                    new ModuleManager(),
-                    null,
-                    CatalogManagerMocks.createEmptyCatalogManager(),
-                    null);
+            PlannerMocks.create().getPlannerContext().getFlinkContext();
Review comment:
Creating a planner just to obtain a context sounds like overkill to me. Let's use the `JsonSerdeTestUtil`s instead?
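e.g. something like this, reusing the shared serde test fixture; the helper names here are assumptions based on the suggestion above and may differ:

```java
// hypothetical: derive the FlinkContext from the serde test fixture
// instead of spinning up a full planner via PlannerMocks
private static final FlinkContext FLINK_CONTEXT =
        JsonSerdeTestUtil.configuredSerdeContext().getFlinkContext();
```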
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
##########
@@ -300,10 +338,11 @@ protected DummyNodeNoAnnotation(
         @JsonCreator
         protected DummyNodeBothAnnotations(
                 ExecNodeContext context,
+                ReadableConfig config,
Review comment:
Give better names to the variables in this class as well.
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
##########
@@ -252,9 +253,13 @@ object MetadataTestUtil {
     getMetadataTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats))
   }
+  val tableConfig = TableConfig.getDefault
Review comment:
nit: inline?
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ConfigurationJsonSerializerFilter.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+
+/**
+ * Custom filtering for {@link Configuration} used by {@link ExecNodeBase#getPersistedConfig()}
+ * to avoid serializing null or empty configurations.
+ * avoid serializing null or empty configurations.
+ */
+@Internal
+public class ConfigurationJsonSerializerFilter {
Review comment:
most utils in this package are default-scoped now, isn't it possible to
include the filter in the serializer directly?
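If it moves into the serializer, Jackson's `isEmpty` hook would be the natural place for it; a minimal sketch, not the PR's actual serializer (unshaded Jackson imports shown for brevity, Flink uses the shaded ones):

```java
import org.apache.flink.configuration.Configuration;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;

class ConfigurationJsonSerializer extends StdSerializer<Configuration> {

    ConfigurationJsonSerializer() {
        super(Configuration.class);
    }

    @Override
    public void serialize(Configuration config, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        // serialize the configuration as a plain string-to-string map
        gen.writeObject(config.toMap());
    }

    @Override
    public boolean isEmpty(SerializerProvider provider, Configuration config) {
        // skip the field entirely for null/empty configurations, replacing the
        // separate filter class; requires NON_EMPTY inclusion on the property
        return config == null || config.toMap().isEmpty();
    }
}
```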