This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 471a3dfc601f105055c19fd7647110488ccdff70
Author: godfreyhe <[email protected]>
AuthorDate: Sat Jun 13 21:08:35 2020 +0800

    [FLINK-18161][sql-client] Fix state retention config does not work in sql 
client
---
 .../client/gateway/local/ExecutionContext.java     | 19 ++++++++++---
 .../client/gateway/local/ExecutionContextTest.java | 32 ++++++++--------------
 .../test/resources/test-sql-client-defaults.yaml   |  4 +--
 3 files changed, 29 insertions(+), 26 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index afe9a06..6198440 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
@@ -52,6 +53,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -452,10 +454,7 @@ public class ExecutionContext<ClusterID> {
                final EnvironmentSettings settings = 
environment.getExecution().getEnvironmentSettings();
                final boolean noInheritedState = sessionState == null;
                // Step 0.0 Initialize the table configuration.
-               final TableConfig config = new TableConfig();
-               config.addConfiguration(flinkConfig);
-               environment.getConfiguration().asMap().forEach((k, v) ->
-                               config.getConfiguration().setString(k, v));
+               final TableConfig config = createTableConfig();
 
                if (noInheritedState) {
                        
//--------------------------------------------------------------------------------------------------------------
@@ -522,6 +521,18 @@ public class ExecutionContext<ClusterID> {
                }
        }
 
+       private TableConfig createTableConfig() {
+               final TableConfig config = new TableConfig();
+               config.addConfiguration(flinkConfig);
+               environment.getConfiguration().asMap().forEach((k, v) ->
+                               config.getConfiguration().setString(k, v));
+               ExecutionEntry execution = environment.getExecution();
+               config.setIdleStateRetentionTime(
+                               
Time.milliseconds(execution.getMinStateRetention()),
+                               
Time.milliseconds(execution.getMaxStateRetention()));
+               return config;
+       }
+
        private void createTableEnvironment(
                        EnvironmentSettings settings,
                        TableConfig config,
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index a5cc8d1..7beee2a 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -90,6 +90,10 @@ public class ExecutionContextTest {
                assertEquals(10, failureRateStrategy.getMaxFailureRate());
                assertEquals(99_000, 
failureRateStrategy.getFailureInterval().toMilliseconds());
                assertEquals(1_000, 
failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
+
+               final TableEnvironment tableEnv = context.getTableEnvironment();
+               assertEquals(1_000, 
tableEnv.getConfig().getMinIdleStateRetentionTime());
+               assertEquals(600_000, 
tableEnv.getConfig().getMaxIdleStateRetentionTime());
        }
 
        @Test
@@ -255,35 +259,23 @@ public class ExecutionContextTest {
                final ExecutionContext<?> context = 
createConfigurationExecutionContext();
                final TableEnvironment tableEnv = context.getTableEnvironment();
 
-               assertEquals(
-                       100,
-                       tableEnv.getConfig().getConfiguration().getInteger(
-                               
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
-               assertTrue(
-                       tableEnv.getConfig().getConfiguration().getBoolean(
-                               
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
-               assertEquals(
-                       "128kb",
-                       tableEnv.getConfig().getConfiguration().getString(
-                               
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
+               Configuration conf = tableEnv.getConfig().getConfiguration();
+               assertEquals(100, 
conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
+               
assertTrue(conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
+               assertEquals("128kb", 
conf.getString(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
 
-               assertTrue(
-                       tableEnv.getConfig().getConfiguration().getBoolean(
-                               
OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
+               
assertTrue(conf.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
 
                // these options are not modified and should be equal to their 
default value
                assertEquals(
                        
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
-                       tableEnv.getConfig().getConfiguration().getBoolean(
-                               
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+                       
conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
                assertEquals(
                        
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
-                       tableEnv.getConfig().getConfiguration().getString(
-                               
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
+                       
conf.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
                assertEquals(
                        
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
-                       tableEnv.getConfig().getConfiguration().getLong(
-                               
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
+                       
conf.getLong(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
        }
 
        @Test
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index df53d19..a730f4c 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -141,8 +141,8 @@ execution:
   periodic-watermarks-interval: 99
   parallelism: 1
   max-parallelism: 16
-  min-idle-state-retention: 0
-  max-idle-state-retention: 0
+  min-idle-state-retention: 1000
+  max-idle-state-retention: 600000
   result-mode: "$VAR_RESULT_MODE"
   max-table-result-rows: "$VAR_MAX_ROWS"
   restart-strategy:

Reply via email to