swuferhong commented on code in PR #21597:
URL: https://github.com/apache/flink/pull/21597#discussion_r1063058379
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
}
@Override
- public String openSession(@Nullable String sessionId) throws
SqlExecutionException {
- SessionContext sessionContext =
- LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
- sessionId = sessionContext.getSessionId();
- if (this.contextMap.containsKey(sessionId)) {
- throw new SqlExecutionException(
- "Found another session with the same session identifier: "
+ sessionId);
- } else {
- this.contextMap.put(sessionId, sessionContext);
- }
- return sessionId;
+ public void openSession(@Nullable String sessionId) throws
SqlExecutionException {
+ // do nothing
+ sessionContext = LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
}
@Override
- public void closeSession(String sessionId) throws SqlExecutionException {
+ public void closeSession() throws SqlExecutionException {
resultStore
.getResults()
.forEach(
(resultId) -> {
try {
- cancelQuery(sessionId, resultId);
+ cancelQuery(resultId);
} catch (Throwable t) {
// ignore any throwable to keep the clean up
running
}
});
// Remove the session's ExecutionContext from contextMap and close it.
- SessionContext context = this.contextMap.remove(sessionId);
- if (context != null) {
- context.close();
- }
- }
-
- private SessionContext getSessionContext(String sessionId) {
- SessionContext context = this.contextMap.get(sessionId);
- if (context == null) {
- throw new SqlExecutionException("Invalid session identifier: " +
sessionId);
+ if (sessionContext != null) {
+ sessionContext.close();
}
- return context;
}
/**
* Get the existed {@link ExecutionContext} from contextMap, or thrown
exception if does not
* exist.
*/
@VisibleForTesting
- protected ExecutionContext getExecutionContext(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getExecutionContext();
+ protected ExecutionContext getExecutionContext() throws
SqlExecutionException {
+ return sessionContext.getExecutionContext();
}
@Override
- public Map<String, String> getSessionConfigMap(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getConfigMap();
+ public Map<String, String> getSessionConfigMap() throws
SqlExecutionException {
+ return sessionContext.getConfigMap();
}
@Override
- public ReadableConfig getSessionConfig(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getReadableConfig();
+ public ReadableConfig getSessionConfig() throws SqlExecutionException {
+ return sessionContext.getReadableConfig();
}
@Override
- public void resetSessionProperties(String sessionId) throws
SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void resetSessionProperties() throws SqlExecutionException {
+ SessionContext context = sessionContext;
context.reset();
}
@Override
- public void resetSessionProperty(String sessionId, String key) throws
SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void resetSessionProperty(String key) throws SqlExecutionException {
+ SessionContext context = sessionContext;
context.reset(key);
}
@Override
- public void setSessionProperty(String sessionId, String key, String value)
- throws SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void setSessionProperty(String key, String value) throws
SqlExecutionException {
+ SessionContext context = sessionContext;
context.set(key, value);
Review Comment:
ditto
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -320,19 +294,17 @@ public void cancelQuery(String sessionId, String
resultId) throws SqlExecutionEx
}
@Override
- public void removeJar(String sessionId, String jarUrl) {
- final SessionContext context = getSessionContext(sessionId);
+ public void removeJar(String jarUrl) {
+ final SessionContext context = sessionContext;
context.removeJar(jarUrl);
Review Comment:
ditto
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java:
##########
@@ -42,54 +42,50 @@ public interface Executor {
* Open a new session by using the given session id.
*
* @param sessionId session identifier.
- * @return used session identifier to track the session.
* @throws SqlExecutionException if any error happen
*/
- String openSession(@Nullable String sessionId) throws
SqlExecutionException;
+ void openSession(@Nullable String sessionId) throws SqlExecutionException;
/**
* Close the resources of session for given session id.
*
- * @param sessionId session identifier
* @throws SqlExecutionException if any error happen
*/
- void closeSession(String sessionId) throws SqlExecutionException;
+ void closeSession() throws SqlExecutionException;
/**
* Returns a copy of {@link Map} of all session configurations that are
defined by the executor
* and the session.
*
- * <p>Both this method and {@link #getSessionConfig(String)} return the
same configuration set,
- * but different return type.
+ * <p>Both this method and {@link #getSessionConfig()} return the same
configuration set, but
+ * different return type.
*/
- Map<String, String> getSessionConfigMap(String sessionId) throws
SqlExecutionException;
Review Comment:
I have one question. The original `LocalExecutor` (which implements `Executor`)
had a map `ConcurrentHashMap<String, SessionContext> contextMap`, but this PR
deletes it. Does this mean that each executor can now have only one session?
(If so, why do we still need a sessionId? Could we just define an executorId
instead?) Also, I think that if you delete `contextMap`, you need to add tests
verifying that the map is no longer needed. WDYT?
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -240,10 +215,9 @@ public TableResultInternal executeModifyOperations(
}
@Override
- public ResultDescriptor executeQuery(String sessionId, QueryOperation
query)
- throws SqlExecutionException {
- final TableResultInternal tableResult = executeOperation(sessionId,
query);
- final SessionContext context = getSessionContext(sessionId);
+ public ResultDescriptor executeQuery(QueryOperation query) throws
SqlExecutionException {
+ final TableResultInternal tableResult = executeOperation(query);
+ final SessionContext context = sessionContext;
Review Comment:
ditto
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
}
@Override
- public String openSession(@Nullable String sessionId) throws
SqlExecutionException {
- SessionContext sessionContext =
- LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
- sessionId = sessionContext.getSessionId();
- if (this.contextMap.containsKey(sessionId)) {
- throw new SqlExecutionException(
- "Found another session with the same session identifier: "
+ sessionId);
- } else {
- this.contextMap.put(sessionId, sessionContext);
- }
- return sessionId;
+ public void openSession(@Nullable String sessionId) throws
SqlExecutionException {
+ // do nothing
+ sessionContext = LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
}
@Override
- public void closeSession(String sessionId) throws SqlExecutionException {
+ public void closeSession() throws SqlExecutionException {
resultStore
.getResults()
.forEach(
(resultId) -> {
try {
- cancelQuery(sessionId, resultId);
+ cancelQuery(resultId);
} catch (Throwable t) {
// ignore any throwable to keep the clean up
running
}
});
// Remove the session's ExecutionContext from contextMap and close it.
- SessionContext context = this.contextMap.remove(sessionId);
- if (context != null) {
- context.close();
- }
- }
-
- private SessionContext getSessionContext(String sessionId) {
- SessionContext context = this.contextMap.get(sessionId);
- if (context == null) {
- throw new SqlExecutionException("Invalid session identifier: " +
sessionId);
+ if (sessionContext != null) {
+ sessionContext.close();
}
- return context;
}
/**
* Get the existed {@link ExecutionContext} from contextMap, or thrown
exception if does not
* exist.
*/
@VisibleForTesting
- protected ExecutionContext getExecutionContext(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getExecutionContext();
+ protected ExecutionContext getExecutionContext() throws
SqlExecutionException {
+ return sessionContext.getExecutionContext();
}
@Override
- public Map<String, String> getSessionConfigMap(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getConfigMap();
+ public Map<String, String> getSessionConfigMap() throws
SqlExecutionException {
+ return sessionContext.getConfigMap();
}
@Override
- public ReadableConfig getSessionConfig(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getReadableConfig();
+ public ReadableConfig getSessionConfig() throws SqlExecutionException {
+ return sessionContext.getReadableConfig();
}
@Override
- public void resetSessionProperties(String sessionId) throws
SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void resetSessionProperties() throws SqlExecutionException {
+ SessionContext context = sessionContext;
context.reset();
}
Review Comment:
Why not use `sessionContext.reset()` directly here?
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -85,16 +85,15 @@ private void start() {
executor.start();
// Open an new session
Review Comment:
nit: We can fix obvious typos like this one along the way: `an` -> `a`. Btw,
please also pay attention to the Checkstyle warnings that IDEA reports, such as
the ones in this class.
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java:
##########
@@ -519,43 +516,38 @@ public String openSession(@Nullable String sessionId)
throws SqlExecutionExcepti
Collections.emptyList(),
configuration,
Collections.singletonList(new DefaultCLI()));
- SessionContext context = SessionContext.create(defaultContext,
sessionId);
- sessionMap.put(sessionId, context);
+ sessionContext = SessionContext.create(defaultContext, sessionId);
helper.registerTables();
- return sessionId;
}
@Override
- public void closeSession(String sessionId) throws
SqlExecutionException {}
+ public void closeSession() throws SqlExecutionException {}
@Override
- public Map<String, String> getSessionConfigMap(String sessionId)
- throws SqlExecutionException {
- return this.sessionMap.get(sessionId).getConfigMap();
+ public Map<String, String> getSessionConfigMap() throws
SqlExecutionException {
+ return sessionContext.getConfigMap();
}
@Override
- public ReadableConfig getSessionConfig(String sessionId) throws
SqlExecutionException {
- SessionContext context = this.sessionMap.get(sessionId);
+ public ReadableConfig getSessionConfig() throws SqlExecutionException {
+ SessionContext context = sessionContext;
Review Comment:
ditto
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -99,88 +94,69 @@ public void start() {
}
@Override
- public String openSession(@Nullable String sessionId) throws
SqlExecutionException {
- SessionContext sessionContext =
- LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
- sessionId = sessionContext.getSessionId();
- if (this.contextMap.containsKey(sessionId)) {
- throw new SqlExecutionException(
- "Found another session with the same session identifier: "
+ sessionId);
- } else {
- this.contextMap.put(sessionId, sessionContext);
- }
- return sessionId;
+ public void openSession(@Nullable String sessionId) throws
SqlExecutionException {
+ // do nothing
+ sessionContext = LocalContextUtils.buildSessionContext(sessionId,
defaultContext);
}
@Override
- public void closeSession(String sessionId) throws SqlExecutionException {
+ public void closeSession() throws SqlExecutionException {
resultStore
.getResults()
.forEach(
(resultId) -> {
try {
- cancelQuery(sessionId, resultId);
+ cancelQuery(resultId);
} catch (Throwable t) {
// ignore any throwable to keep the clean up
running
}
});
// Remove the session's ExecutionContext from contextMap and close it.
- SessionContext context = this.contextMap.remove(sessionId);
- if (context != null) {
- context.close();
- }
- }
-
- private SessionContext getSessionContext(String sessionId) {
- SessionContext context = this.contextMap.get(sessionId);
- if (context == null) {
- throw new SqlExecutionException("Invalid session identifier: " +
sessionId);
+ if (sessionContext != null) {
+ sessionContext.close();
}
- return context;
}
/**
* Get the existed {@link ExecutionContext} from contextMap, or thrown
exception if does not
* exist.
*/
@VisibleForTesting
- protected ExecutionContext getExecutionContext(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getExecutionContext();
+ protected ExecutionContext getExecutionContext() throws
SqlExecutionException {
+ return sessionContext.getExecutionContext();
}
@Override
- public Map<String, String> getSessionConfigMap(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getConfigMap();
+ public Map<String, String> getSessionConfigMap() throws
SqlExecutionException {
+ return sessionContext.getConfigMap();
}
@Override
- public ReadableConfig getSessionConfig(String sessionId) throws
SqlExecutionException {
- return getSessionContext(sessionId).getReadableConfig();
+ public ReadableConfig getSessionConfig() throws SqlExecutionException {
+ return sessionContext.getReadableConfig();
}
@Override
- public void resetSessionProperties(String sessionId) throws
SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void resetSessionProperties() throws SqlExecutionException {
+ SessionContext context = sessionContext;
context.reset();
}
@Override
- public void resetSessionProperty(String sessionId, String key) throws
SqlExecutionException {
- SessionContext context = getSessionContext(sessionId);
+ public void resetSessionProperty(String key) throws SqlExecutionException {
+ SessionContext context = sessionContext;
context.reset(key);
}
Review Comment:
Why not use `sessionContext.reset(key);` directly here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]