This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 5dddc0dba2b [FLINK-30978][sql-client] Fix ExecutorImpl#testInterruptException hangs (#22099)
5dddc0dba2b is described below

commit 5dddc0dba2be20806e67769314eecadf56b87a53
Author: Shengkai <[email protected]>
AuthorDate: Mon Mar 6 18:15:35 2023 +0800

    [FLINK-30978][sql-client] Fix ExecutorImpl#testInterruptException hangs (#22099)
---
 .../flink/table/client/gateway/ExecutorImpl.java   | 58 ++++++++++++----------
 .../table/client/gateway/ExecutorImplITCase.java   | 19 ++++---
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 6d9a5206f2b..08b68319860 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -205,32 +205,40 @@ public class ExecutorImpl implements Executor {
                         ExecuteStatementHeaders.getInstance(),
                         new SessionMessageParameters(sessionHandle),
                         request);
+
         // It's possible that the execution is canceled during the submission.
         // Close the Operation in background to make sure the execution can continue.
-        getResponse(
-                executeStatementResponse,
-                e -> {
-                    executorService.submit(
-                            () -> {
-                                try {
-                                    ExecuteStatementResponseBody 
executeStatementResponseBody =
-                                            executeStatementResponse.get();
-                                    // close operation in background to make 
sure users can not
-                                    // interrupt the execution.
-                                    closeOperationAsync(
-                                            getOperationHandle(
-                                                    
executeStatementResponseBody
-                                                            
::getOperationHandle));
-                                } catch (Exception newException) {
-                                    // ignore
-                                }
-                            });
-                    return new SqlExecutionException("Interrupted to get 
response.", e);
-                });
-
         OperationHandle operationHandle =
                 getOperationHandle(
-                        () -> 
getResponse(executeStatementResponse).getOperationHandle());
+                        () ->
+                                getResponse(
+                                                executeStatementResponse,
+                                                e -> {
+                                                    executorService.submit(
+                                                            () -> {
+                                                                try {
+                                                                    
ExecuteStatementResponseBody
+                                                                            
executeStatementResponseBody =
+                                                                               
     executeStatementResponse
+                                                                               
             .get();
+                                                                    // close 
operation in background
+                                                                    // to make 
sure users can not
+                                                                    // 
interrupt the execution.
+                                                                    
closeOperationAsync(
+                                                                            
getOperationHandle(
+                                                                               
     executeStatementResponseBody
+                                                                               
             ::getOperationHandle));
+                                                                } catch 
(Exception newException) {
+                                                                    
e.addSuppressed(newException);
+                                                                    LOG.error(
+                                                                            
"Failed to cancel the interrupted exception.",
+                                                                            e);
+                                                                }
+                                                            });
+                                                    return new 
SqlExecutionException(
+                                                            "Interrupted to 
get response.", e);
+                                                })
+                                        .getOperationHandle());
         FetchResultsResponseBody fetchResultsResponse = 
fetchUtilResultsReady(operationHandle);
         ResultInfo firstResult = fetchResultsResponse.getResults();
 
@@ -316,7 +324,7 @@ public class ExecutorImpl implements Executor {
             return getFetchResultResponse(
                     operationHandle,
                     token,
-                    true,
+                    false,
                     e -> {
                         sendRequest(
                                 CancelOperationHeaders.getInstance(),
@@ -368,7 +376,7 @@ public class ExecutorImpl implements Executor {
                     getFetchResultResponse(
                             operationHandle,
                             0L,
-                            false,
+                            true,
                             e -> {
                                 // CliClient will not close the results. Try 
best to close it.
                                 closeOperationAsync(operationHandle);
@@ -385,7 +393,7 @@ public class ExecutorImpl implements Executor {
             boolean fetchResultWithInterval,
             Function<InterruptedException, SqlExecutionException> 
interruptedExceptionHandler) {
         try {
-            if (!fetchResultWithInterval) {
+            if (fetchResultWithInterval) {
                 Thread.sleep(100);
             }
             return sendRequest(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index 88447264246..8b4c7d24be9 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -518,7 +518,7 @@ class ExecutorImplITCase {
         testInterrupting(
                 executor -> {
                     try (StatementResult result =
-                            
executor.executeStatement(BlockPhase.EXECUTION.name())) {
+                            
executor.executeStatement(BlockPhase.FETCHING.name())) {
                         // trigger to fetch again
                         result.hasNext();
                     }
@@ -610,18 +610,25 @@ class ExecutorImplITCase {
 
     private void testInterrupting(Consumer<Executor> task) throws Exception {
         try (Executor executor = createTestServiceExecutor()) {
-            Thread t = threadFactory.newThread(() -> task.accept(executor));
-            t.start();
-
             TestSqlGatewayService service =
                     (TestSqlGatewayService)
                             
TEST_SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayService();
+            Thread t =
+                    threadFactory.newThread(
+                            () -> {
+                                try {
+                                    task.accept(executor);
+                                } finally {
+                                    // notify server to return results until 
the executor finishes
+                                    // exception processing.
+                                    service.latch.countDown();
+                                }
+                            });
+            t.start();
             CommonTestUtils.waitUntilCondition(() -> service.isBlocking, 100L);
 
             // interrupt the submission
             t.interrupt();
-            // notify service return handle
-            service.latch.countDown();
 
             CommonTestUtils.waitUntilCondition(() -> service.isClosed, 100L);
         }

Reply via email to