LadyForest commented on code in PR #21717: URL: https://github.com/apache/flink/pull/21717#discussion_r1089615342
########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java: ########## @@ -414,50 +462,130 @@ void testStopJob() throws Exception { final String insert = "INSERT INTO snk SELECT a FROM src;"; try { - executor.executeOperation(executor.parseStatement(srcDdl)); - executor.executeOperation(executor.parseStatement(snkDdl)); - TableResult result = executor.executeOperation(executor.parseStatement(insert)); - JobClient jobClient = result.getJobClient().get(); - JobID jobId = jobClient.getJobID(); + executor.configureSession(srcDdl); + executor.configureSession(snkDdl); + ClientResult result = executor.executeStatement(insert); + JobID jobID = result.getJobId(); // wait till the job turns into running status or the test times out - JobStatus jobStatus; - do { - Thread.sleep(2_000L); - jobStatus = jobClient.getJobStatus().get(); - } while (jobStatus != JobStatus.RUNNING); - - Optional<String> savepoint = executor.stopJob(jobId.toString(), true, true); - assertThat(savepoint).isPresent(); + TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID); + StringData savepointPath = + CollectionUtil.iteratorToList( + executor.executeStatement( + String.format("STOP JOB '%s' WITH SAVEPOINT", jobID))) + .get(0) + .getString(0); + assertThat( + Files.exists( + Paths.get( + URI.create( + Preconditions.checkNotNull(savepointPath) + .toString())))) + .isTrue(); Review Comment: ```suggestion assertThat(savepointPath) .isNotNull() .matches( stringData -> Files.exists(Paths.get(URI.create(stringData.toString())))); ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java: ########## @@ -39,6 +39,10 @@ public class ConfigureSessionRequestBody implements RequestBody { @Nullable private final Long timeout; + public ConfigureSessionRequestBody(String statement) { + this(statement, null); + } + public ConfigureSessionRequestBody( Review Comment: > The annotation `JsonCreator` is used when serde the json object. However, the developer uses the creator here to build the object conveniently. Do you mean we should add annotation `JsonCreator` for the constructor `ConfigureSessionRequestBody(@JsonProperty(FIELD_NAME_STATEMENT) String statement, @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long timeout)`? Yes. The base interface `RequestBody` says > Subclass instances are converted to JSON using jackson-databind. Subclasses must have a constructor that accepts all fields of the JSON request, that should be annotated with @JsonCreator. ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java: ########## @@ -414,50 +462,130 @@ void testStopJob() throws Exception { final String insert = "INSERT INTO snk SELECT a FROM src;"; try { - executor.executeOperation(executor.parseStatement(srcDdl)); - executor.executeOperation(executor.parseStatement(snkDdl)); - TableResult result = executor.executeOperation(executor.parseStatement(insert)); - JobClient jobClient = result.getJobClient().get(); - JobID jobId = jobClient.getJobID(); + executor.configureSession(srcDdl); + executor.configureSession(snkDdl); + ClientResult result = executor.executeStatement(insert); + JobID jobID = result.getJobId(); // wait till the job turns into running status or the test times out - JobStatus jobStatus; - do { - Thread.sleep(2_000L); - jobStatus = jobClient.getJobStatus().get(); - } while (jobStatus != JobStatus.RUNNING); - - Optional<String> savepoint = executor.stopJob(jobId.toString(), true, true); - assertThat(savepoint).isPresent(); + TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID); + StringData savepointPath = + CollectionUtil.iteratorToList( + executor.executeStatement( + String.format("STOP JOB '%s' WITH SAVEPOINT", jobID))) + .get(0) + .getString(0); + assertThat( + Files.exists( + Paths.get( + URI.create( + Preconditions.checkNotNull(savepointPath) + .toString())))) + .isTrue(); Review Comment: Btw, I found the updated configuration is not read ```sql Flink SQL> CREATE TABLE src (a STRING) WITH ('connector' = 'datagen'); [INFO] Execute statement succeed. Flink SQL> CREATE TABLE snk (a STRING) WITH ('connector' = 'blackhole'); [INFO] Execute statement succeed. Flink SQL> INSERT INTO snk SELECT a FROM src; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 9bafe0a3408cf8db4ae8b4fc4110322f Flink SQL> show jobs; +----------------------------------+--------------------------------------------------+---------+-------------------------+ | job id | job name | status | start time | +----------------------------------+--------------------------------------------------+---------+-------------------------+ | 9bafe0a3408cf8db4ae8b4fc4110322f | insert-into_default_catalog.default_database.snk | RUNNING | 2023-01-28T03:47:56.161 | +----------------------------------+--------------------------------------------------+---------+-------------------------+ 1 row in set Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided. Flink SQL> set 'state.savepoints.dir' = '/tmp/test'; [INFO] Execute statement succeed. Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided. Flink SQL> set; +--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+ | key | value | +--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+ | execution.attached | true | | execution.savepoint-restore-mode | NO_CLAIM | | execution.savepoint.ignore-unclaimed-state | false | | execution.shutdown-on-attached-exit | false | | execution.target | remote | | jobmanager.bind-host | localhost | | jobmanager.execution.failover-strategy | region | | jobmanager.memory.process.size | 1600m | | jobmanager.rpc.address | localhost | | jobmanager.rpc.port | 6123 | | parallelism.default | 1 | | pipeline.classpaths | | | pipeline.jars | file:${baseDir}/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/opt/flink-python-1.17-SNAPSHOT.jar | | rest.address | localhost | | rest.bind-address | localhost | | sql-gateway.endpoint.rest.address | localhost | | sql-gateway.endpoint.rest.port | 58950 | | state.savepoints.dir | /tmp/test | | table.resources.download-dir | /var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gp/T/sql-gateway-3623ad21-994f-46e3-bfa5-2fb01a99d3c6 | | taskmanager.bind-host | localhost | | taskmanager.host | localhost | | taskmanager.memory.process.size | 1728m | | taskmanager.numberOfTaskSlots | 1 | +--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+ 23 rows in set Flink SQL> set sql-client.verbose=true; [INFO] Execute statement succeed. Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229) at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83) ... 48 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation d6d8fe6f-b0d0-4040-8420-80c605cf6b34. at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:398) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:251) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 9bafe0a3408cf8db4ae8b4fc4110322f for operation d6d8fe6f-b0d0-4040-8420-80c605cf6b34. at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:520) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:332) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:181) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:110) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:242) ... 7 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to run cluster action. at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:600) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:487) ... 12 more Caused by: org.apache.flink.util.FlinkException: Could not stop job 9bafe0a3408cf8db4ae8b4fc4110322f in session d6d8fe6f-b0d0-4040-8420-80c605cf6b34. at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:510) at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:598) ... 13 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided. at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200) at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149) at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170) at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83) at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) ] at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:502) ... 14 more Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided. at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200) at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149) at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170) at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83) at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) ] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ... 3 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java: ########## @@ -414,50 +462,130 @@ void testStopJob() throws Exception { final String insert = "INSERT INTO snk SELECT a FROM src;"; try { - executor.executeOperation(executor.parseStatement(srcDdl)); - executor.executeOperation(executor.parseStatement(snkDdl)); - TableResult result = executor.executeOperation(executor.parseStatement(insert)); - JobClient jobClient = result.getJobClient().get(); - JobID jobId = jobClient.getJobID(); + executor.configureSession(srcDdl); + executor.configureSession(snkDdl); + ClientResult result = executor.executeStatement(insert); + JobID jobID = result.getJobId(); // wait till the job turns into running status or the test times out - JobStatus jobStatus; - do { - Thread.sleep(2_000L); - jobStatus = jobClient.getJobStatus().get(); - } while (jobStatus != JobStatus.RUNNING); - - Optional<String> savepoint = executor.stopJob(jobId.toString(), true, true); - assertThat(savepoint).isPresent(); + TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID); + StringData savepointPath = + CollectionUtil.iteratorToList( + executor.executeStatement( + String.format("STOP JOB '%s' WITH SAVEPOINT", jobID))) + .get(0) + .getString(0); + assertThat( + Files.exists( + Paths.get( + URI.create( + Preconditions.checkNotNull(savepointPath) + .toString())))) + .isTrue(); Review Comment: And `stop job` seems not to work ![image](https://user-images.githubusercontent.com/55568005/215242202-5aff4983-da28-49fc-a27e-81579f5063ce.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org