This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 08d941d [FLINK-16822][sql-client] `table.xx` property set from CLI
should also be set into TableEnvironment's TableConfig
08d941d is described below
commit 08d941df9ecdd645bc37697449509b6a456c2161
Author: godfrey he <[email protected]>
AuthorDate: Thu Apr 16 13:57:44 2020 +0800
[FLINK-16822][sql-client] `table.xx` property set from CLI should also be
set into TableEnvironment's TableConfig
This closes #11763
---
.../client/gateway/local/ExecutionContext.java | 27 ++++++++----------
.../table/client/gateway/local/LocalExecutor.java | 5 +++-
.../client/gateway/local/LocalExecutorITCase.java | 33 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 16 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 174c014..cc29019 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -443,26 +443,27 @@ public class ExecutionContext<ClusterID> {
private void initializeTableEnvironment(@Nullable SessionState
sessionState) {
final EnvironmentSettings settings =
environment.getExecution().getEnvironmentSettings();
final boolean noInheritedState = sessionState == null;
+ // Step 0.0 Initialize the table configuration.
+ final TableConfig config = new TableConfig();
+ environment.getConfiguration().asMap().forEach((k, v) ->
+ config.getConfiguration().setString(k, v));
+
if (noInheritedState) {
//--------------------------------------------------------------------------------------------------------------
// Step.1 Create environments
//--------------------------------------------------------------------------------------------------------------
- // Step 1.0 Initialize the table configuration.
- final TableConfig config = new TableConfig();
- environment.getConfiguration().asMap().forEach((k, v) ->
- config.getConfiguration().setString(k,
v));
- // Step 1.1 Initialize the CatalogManager if required.
+ // Step 1.0 Initialize the CatalogManager if required.
final CatalogManager catalogManager = new
CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()));
- // Step 1.2 Initialize the ModuleManager if required.
+ // Step 1.1 Initialize the ModuleManager if required.
final ModuleManager moduleManager = new ModuleManager();
- // Step 1.3 Initialize the FunctionCatalog if required.
+ // Step 1.2 Initialize the FunctionCatalog if required.
final FunctionCatalog functionCatalog = new
FunctionCatalog(config, catalogManager, moduleManager);
- // Step 1.4 Set up session state.
- this.sessionState = SessionState.of(config,
catalogManager, moduleManager, functionCatalog);
+ // Step 1.3 Set up session state.
+ this.sessionState = SessionState.of(catalogManager,
moduleManager, functionCatalog);
// Must initialize the table environment before
actually the
createTableEnvironment(settings, config,
catalogManager, moduleManager, functionCatalog);
@@ -497,7 +498,7 @@ public class ExecutionContext<ClusterID> {
this.sessionState = sessionState;
createTableEnvironment(
settings,
- sessionState.config,
+ config,
sessionState.catalogManager,
sessionState.moduleManager,
sessionState.functionCatalog);
@@ -757,28 +758,24 @@ public class ExecutionContext<ClusterID> {
/** Represents the state that should be reused in one session. **/
public static class SessionState {
- public final TableConfig config;
public final CatalogManager catalogManager;
public final ModuleManager moduleManager;
public final FunctionCatalog functionCatalog;
private SessionState(
- TableConfig config,
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog) {
- this.config = config;
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
this.functionCatalog = functionCatalog;
}
public static SessionState of(
- TableConfig config,
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog) {
- return new SessionState(config, catalogManager,
moduleManager, functionCatalog);
+ return new SessionState(catalogManager, moduleManager,
functionCatalog);
}
}
}
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index b1445cb..8493bee 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.client.gateway.local;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -246,7 +247,8 @@ public class LocalExecutor implements Executor {
/**
* Get the existed {@link ExecutionContext} from contextMap, or thrown
exception if does not exist.
*/
- private ExecutionContext<?> getExecutionContext(String sessionId)
throws SqlExecutionException {
+ @VisibleForTesting
+ protected ExecutionContext<?> getExecutionContext(String sessionId)
throws SqlExecutionException {
ExecutionContext<?> context = this.contextMap.get(sessionId);
if (context == null) {
throw new SqlExecutionException("Invalid session
identifier: " + sessionId);
@@ -282,6 +284,7 @@ public class LocalExecutor implements Executor {
ExecutionContext<?> context = getExecutionContext(sessionId);
Environment env = context.getEnvironment();
Environment newEnv = Environment.enrich(env,
ImmutableMap.of(key, value), ImmutableMap.of());
+
// Renew the ExecutionContext by new environment.
// Book keep all the session states of current ExecutionContext
then
// re-register them into the new one.
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 54e34e5..c89746e 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -83,6 +84,7 @@ import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -327,6 +329,37 @@ public class LocalExecutorITCase extends TestLogger {
}
@Test
+ public void testSetSessionProperties() throws Exception {
+ final LocalExecutor executor =
createDefaultExecutor(clusterClient);
+ String key =
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY.key();
+
+ final SessionContext session = new
SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ // check the config in Environment
+ assertNull(executor.getSessionProperties(sessionId).get(key));
+ // check the config in TableConfig
+ assertNull(executor.getExecutionContext(sessionId)
+
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+
+ // modify config
+ executor.setSessionProperty(sessionId, key, "ONE_PHASE");
+ // check the config in Environment again
+ assertEquals("ONE_PHASE",
executor.getSessionProperties(sessionId).get(key));
+ // check the config in TableConfig again
+ assertEquals("ONE_PHASE",
+ executor.getExecutionContext(sessionId)
+
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+
+ // reset all properties
+ executor.resetSessionProperties(sessionId);
+ // check the config in Environment
+ assertNull(executor.getSessionProperties(sessionId).get(key));
+ // check the config in TableConfig
+ assertNull(executor.getExecutionContext(sessionId)
+
.getTableEnvironment().getConfig().getConfiguration().getString(key, null));
+ }
+
+ @Test
public void testGetSessionProperties() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);