link3280 commented on code in PR #20159:
URL: https://github.com/apache/flink/pull/20159#discussion_r924595473
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -322,4 +339,96 @@ public List<String> listJars(String sessionId) {
final SessionContext context = getSessionContext(sessionId);
return context.listJars();
}
+
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint, boolean
isWithDrain)
+ throws SqlExecutionException {
+ Duration clientTimeout =
getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
+ try {
+ return runClusterAction(
+ sessionId,
+ clusterClient -> {
+ if (isWithSavepoint) {
+ // blocking get savepoint path
+ try {
+ String savepoint =
+ clusterClient
+ .stopWithSavepoint(
+
JobID.fromHexString(jobId),
+ isWithDrain,
+ null,
+
SavepointFormatType.DEFAULT)
+ .get(
+
clientTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+ return Optional.of(savepoint);
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not stop job "
+ + jobId
+ + " in session "
+ + sessionId
+ + ".",
+ e);
+ }
+ } else {
+ clusterClient.cancel(JobID.fromHexString(jobId));
+ return Optional.empty();
+ }
+ });
+ } catch (Exception e) {
+ throw new SqlExecutionException(
+ "Could not stop job " + jobId + " in session " + sessionId
+ ".", e);
+ }
+ }
+
+ /**
+ * Retrieves the {@link ClusterClient} from the session and runs the given
{@link ClusterAction}
+ * against it.
+ *
+ * @param sessionId the specified session ID
+ * @param clusterAction the cluster action to run against the retrieved
{@link ClusterClient}.
+ * @param <ClusterID> type of the cluster id
+ * @param <Result>> type of the result
+ * @throws FlinkException if something goes wrong
+ */
+ private <ClusterID, Result> Result runClusterAction(
Review Comment:
I agree that there are some duplicate codes between `CliFrontEnd` and
`LocalExecutor`, but the method signatures are different. `CliFrontEnd` gets
the cluster ID from the command line and local configurations, while
`LocalExecutor` gets the cluster ID from the session configuration. Given the
duplicated codes are few, I think it's acceptable. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]