swuferhong commented on code in PR #21597:
URL: https://github.com/apache/flink/pull/21597#discussion_r1063058379


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
     }
 
     @Override
-    public String openSession(@Nullable String sessionId) throws 
SqlExecutionException {
-        SessionContext sessionContext =
-                LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
-        sessionId = sessionContext.getSessionId();
-        if (this.contextMap.containsKey(sessionId)) {
-            throw new SqlExecutionException(
-                    "Found another session with the same session identifier: " 
+ sessionId);
-        } else {
-            this.contextMap.put(sessionId, sessionContext);
-        }
-        return sessionId;
+    public void openSession(@Nullable String sessionId) throws 
SqlExecutionException {
+        // do nothing
+        sessionContext = LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
     }
 
     @Override
-    public void closeSession(String sessionId) throws SqlExecutionException {
+    public void closeSession() throws SqlExecutionException {
         resultStore
                 .getResults()
                 .forEach(
                         (resultId) -> {
                             try {
-                                cancelQuery(sessionId, resultId);
+                                cancelQuery(resultId);
                             } catch (Throwable t) {
                                 // ignore any throwable to keep the clean up 
running
                             }
                         });
         // Remove the session's ExecutionContext from contextMap and close it.
-        SessionContext context = this.contextMap.remove(sessionId);
-        if (context != null) {
-            context.close();
-        }
-    }
-
-    private SessionContext getSessionContext(String sessionId) {
-        SessionContext context = this.contextMap.get(sessionId);
-        if (context == null) {
-            throw new SqlExecutionException("Invalid session identifier: " + 
sessionId);
+        if (sessionContext != null) {
+            sessionContext.close();
         }
-        return context;
     }
 
     /**
      * Get the existed {@link ExecutionContext} from contextMap, or thrown 
exception if does not
      * exist.
      */
     @VisibleForTesting
-    protected ExecutionContext getExecutionContext(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getExecutionContext();
+    protected ExecutionContext getExecutionContext() throws 
SqlExecutionException {
+        return sessionContext.getExecutionContext();
     }
 
     @Override
-    public Map<String, String> getSessionConfigMap(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getConfigMap();
+    public Map<String, String> getSessionConfigMap() throws 
SqlExecutionException {
+        return sessionContext.getConfigMap();
     }
 
     @Override
-    public ReadableConfig getSessionConfig(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getReadableConfig();
+    public ReadableConfig getSessionConfig() throws SqlExecutionException {
+        return sessionContext.getReadableConfig();
     }
 
     @Override
-    public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void resetSessionProperties() throws SqlExecutionException {
+        SessionContext context = sessionContext;
         context.reset();
     }
 
     @Override
-    public void resetSessionProperty(String sessionId, String key) throws 
SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void resetSessionProperty(String key) throws SqlExecutionException {
+        SessionContext context = sessionContext;
         context.reset(key);
     }
 
     @Override
-    public void setSessionProperty(String sessionId, String key, String value)
-            throws SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void setSessionProperty(String key, String value) throws 
SqlExecutionException {
+        SessionContext context = sessionContext;
         context.set(key, value);

Review Comment:
   ditto



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -320,19 +294,17 @@ public void cancelQuery(String sessionId, String 
resultId) throws SqlExecutionEx
     }
 
     @Override
-    public void removeJar(String sessionId, String jarUrl) {
-        final SessionContext context = getSessionContext(sessionId);
+    public void removeJar(String jarUrl) {
+        final SessionContext context = sessionContext;
         context.removeJar(jarUrl);

Review Comment:
   ditto



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java:
##########
@@ -42,54 +42,50 @@ public interface Executor {
      * Open a new session by using the given session id.
      *
      * @param sessionId session identifier.
-     * @return used session identifier to track the session.
      * @throws SqlExecutionException if any error happen
      */
-    String openSession(@Nullable String sessionId) throws 
SqlExecutionException;
+    void openSession(@Nullable String sessionId) throws SqlExecutionException;
 
     /**
      * Close the resources of session for given session id.
      *
-     * @param sessionId session identifier
      * @throws SqlExecutionException if any error happen
      */
-    void closeSession(String sessionId) throws SqlExecutionException;
+    void closeSession() throws SqlExecutionException;
 
     /**
      * Returns a copy of {@link Map} of all session configurations that are 
defined by the executor
      * and the session.
      *
-     * <p>Both this method and {@link #getSessionConfig(String)} return the 
same configuration set,
-     * but different return type.
+     * <p>Both this method and {@link #getSessionConfig()} return the same 
configuration set, but
+     * different return type.
      */
-    Map<String, String> getSessionConfigMap(String sessionId) throws 
SqlExecutionException;

Review Comment:
   I have one question. Original  `LocalExecutor` implements `Executor` have a 
map `ConcurrentHashMap<String, SessionContext> contextMap`, but now, we just 
delete it. Does this mean that each executor can only have one session? (If 
that, why we need sessionId, we can just define an executorId instead ? ). 
Also, I think if you delete this `contextMap`, you need add tests to verify we 
don't need this map. WDYT ?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -240,10 +215,9 @@ public TableResultInternal executeModifyOperations(
     }
 
     @Override
-    public ResultDescriptor executeQuery(String sessionId, QueryOperation 
query)
-            throws SqlExecutionException {
-        final TableResultInternal tableResult = executeOperation(sessionId, 
query);
-        final SessionContext context = getSessionContext(sessionId);
+    public ResultDescriptor executeQuery(QueryOperation query) throws 
SqlExecutionException {
+        final TableResultInternal tableResult = executeOperation(query);
+        final SessionContext context = sessionContext;

Review Comment:
   ditto



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
     }
 
     @Override
-    public String openSession(@Nullable String sessionId) throws 
SqlExecutionException {
-        SessionContext sessionContext =
-                LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
-        sessionId = sessionContext.getSessionId();
-        if (this.contextMap.containsKey(sessionId)) {
-            throw new SqlExecutionException(
-                    "Found another session with the same session identifier: " 
+ sessionId);
-        } else {
-            this.contextMap.put(sessionId, sessionContext);
-        }
-        return sessionId;
+    public void openSession(@Nullable String sessionId) throws 
SqlExecutionException {
+        // do nothing
+        sessionContext = LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
     }
 
     @Override
-    public void closeSession(String sessionId) throws SqlExecutionException {
+    public void closeSession() throws SqlExecutionException {
         resultStore
                 .getResults()
                 .forEach(
                         (resultId) -> {
                             try {
-                                cancelQuery(sessionId, resultId);
+                                cancelQuery(resultId);
                             } catch (Throwable t) {
                                 // ignore any throwable to keep the clean up 
running
                             }
                         });
         // Remove the session's ExecutionContext from contextMap and close it.
-        SessionContext context = this.contextMap.remove(sessionId);
-        if (context != null) {
-            context.close();
-        }
-    }
-
-    private SessionContext getSessionContext(String sessionId) {
-        SessionContext context = this.contextMap.get(sessionId);
-        if (context == null) {
-            throw new SqlExecutionException("Invalid session identifier: " + 
sessionId);
+        if (sessionContext != null) {
+            sessionContext.close();
         }
-        return context;
     }
 
     /**
      * Get the existed {@link ExecutionContext} from contextMap, or thrown 
exception if does not
      * exist.
      */
     @VisibleForTesting
-    protected ExecutionContext getExecutionContext(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getExecutionContext();
+    protected ExecutionContext getExecutionContext() throws 
SqlExecutionException {
+        return sessionContext.getExecutionContext();
     }
 
     @Override
-    public Map<String, String> getSessionConfigMap(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getConfigMap();
+    public Map<String, String> getSessionConfigMap() throws 
SqlExecutionException {
+        return sessionContext.getConfigMap();
     }
 
     @Override
-    public ReadableConfig getSessionConfig(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getReadableConfig();
+    public ReadableConfig getSessionConfig() throws SqlExecutionException {
+        return sessionContext.getReadableConfig();
     }
 
     @Override
-    public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void resetSessionProperties() throws SqlExecutionException {
+        SessionContext context = sessionContext;
         context.reset();
     }

Review Comment:
   Why not use `sessionContext.reset()` directly here.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -85,16 +85,15 @@ private void start() {
             executor.start();
 
             // Open an new session

Review Comment:
   nit: We can correct these obvious mistakes together: `an` -> `a`.  Btw, you 
can also pay attention to the warning of the idea checkStyle, like this class.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java:
##########
@@ -519,43 +516,38 @@ public String openSession(@Nullable String sessionId) 
throws SqlExecutionExcepti
                             Collections.emptyList(),
                             configuration,
                             Collections.singletonList(new DefaultCLI()));
-            SessionContext context = SessionContext.create(defaultContext, 
sessionId);
-            sessionMap.put(sessionId, context);
+            sessionContext = SessionContext.create(defaultContext, sessionId);
             helper.registerTables();
-            return sessionId;
         }
 
         @Override
-        public void closeSession(String sessionId) throws 
SqlExecutionException {}
+        public void closeSession() throws SqlExecutionException {}
 
         @Override
-        public Map<String, String> getSessionConfigMap(String sessionId)
-                throws SqlExecutionException {
-            return this.sessionMap.get(sessionId).getConfigMap();
+        public Map<String, String> getSessionConfigMap() throws 
SqlExecutionException {
+            return sessionContext.getConfigMap();
         }
 
         @Override
-        public ReadableConfig getSessionConfig(String sessionId) throws 
SqlExecutionException {
-            SessionContext context = this.sessionMap.get(sessionId);
+        public ReadableConfig getSessionConfig() throws SqlExecutionException {
+            SessionContext context = sessionContext;

Review Comment:
   ditto



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
     }
 
     @Override
-    public String openSession(@Nullable String sessionId) throws 
SqlExecutionException {
-        SessionContext sessionContext =
-                LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
-        sessionId = sessionContext.getSessionId();
-        if (this.contextMap.containsKey(sessionId)) {
-            throw new SqlExecutionException(
-                    "Found another session with the same session identifier: " 
+ sessionId);
-        } else {
-            this.contextMap.put(sessionId, sessionContext);
-        }
-        return sessionId;
+    public void openSession(@Nullable String sessionId) throws 
SqlExecutionException {
+        // do nothing
+        sessionContext = LocalContextUtils.buildSessionContext(sessionId, 
defaultContext);
     }
 
     @Override
-    public void closeSession(String sessionId) throws SqlExecutionException {
+    public void closeSession() throws SqlExecutionException {
         resultStore
                 .getResults()
                 .forEach(
                         (resultId) -> {
                             try {
-                                cancelQuery(sessionId, resultId);
+                                cancelQuery(resultId);
                             } catch (Throwable t) {
                                 // ignore any throwable to keep the clean up 
running
                             }
                         });
         // Remove the session's ExecutionContext from contextMap and close it.
-        SessionContext context = this.contextMap.remove(sessionId);
-        if (context != null) {
-            context.close();
-        }
-    }
-
-    private SessionContext getSessionContext(String sessionId) {
-        SessionContext context = this.contextMap.get(sessionId);
-        if (context == null) {
-            throw new SqlExecutionException("Invalid session identifier: " + 
sessionId);
+        if (sessionContext != null) {
+            sessionContext.close();
         }
-        return context;
     }
 
     /**
      * Get the existed {@link ExecutionContext} from contextMap, or thrown 
exception if does not
      * exist.
      */
     @VisibleForTesting
-    protected ExecutionContext getExecutionContext(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getExecutionContext();
+    protected ExecutionContext getExecutionContext() throws 
SqlExecutionException {
+        return sessionContext.getExecutionContext();
     }
 
     @Override
-    public Map<String, String> getSessionConfigMap(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getConfigMap();
+    public Map<String, String> getSessionConfigMap() throws 
SqlExecutionException {
+        return sessionContext.getConfigMap();
     }
 
     @Override
-    public ReadableConfig getSessionConfig(String sessionId) throws 
SqlExecutionException {
-        return getSessionContext(sessionId).getReadableConfig();
+    public ReadableConfig getSessionConfig() throws SqlExecutionException {
+        return sessionContext.getReadableConfig();
     }
 
     @Override
-    public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void resetSessionProperties() throws SqlExecutionException {
+        SessionContext context = sessionContext;
         context.reset();
     }
 
     @Override
-    public void resetSessionProperty(String sessionId, String key) throws 
SqlExecutionException {
-        SessionContext context = getSessionContext(sessionId);
+    public void resetSessionProperty(String key) throws SqlExecutionException {
+        SessionContext context = sessionContext;
         context.reset(key);
     }

Review Comment:
   Why not use `sessionContext.reset(key);` directly here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to