LadyForest commented on code in PR #21717:
URL: https://github.com/apache/flink/pull/21717#discussion_r1089615342


##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
         final String insert = "INSERT INTO snk SELECT a FROM src;";
 
         try {
-            executor.executeOperation(executor.parseStatement(srcDdl));
-            executor.executeOperation(executor.parseStatement(snkDdl));
-            TableResult result = 
executor.executeOperation(executor.parseStatement(insert));
-            JobClient jobClient = result.getJobClient().get();
-            JobID jobId = jobClient.getJobID();
+            executor.configureSession(srcDdl);
+            executor.configureSession(snkDdl);
+            ClientResult result = executor.executeStatement(insert);
+            JobID jobID = result.getJobId();
 
             // wait till the job turns into running status or the test times 
out
-            JobStatus jobStatus;
-            do {
-                Thread.sleep(2_000L);
-                jobStatus = jobClient.getJobStatus().get();
-            } while (jobStatus != JobStatus.RUNNING);
-
-            Optional<String> savepoint = executor.stopJob(jobId.toString(), 
true, true);
-            assertThat(savepoint).isPresent();
+            TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+            StringData savepointPath =
+                    CollectionUtil.iteratorToList(
+                                    executor.executeStatement(
+                                            String.format("STOP JOB '%s' WITH 
SAVEPOINT", jobID)))
+                            .get(0)
+                            .getString(0);
+            assertThat(
+                            Files.exists(
+                                    Paths.get(
+                                            URI.create(
+                                                    
Preconditions.checkNotNull(savepointPath)
+                                                            .toString()))))
+                    .isTrue();

Review Comment:
   ```suggestion
               assertThat(savepointPath)
                       .isNotNull()
                       .matches(
                               stringData ->
                                       
Files.exists(Paths.get(URI.create(stringData.toString()))));
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java:
##########
@@ -39,6 +39,10 @@ public class ConfigureSessionRequestBody implements 
RequestBody {
     @Nullable
     private final Long timeout;
 
+    public ConfigureSessionRequestBody(String statement) {
+        this(statement, null);
+    }
+
     public ConfigureSessionRequestBody(

Review Comment:
   > The annotation `JsonCreator` is used when serde the json object. However, 
the developer uses the creator here to build the object conveniently. Do you 
mean we should add annotation `JsonCreator` for the constructor 
`ConfigureSessionRequestBody(@JsonProperty(FIELD_NAME_STATEMENT) String 
statement, @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long timeout)`?
   
   Yes. The base interface `RequestBody` says 
   > Subclass instances are converted to JSON using jackson-databind. 
Subclasses must have a constructor that accepts all fields of the JSON request, 
that should be annotated with @JsonCreator.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
         final String insert = "INSERT INTO snk SELECT a FROM src;";
 
         try {
-            executor.executeOperation(executor.parseStatement(srcDdl));
-            executor.executeOperation(executor.parseStatement(snkDdl));
-            TableResult result = 
executor.executeOperation(executor.parseStatement(insert));
-            JobClient jobClient = result.getJobClient().get();
-            JobID jobId = jobClient.getJobID();
+            executor.configureSession(srcDdl);
+            executor.configureSession(snkDdl);
+            ClientResult result = executor.executeStatement(insert);
+            JobID jobID = result.getJobId();
 
             // wait till the job turns into running status or the test times 
out
-            JobStatus jobStatus;
-            do {
-                Thread.sleep(2_000L);
-                jobStatus = jobClient.getJobStatus().get();
-            } while (jobStatus != JobStatus.RUNNING);
-
-            Optional<String> savepoint = executor.stopJob(jobId.toString(), 
true, true);
-            assertThat(savepoint).isPresent();
+            TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+            StringData savepointPath =
+                    CollectionUtil.iteratorToList(
+                                    executor.executeStatement(
+                                            String.format("STOP JOB '%s' WITH 
SAVEPOINT", jobID)))
+                            .get(0)
+                            .getString(0);
+            assertThat(
+                            Files.exists(
+                                    Paths.get(
+                                            URI.create(
+                                                    
Preconditions.checkNotNull(savepointPath)
+                                                            .toString()))))
+                    .isTrue();

Review Comment:
   Btw, I found the updated configuration is not read
   
   ```sql
   Flink SQL> CREATE TABLE src (a STRING) WITH ('connector' = 'datagen');
   [INFO] Execute statement succeed.
   
   Flink SQL> CREATE TABLE snk (a STRING) WITH ('connector' = 'blackhole');
   [INFO] Execute statement succeed.
   
   Flink SQL> INSERT INTO snk SELECT a FROM src;
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] SQL update statement has been successfully submitted to the cluster:
   Job ID: 9bafe0a3408cf8db4ae8b4fc4110322f
   
   
   Flink SQL> show jobs;
   
+----------------------------------+--------------------------------------------------+---------+-------------------------+
   |                           job id |                                         
job name |  status |              start time |
   
+----------------------------------+--------------------------------------------------+---------+-------------------------+
   | 9bafe0a3408cf8db4ae8b4fc4110322f | 
insert-into_default_catalog.default_database.snk | RUNNING | 
2023-01-28T03:47:56.161 |
   
+----------------------------------+--------------------------------------------------+---------+-------------------------+
   1 row in set
   
   Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
   [ERROR] Could not execute SQL statement. Reason:
   org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key 
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
   
   Flink SQL> set 'state.savepoints.dir' = '/tmp/test';
   [INFO] Execute statement succeed.
   
   Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
   [ERROR] Could not execute SQL statement. Reason:
   org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key 
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
   
   Flink SQL> set;
   
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
   |                                        key |                               
                                                                                
                               value |
   
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
   |                         execution.attached |                               
                                                                                
                                true |
   |           execution.savepoint-restore-mode |                               
                                                                                
                            NO_CLAIM |
   | execution.savepoint.ignore-unclaimed-state |                               
                                                                                
                               false |
   |        execution.shutdown-on-attached-exit |                               
                                                                                
                               false |
   |                           execution.target |                               
                                                                                
                              remote |
   |                       jobmanager.bind-host |                               
                                                                                
                           localhost |
   |     jobmanager.execution.failover-strategy |                               
                                                                                
                              region |
   |             jobmanager.memory.process.size |                               
                                                                                
                               1600m |
   |                     jobmanager.rpc.address |                               
                                                                                
                           localhost |
   |                        jobmanager.rpc.port |                               
                                                                                
                                6123 |
   |                        parallelism.default |                               
                                                                                
                                   1 |
   |                        pipeline.classpaths |                               
                                                                                
                                     |
   |                              pipeline.jars | 
file:${baseDir}/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/opt/flink-python-1.17-SNAPSHOT.jar
 |
   |                               rest.address |                               
                                                                                
                           localhost |
   |                          rest.bind-address |                               
                                                                                
                           localhost |
   |          sql-gateway.endpoint.rest.address |                               
                                                                                
                           localhost |
   |             sql-gateway.endpoint.rest.port |                               
                                                                                
                               58950 |
   |                       state.savepoints.dir |                               
                                                                                
                           /tmp/test |
   |               table.resources.download-dir |                               
                   
/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gp/T/sql-gateway-3623ad21-994f-46e3-bfa5-2fb01a99d3c6
 |
   |                      taskmanager.bind-host |                               
                                                                                
                           localhost |
   |                           taskmanager.host |                               
                                                                                
                           localhost |
   |            taskmanager.memory.process.size |                               
                                                                                
                               1728m |
   |              taskmanager.numberOfTaskSlots |                               
                                                                                
                                   1 |
   
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
   23 rows in set
   
   Flink SQL> set sql-client.verbose=true;
   [INFO] Execute statement succeed.
   
   Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
   [ERROR] Could not execute SQL statement. Reason:
   org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., <Exception on server side:
   org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
fetchResults.
        at 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
        at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
        at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
        at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
        at java.util.Optional.ifPresent(Optional.java:159)
        at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
Failed to fetchResults.
        at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
        at 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
        ... 48 more
   Caused by: 
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
execute the operation d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
        at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:398)
        at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:251)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   Caused by: 
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not 
stop job 9bafe0a3408cf8db4ae8b4fc4110322f for operation 
d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:520)
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:332)
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:181)
        at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
        at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:110)
        at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:242)
        ... 7 more
   Caused by: 
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
run cluster action.
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:600)
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:487)
        ... 12 more
   Caused by: org.apache.flink.util.FlinkException: Could not stop job 
