fsk119 commented on code in PR #20159:
URL: https://github.com/apache/flink/pull/20159#discussion_r927344410
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java:
##########
@@ -115,6 +121,14 @@ private static Configuration getConfig() {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ Paths.get(System.getProperty("java.io.tmpdir")).toUri().toString());
+ config.set(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY,
+ Paths.get(System.getProperty("java.io.tmpdir")).toUri().toString());
Review Comment:
I think we have two ways to solve this:
1. We can use the JUnit 5 API, which has an `@Order` annotation to control the
order; see `SqlGatewayStatementITCase` for an example.
2. We can set this value in the test itself.
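For context, JUnit 5's `@Order` (together with `@TestMethodOrder(MethodOrderer.OrderAnnotation.class)`) orders test methods by an integer annotation value. Here is a stdlib-only sketch of that mechanism; the `@Order` annotation and the `OrderDemo` class below are illustrative stand-ins, not JUnit or Flink code:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative stand-in for JUnit 5's org.junit.jupiter.api.Order.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Order {
    int value();
}

public class OrderDemo {
    @Order(2) void runQuery() {}
    @Order(1) void initCluster() {}

    // Sort annotated methods by their @Order value, the way
    // MethodOrderer.OrderAnnotation decides execution order.
    static List<String> orderedTestNames() {
        return Arrays.stream(OrderDemo.class.getDeclaredMethods())
                .filter(m -> m.isAnnotationPresent(Order.class))
                .sorted(Comparator.comparingInt(m -> m.getAnnotation(Order.class).value()))
                .map(Method::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(orderedTestNames()); // [initCluster, runQuery]
    }
}
```

Declaration order in the source file does not matter; only the annotation values do, which is what makes the ordering deterministic across JVMs.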
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -118,6 +118,10 @@ Set<URL> getDependencies() {
return dependencies;
}
+ public URLClassLoader getClassLoader() {
+ return classLoader;
+ }
Review Comment:
It seems no one uses this. I think we can remove it.
##########
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -2296,3 +2296,38 @@ SqlNode SqlAnalyzeTable():
return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec,
columns, allColumns);
}
}
+
+/**
+* Parses a STOP JOB statement:
+* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
+*/
+SqlStopJob SqlStopJob() :
+{
+ SqlCharStringLiteral jobId;
+ boolean isWithSavepoint = false;
+ boolean isWithDrain = false;
+}
+{
+ <STOP> <JOB> <QUOTED_STRING>
+ {
+ String id = SqlParserUtil.parseString(token.image);
+ jobId = SqlLiteral.createCharString(id, getPos());
+ }
+ [
+ LOOKAHEAD(2)
+ <WITH> <SAVEPOINT>
Review Comment:
It seems it's impossible to parse
```
STOP JOB 'myjob' WITH DRAIN WITH SAVEPOINT
```
I think we can do what the `EXPLAIN` syntax does, which allows users to
specify `ExplainDetail` in any order.
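A sketch of the idea in plain Java (not the actual JavaCC/FreeMarker grammar; `StopJobFlagParser` and its token-list input are hypothetical): accept the two `WITH` clauses in either order, each at most once, the way `EXPLAIN` accepts its details:

```java
import java.util.Iterator;
import java.util.List;

public class StopJobFlagParser {
    // Parse the trailing tokens of a STOP JOB statement, accepting
    // WITH SAVEPOINT and WITH DRAIN in either order, each at most once.
    // Returns {isWithSavepoint, isWithDrain}.
    static boolean[] parseFlags(List<String> tokens) {
        boolean withSavepoint = false;
        boolean withDrain = false;
        Iterator<String> it = tokens.iterator();
        while (it.hasNext()) {
            String t = it.next();
            if (!t.equalsIgnoreCase("WITH") || !it.hasNext()) {
                throw new IllegalArgumentException("Expected WITH <SAVEPOINT|DRAIN>, got: " + t);
            }
            String kind = it.next();
            if (kind.equalsIgnoreCase("SAVEPOINT")) {
                if (withSavepoint) throw new IllegalArgumentException("Duplicate WITH SAVEPOINT");
                withSavepoint = true;
            } else if (kind.equalsIgnoreCase("DRAIN")) {
                if (withDrain) throw new IllegalArgumentException("Duplicate WITH DRAIN");
                withDrain = true;
            } else {
                throw new IllegalArgumentException("Unknown option: WITH " + kind);
            }
        }
        return new boolean[] {withSavepoint, withDrain};
    }

    public static void main(String[] args) {
        // Both orders now parse to the same pair of flags.
        boolean[] flags = parseFlags(List.of("WITH", "DRAIN", "WITH", "SAVEPOINT"));
        System.out.println(flags[0] + " " + flags[1]); // true true
    }
}
```

In the real grammar the equivalent is a repeated optional production that sets a boolean per keyword and rejects duplicates, rather than a fixed `[WITH SAVEPOINT] [WITH DRAIN]` sequence.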
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -322,4 +339,95 @@ public List<String> listJars(String sessionId) {
final SessionContext context = getSessionContext(sessionId);
return context.listJars();
}
+
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+ throws SqlExecutionException {
+ Duration clientTimeout = getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
+ try {
+ return runClusterAction(
+ sessionId,
+ clusterClient -> {
+ if (isWithSavepoint) {
+ // blocking get savepoint path
+ try {
+ String savepoint =
+ clusterClient
+ .stopWithSavepoint(
+ JobID.fromHexString(jobId),
+ isWithDrain,
+ null,
+ SavepointFormatType.DEFAULT)
+ .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ return Optional.of(savepoint);
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not stop job " + jobId + " in session " + sessionId + ".", e);
+ }
+ } else {
+ clusterClient.cancel(JobID.fromHexString(jobId));
+ return Optional.empty();
+ }
+ });
+ } catch (Exception e) {
+ throw new SqlExecutionException(
+ "Could not stop job " + jobId + " in session " + sessionId + ".", e);
+ }
+ }
+
+ /**
+ * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
+ * against it.
+ *
+ * @param sessionId the specified session ID
+ * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
+ * @param <ClusterID> type of the cluster id
+ * @param <Result> type of the result
+ * @throws FlinkException if something goes wrong
+ */
+ private <ClusterID, Result> Result runClusterAction(
+ String sessionId, ClusterAction<ClusterID, Result> clusterAction)
+ throws FlinkException {
+ final SessionContext context = getSessionContext(sessionId);
+ final Configuration configuration = (Configuration) context.getReadableConfig();
+ final ClusterClientFactory<ClusterID> clusterClientFactory =
+ clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+ final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
Review Comment:
Actually `ClusterClientServiceLoader` is not thread-safe. It uses
```
final ServiceLoader<ClusterClientFactory> loader =
ServiceLoader.load(ClusterClientFactory.class);
```
to load the factory, and `ServiceLoader.load` relies on the thread's context
classloader. It would be better to add a `getClusterId` method to
`SessionContext`. I think this brings two benefits:
1. we don't have to cast the `ReadableConfig` to a `Configuration`;
2. we can use the session-level classloader inside the session context.
The only cost is creating a `ClusterClientServiceLoader` per session.
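The session-level classloader point can be shown with a small stdlib sketch (names are illustrative, not Flink API): since `ServiceLoader.load(Class)` consults `Thread.currentThread().getContextClassLoader()`, any `ServiceLoader`-backed lookup must run with the session's classloader installed on the thread, then have the previous loader restored:

```java
import java.util.function.Supplier;

public class ClassLoaderScope {
    // Run an action with the given (e.g. session-level) context classloader,
    // restoring the previous loader afterwards. This controls which service
    // providers a ServiceLoader.load(Class) call inside `action` can see.
    static <T> T runWithContextClassLoader(ClassLoader sessionLoader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(sessionLoader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // A fresh loader standing in for a session-level classloader.
        ClassLoader sessionLoader = new ClassLoader(ClassLoaderScope.class.getClassLoader()) {};
        ClassLoader seen = runWithContextClassLoader(
                sessionLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == sessionLoader); // true: the action saw the session loader
    }
}
```

Keeping a per-session `ClusterClientServiceLoader` inside `SessionContext` achieves the same isolation without touching the thread state on every call, at the cost of one loader instance per session.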
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]