This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 08d941d  [FLINK-16822][sql-client] `table.xx` property set from CLI should also be set into TableEnvironment's TableConfig
08d941d is described below

commit 08d941df9ecdd645bc37697449509b6a456c2161
Author: godfrey he <[email protected]>
AuthorDate: Thu Apr 16 13:57:44 2020 +0800

    [FLINK-16822][sql-client] `table.xx` property set from CLI should also be set into TableEnvironment's TableConfig
    
    This closes #11763
---
 .../client/gateway/local/ExecutionContext.java     | 27 ++++++++----------
 .../table/client/gateway/local/LocalExecutor.java  |  5 +++-
 .../client/gateway/local/LocalExecutorITCase.java  | 33 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 174c014..cc29019 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -443,26 +443,27 @@ public class ExecutionContext<ClusterID> {
        private void initializeTableEnvironment(@Nullable SessionState 
sessionState) {
                final EnvironmentSettings settings = 
environment.getExecution().getEnvironmentSettings();
                final boolean noInheritedState = sessionState == null;
+               // Step 0.0 Initialize the table configuration.
+               final TableConfig config = new TableConfig();
+               environment.getConfiguration().asMap().forEach((k, v) ->
+                               config.getConfiguration().setString(k, v));
+
                if (noInheritedState) {
                        
//--------------------------------------------------------------------------------------------------------------
                        // Step.1 Create environments
                        
//--------------------------------------------------------------------------------------------------------------
-                       // Step 1.0 Initialize the table configuration.
-                       final TableConfig config = new TableConfig();
-                       environment.getConfiguration().asMap().forEach((k, v) ->
-                                       config.getConfiguration().setString(k, 
v));
-                       // Step 1.1 Initialize the CatalogManager if required.
+                       // Step 1.0 Initialize the CatalogManager if required.
                        final CatalogManager catalogManager = new 
CatalogManager(
                                        settings.getBuiltInCatalogName(),
                                        new GenericInMemoryCatalog(
                                                        
settings.getBuiltInCatalogName(),
                                                        
settings.getBuiltInDatabaseName()));
-                       // Step 1.2 Initialize the ModuleManager if required.
+                       // Step 1.1 Initialize the ModuleManager if required.
                        final ModuleManager moduleManager = new ModuleManager();
-                       // Step 1.3 Initialize the FunctionCatalog if required.
+                       // Step 1.2 Initialize the FunctionCatalog if required.
                        final FunctionCatalog functionCatalog = new 
FunctionCatalog(config, catalogManager, moduleManager);
-                       // Step 1.4 Set up session state.
-                       this.sessionState = SessionState.of(config, 
catalogManager, moduleManager, functionCatalog);
+                       // Step 1.3 Set up session state.
+                       this.sessionState = SessionState.of(catalogManager, 
moduleManager, functionCatalog);
 
                        // Must initialize the table environment before 
actually the
                        createTableEnvironment(settings, config, 
catalogManager, moduleManager, functionCatalog);
@@ -497,7 +498,7 @@ public class ExecutionContext<ClusterID> {
                        this.sessionState = sessionState;
                        createTableEnvironment(
                                        settings,
-                                       sessionState.config,
+                                       config,
                                        sessionState.catalogManager,
                                        sessionState.moduleManager,
                                        sessionState.functionCatalog);
@@ -757,28 +758,24 @@ public class ExecutionContext<ClusterID> {
 
        /** Represents the state that should be reused in one session. **/
        public static class SessionState {
-               public final TableConfig config;
                public final CatalogManager catalogManager;
                public final ModuleManager moduleManager;
                public final FunctionCatalog functionCatalog;
 
                private SessionState(
-                               TableConfig config,
                                CatalogManager catalogManager,
                                ModuleManager moduleManager,
                                FunctionCatalog functionCatalog) {
-                       this.config = config;
                        this.catalogManager = catalogManager;
                        this.moduleManager = moduleManager;
                        this.functionCatalog = functionCatalog;
                }
 
                public static SessionState of(
-                               TableConfig config,
                                CatalogManager catalogManager,
                                ModuleManager moduleManager,
                                FunctionCatalog functionCatalog) {
-                       return new SessionState(config, catalogManager, 
moduleManager, functionCatalog);
+                       return new SessionState(catalogManager, moduleManager, 
functionCatalog);
                }
        }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index b1445cb..8493bee 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.gateway.local;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -246,7 +247,8 @@ public class LocalExecutor implements Executor {
        /**
         * Get the existed {@link ExecutionContext} from contextMap, or thrown 
exception if does not exist.
         */
-       private ExecutionContext<?> getExecutionContext(String sessionId) 
throws SqlExecutionException {
+       @VisibleForTesting
+       protected ExecutionContext<?> getExecutionContext(String sessionId) 
throws SqlExecutionException {
                ExecutionContext<?> context = this.contextMap.get(sessionId);
                if (context == null) {
                        throw new SqlExecutionException("Invalid session 
identifier: " + sessionId);
@@ -282,6 +284,7 @@ public class LocalExecutor implements Executor {
                ExecutionContext<?> context = getExecutionContext(sessionId);
                Environment env = context.getEnvironment();
                Environment newEnv = Environment.enrich(env, 
ImmutableMap.of(key, value), ImmutableMap.of());
+
                // Renew the ExecutionContext by new environment.
                // Book keep all the session states of current ExecutionContext 
then
                // re-register them into the new one.
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 54e34e5..c89746e 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -83,6 +84,7 @@ import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -327,6 +329,37 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        @Test
+       public void testSetSessionProperties() throws Exception {
+               final LocalExecutor executor = 
createDefaultExecutor(clusterClient);
+               String key = 
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY.key();
+
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               // check the config in Environment
+               assertNull(executor.getSessionProperties(sessionId).get(key));
+               // check the config in TableConfig
+               assertNull(executor.getExecutionContext(sessionId)
+                               
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+
+               // modify config
+               executor.setSessionProperty(sessionId, key, "ONE_PHASE");
+               // check the config in Environment again
+               assertEquals("ONE_PHASE", 
executor.getSessionProperties(sessionId).get(key));
+               // check the config in TableConfig again
+               assertEquals("ONE_PHASE",
+                               executor.getExecutionContext(sessionId)
+                                               
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+
+               // reset all properties
+               executor.resetSessionProperties(sessionId);
+               // check the config in Environment
+               assertNull(executor.getSessionProperties(sessionId).get(key));
+               // check the config in TableConfig
+               assertNull(executor.getExecutionContext(sessionId)
+                               
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+       }
+
+       @Test
        public void testGetSessionProperties() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
 

Reply via email to