9bafe0a3408cf8db4ae8b4fc4110322f in session 
d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:510)
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:598)
        ... 13 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key 
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200)
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149)
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170)
        at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
        at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
        at java.util.Optional.ifPresent(Optional.java:159)
        at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
   ]
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:502)
        ... 14 more
   Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key 
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200)
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149)
        at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170)
        at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
        at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
        at java.util.Optional.ifPresent(Optional.java:159)
        at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
        at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
        at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
        at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
   ]
        at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
        at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
        at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        ... 3 more
   
   End of exception on server side>]
        at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
        at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
        at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
         final String insert = "INSERT INTO snk SELECT a FROM src;";
 
         try {
-            executor.executeOperation(executor.parseStatement(srcDdl));
-            executor.executeOperation(executor.parseStatement(snkDdl));
-            TableResult result = 
executor.executeOperation(executor.parseStatement(insert));
-            JobClient jobClient = result.getJobClient().get();
-            JobID jobId = jobClient.getJobID();
+            executor.configureSession(srcDdl);
+            executor.configureSession(snkDdl);
+            ClientResult result = executor.executeStatement(insert);
+            JobID jobID = result.getJobId();
 
             // wait till the job turns into running status or the test times 
out
-            JobStatus jobStatus;
-            do {
-                Thread.sleep(2_000L);
-                jobStatus = jobClient.getJobStatus().get();
-            } while (jobStatus != JobStatus.RUNNING);
-
-            Optional<String> savepoint = executor.stopJob(jobId.toString(), 
true, true);
-            assertThat(savepoint).isPresent();
+            TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+            StringData savepointPath =
+                    CollectionUtil.iteratorToList(
+                                    executor.executeStatement(
+                                            String.format("STOP JOB '%s' WITH 
SAVEPOINT", jobID)))
+                            .get(0)
+                            .getString(0);
+            assertThat(
+                            Files.exists(
+                                    Paths.get(
+                                            URI.create(
+                                                    
Preconditions.checkNotNull(savepointPath)
+                                                            .toString()))))
+                    .isTrue();

Review Comment:
   And `stop job` seems not to work
   
![image](https://user-images.githubusercontent.com/55568005/215242202-5aff4983-da28-49fc-a27e-81579f5063ce.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to