This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 471a3dfc601f105055c19fd7647110488ccdff70 Author: godfreyhe <[email protected]> AuthorDate: Sat Jun 13 21:08:35 2020 +0800 [FLINK-18161][sql-client] Fix state retention config does not work in sql client --- .../client/gateway/local/ExecutionContext.java | 19 ++++++++++--- .../client/gateway/local/ExecutionContextTest.java | 32 ++++++++-------------- .../test/resources/test-sql-client-defaults.yaml | 4 +-- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index afe9a06..6198440 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -19,6 +19,7 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.ClientUtils; @@ -52,6 +53,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.DeploymentEntry; +import org.apache.flink.table.client.config.entries.ExecutionEntry; import org.apache.flink.table.client.config.entries.SinkTableEntry; import org.apache.flink.table.client.config.entries.SourceSinkTableEntry; import org.apache.flink.table.client.config.entries.SourceTableEntry; @@ -452,10 +454,7 @@ public class ExecutionContext<ClusterID> { final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings(); final boolean noInheritedState = sessionState == null; // Step 0.0 Initialize the table configuration. - final TableConfig config = new TableConfig(); - config.addConfiguration(flinkConfig); - environment.getConfiguration().asMap().forEach((k, v) -> - config.getConfiguration().setString(k, v)); + final TableConfig config = createTableConfig(); if (noInheritedState) { //-------------------------------------------------------------------------------------------------------------- @@ -522,6 +521,18 @@ public class ExecutionContext<ClusterID> { } } + private TableConfig createTableConfig() { + final TableConfig config = new TableConfig(); + config.addConfiguration(flinkConfig); + environment.getConfiguration().asMap().forEach((k, v) -> + config.getConfiguration().setString(k, v)); + ExecutionEntry execution = environment.getExecution(); + config.setIdleStateRetentionTime( + Time.milliseconds(execution.getMinStateRetention()), + Time.milliseconds(execution.getMaxStateRetention())); + return config; + } + private void createTableEnvironment( EnvironmentSettings settings, TableConfig config, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index a5cc8d1..7beee2a 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -90,6 +90,10 @@ public class ExecutionContextTest { assertEquals(10, failureRateStrategy.getMaxFailureRate()); assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); + + final TableEnvironment tableEnv = context.getTableEnvironment(); + assertEquals(1_000, tableEnv.getConfig().getMinIdleStateRetentionTime()); + assertEquals(600_000, tableEnv.getConfig().getMaxIdleStateRetentionTime()); } @Test @@ -255,35 +259,23 @@ public class ExecutionContextTest { final ExecutionContext<?> context = createConfigurationExecutionContext(); final TableEnvironment tableEnv = context.getTableEnvironment(); - assertEquals( - 100, - tableEnv.getConfig().getConfiguration().getInteger( - ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT)); - assertTrue( - tableEnv.getConfig().getConfiguration().getBoolean( - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)); - assertEquals( - "128kb", - tableEnv.getConfig().getConfiguration().getString( - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)); + Configuration conf = tableEnv.getConfig().getConfiguration(); + assertEquals(100, conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT)); + assertTrue(conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)); + assertEquals("128kb", conf.getString(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)); - assertTrue( - tableEnv.getConfig().getConfiguration().getBoolean( - OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)); + assertTrue(conf.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)); // these options are not modified and should be equal to their default value assertEquals( ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(), - tableEnv.getConfig().getConfiguration().getBoolean( - ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); + conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); assertEquals( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(), - tableEnv.getConfig().getConfiguration().getString( - ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)); + conf.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)); assertEquals( OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(), - tableEnv.getConfig().getConfiguration().getLong( - OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)); + conf.getLong(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)); } @Test diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index df53d19..a730f4c 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -141,8 +141,8 @@ execution: periodic-watermarks-interval: 99 parallelism: 1 max-parallelism: 16 - min-idle-state-retention: 0 - max-idle-state-retention: 0 + min-idle-state-retention: 1000 + max-idle-state-retention: 600000 result-mode: "$VAR_RESULT_MODE" max-table-result-rows: "$VAR_MAX_ROWS" restart-strategy:
