KurtYoung commented on a change in pull request #10270:
[FLINK-14672][sql-client] Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349418251
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
##########
@@ -188,10 +199,76 @@ public void start() {
// nothing to do yet
}
+ /**
+ * Create a new {@link ExecutionContext} by merging the default
environment the the environment in session context.
+ */
+ private ExecutionContext<?> createExecutionContext(SessionContext
sessionContext) {
+ return createExecutionContext(defaultEnvironment,
sessionContext.getSessionEnv());
+ }
+
+ /**
+ * Create a new {@link ExecutionContext} by merging the default
environment and session environment.
+ */
+ private ExecutionContext<?> createExecutionContext(Environment
defaultEnv, Environment sessionEnv) {
+ Environment mergedEnv = Environment.merge(defaultEnv,
sessionEnv);
+ return createExecutionContext(mergedEnv);
+ }
+
+ /**
+ * Create a new {@link ExecutionContext} by using the given environment.
+ */
+ private ExecutionContext<?> createExecutionContext(Environment
environment) {
+ try {
+ return new ExecutionContext<>(
+ environment,
+ dependencies,
+ flinkConfig,
+ clusterClientServiceLoader,
+ commandLineOptions,
+ commandLines);
+ } catch (Throwable t) {
+ // catch everything such that a configuration does not
crash the executor
+ throw new SqlExecutionException("Could not create
execution context.", t);
+ }
+ }
+
@Override
- public Map<String, String> getSessionProperties(SessionContext session)
throws SqlExecutionException {
- final Environment env = getOrCreateExecutionContext(session)
- .getMergedEnvironment();
+ public String openSession(SessionContext sessionContext) throws
SqlExecutionException {
+ String sessionId = sessionContext.getSessionId();
+ ExecutionContext previousContext =
this.contextMap.putIfAbsent(sessionId, createExecutionContext(sessionContext));
+ if (previousContext != null) {
+ throw new SqlExecutionException("Found another session
with the same session identifier: " + sessionContext.getSessionId());
Review comment:
`sessionContext.getSessionId()` -> `sessionId`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services