KurtYoung commented on a change in pull request #10270: 
[FLINK-14672][sql-client] Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349418274
 
 

 ##########
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##########
 @@ -188,10 +199,76 @@ public void start() {
                // nothing to do yet
        }
 
+       /**
+        * Create a new {@link ExecutionContext} by merging the default 
environment the the environment in session context.
+        */
+       private ExecutionContext<?> createExecutionContext(SessionContext 
sessionContext) {
+               return createExecutionContext(defaultEnvironment, 
sessionContext.getSessionEnv());
+       }
+
+       /**
+        * Create a new {@link ExecutionContext} by merging the default 
environment and session environment.
+        */
+       private ExecutionContext<?> createExecutionContext(Environment 
defaultEnv, Environment sessionEnv) {
+               Environment mergedEnv = Environment.merge(defaultEnv, 
sessionEnv);
+               return createExecutionContext(mergedEnv);
+       }
+
+       /**
+        * Create a new {@link ExecutionContext} by using the given environment.
+        */
+       private ExecutionContext<?> createExecutionContext(Environment 
environment) {
+               try {
+                       return new ExecutionContext<>(
+                               environment,
+                               dependencies,
+                               flinkConfig,
+                               clusterClientServiceLoader,
+                               commandLineOptions,
+                               commandLines);
+               } catch (Throwable t) {
+                       // catch everything such that a configuration does not 
crash the executor
+                       throw new SqlExecutionException("Could not create 
execution context.", t);
+               }
+       }
+
        @Override
-       public Map<String, String> getSessionProperties(SessionContext session) 
throws SqlExecutionException {
-               final Environment env = getOrCreateExecutionContext(session)
-                       .getMergedEnvironment();
+       public String openSession(SessionContext sessionContext) throws 
SqlExecutionException {
+               String sessionId = sessionContext.getSessionId();
+               ExecutionContext previousContext = 
this.contextMap.putIfAbsent(sessionId, createExecutionContext(sessionContext));
+               if (previousContext != null) {
+                       throw new SqlExecutionException("Found another session 
with the same session identifier: " + sessionContext.getSessionId());
+               }
+               return sessionContext.getSessionId();
 
 Review comment:
   `sessionContext.getSessionId()` -> `sessionId`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to