fsk119 commented on code in PR #21802:
URL: https://github.com/apache/flink/pull/21802#discussion_r1093998583
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -481,17 +489,35 @@ void testStopJob() throws Exception {
// wait till the job turns into running status or the test times out
TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
- StringData savepointPath =
- CollectionUtil.iteratorToList(
- executor.executeStatement(
- String.format("STOP JOB '%s' WITH SAVEPOINT", jobID)))
- .get(0)
- .getString(0);
- assertThat(savepointPath)
- .isNotNull()
- .matches(
- stringData ->
- Files.exists(Paths.get(URI.create(stringData.toString()))));
+ String stopJobCmd =
+ String.format("STOP JOB '%s'%s", jobID, withSavepoint ? " WITH SAVEPOINT" : "");
+ ClientResult stopResult = executor.executeStatement(stopJobCmd);
+ JobStatus expectedJobStatus;
+ if (withSavepoint) {
+ String savepointPath =
+ CollectionUtil.iteratorToList(stopResult).get(0).getString(0).toString();
+
+ assertThat(savepointPath)
+ .matches(path -> Files.exists(Paths.get(URI.create(path))));
+ expectedJobStatus = JobStatus.FINISHED;
+ } else {
+ TestUtils.waitUntilJobCanceled(jobID, clusterClient);
Review Comment:
Why do we need to wait here? Haven't we already waited during the execution?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -515,12 +519,12 @@ public ResultFetcher callStopJobOperation(
return Optional.empty();
}
} catch (Exception e) {
- throw new SqlExecutionException(
- "Could not stop job "
- + jobId
- + " for operation "
- + handle.getIdentifier()
- + ".",
+ throw new FlinkException(
Review Comment:
Use `SqlExecutionException`
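A minimal sketch of the shape this could take, keeping the message from the removed lines. The `SqlExecutionException` class below is a local stand-in so the snippet compiles on its own (it is not the Flink class), and `StopJobSketch.stopJob` and its parameters are hypothetical names used only for illustration:

```java
// Local stand-in for SqlExecutionException (an assumption: declared here only
// so that the sketch is self-contained; the real class lives in Flink).
class SqlExecutionException extends RuntimeException {
    SqlExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

class StopJobSketch {
    // Hypothetical helper: runs the stop action and wraps any failure in a
    // SqlExecutionException, reusing the message format from the removed lines.
    static void stopJob(String jobId, String operationId, Runnable stopAction) {
        try {
            stopAction.run();
        } catch (Exception e) {
            throw new SqlExecutionException(
                    "Could not stop job " + jobId + " for operation " + operationId + ".", e);
        }
    }
}
```

The original cause is passed along, so the underlying failure is still visible in the stack trace.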
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -292,10 +292,16 @@ public ResultFetcher getCompletionHints(
@VisibleForTesting
public TableEnvironmentInternal getTableEnvironment() {
TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
- tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+ tableEnv.getConfig().getConfiguration().addAll(operationConfig);
Review Comment:
On second thought, it would be better to pass the execution config when
creating the table environment, because some options, e.g.
`execution.runtime-mode`, cannot be modified after the environment has
been created.
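A rough illustration of the idea, not the actual Flink API: `EnvSketch` is a hypothetical stand-in for a table environment whose settings are fixed at construction, and letting operation-level options override session-level ones is an assumption made for this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an environment whose configuration is frozen
// once it has been created (not a Flink class).
final class EnvSketch {
    private final Map<String, String> config;

    private EnvSketch(Map<String, String> config) {
        // Defensive copy wrapped as unmodifiable: no changes after creation.
        this.config = Collections.unmodifiableMap(new HashMap<>(config));
    }

    // Merge both configs first, then create the environment from the result,
    // instead of creating it and calling addAll(...) afterwards.
    static EnvSketch create(
            Map<String, String> sessionConfig, Map<String, String> operationConfig) {
        Map<String, String> merged = new HashMap<>(sessionConfig);
        merged.putAll(operationConfig); // assumption: operation-level values win
        return new EnvSketch(merged);
    }

    String get(String key) {
        return config.get(key);
    }
}
```

With this shape, an option such as `execution.runtime-mode` is already in place when the environment is built, so nothing has to be changed after the fact.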