This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 5dddc0dba2b [FLINK-30978][sql-client] Fix
ExecutorImpl#testInterruptException hangs (#22099)
5dddc0dba2b is described below
commit 5dddc0dba2be20806e67769314eecadf56b87a53
Author: Shengkai <[email protected]>
AuthorDate: Mon Mar 6 18:15:35 2023 +0800
[FLINK-30978][sql-client] Fix ExecutorImpl#testInterruptException hangs
(#22099)
---
.../flink/table/client/gateway/ExecutorImpl.java | 58 ++++++++++++----------
.../table/client/gateway/ExecutorImplITCase.java | 19 ++++---
2 files changed, 46 insertions(+), 31 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 6d9a5206f2b..08b68319860 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -205,32 +205,40 @@ public class ExecutorImpl implements Executor {
ExecuteStatementHeaders.getInstance(),
new SessionMessageParameters(sessionHandle),
request);
+
// It's possible that the execution is canceled during the submission.
// Close the Operation in background to make sure the execution can
continue.
- getResponse(
- executeStatementResponse,
- e -> {
- executorService.submit(
- () -> {
- try {
- ExecuteStatementResponseBody
executeStatementResponseBody =
- executeStatementResponse.get();
- // close operation in background to make
sure users can not
- // interrupt the execution.
- closeOperationAsync(
- getOperationHandle(
-
executeStatementResponseBody
-
::getOperationHandle));
- } catch (Exception newException) {
- // ignore
- }
- });
- return new SqlExecutionException("Interrupted to get
response.", e);
- });
-
OperationHandle operationHandle =
getOperationHandle(
- () ->
getResponse(executeStatementResponse).getOperationHandle());
+ () ->
+ getResponse(
+ executeStatementResponse,
+ e -> {
+ executorService.submit(
+ () -> {
+ try {
+
ExecuteStatementResponseBody
+
executeStatementResponseBody =
+
executeStatementResponse
+
.get();
+ // close
operation in background
+ // to make
sure users can not
+ //
interrupt the execution.
+
closeOperationAsync(
+
getOperationHandle(
+
executeStatementResponseBody
+
::getOperationHandle));
+ } catch
(Exception newException) {
+
e.addSuppressed(newException);
+ LOG.error(
+
"Failed to cancel the interrupted exception.",
+ e);
+ }
+ });
+ return new
SqlExecutionException(
+ "Interrupted to
get response.", e);
+ })
+ .getOperationHandle());
FetchResultsResponseBody fetchResultsResponse =
fetchUtilResultsReady(operationHandle);
ResultInfo firstResult = fetchResultsResponse.getResults();
@@ -316,7 +324,7 @@ public class ExecutorImpl implements Executor {
return getFetchResultResponse(
operationHandle,
token,
- true,
+ false,
e -> {
sendRequest(
CancelOperationHeaders.getInstance(),
@@ -368,7 +376,7 @@ public class ExecutorImpl implements Executor {
getFetchResultResponse(
operationHandle,
0L,
- false,
+ true,
e -> {
// CliClient will not close the results. Try
best to close it.
closeOperationAsync(operationHandle);
@@ -385,7 +393,7 @@ public class ExecutorImpl implements Executor {
boolean fetchResultWithInterval,
Function<InterruptedException, SqlExecutionException>
interruptedExceptionHandler) {
try {
- if (!fetchResultWithInterval) {
+ if (fetchResultWithInterval) {
Thread.sleep(100);
}
return sendRequest(
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index 88447264246..8b4c7d24be9 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -518,7 +518,7 @@ class ExecutorImplITCase {
testInterrupting(
executor -> {
try (StatementResult result =
-
executor.executeStatement(BlockPhase.EXECUTION.name())) {
+
executor.executeStatement(BlockPhase.FETCHING.name())) {
// trigger to fetch again
result.hasNext();
}
@@ -610,18 +610,25 @@ class ExecutorImplITCase {
private void testInterrupting(Consumer<Executor> task) throws Exception {
try (Executor executor = createTestServiceExecutor()) {
- Thread t = threadFactory.newThread(() -> task.accept(executor));
- t.start();
-
TestSqlGatewayService service =
(TestSqlGatewayService)
TEST_SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayService();
+ Thread t =
+ threadFactory.newThread(
+ () -> {
+ try {
+ task.accept(executor);
+ } finally {
+ // notify server to return results until
the executor finishes
+ // exception processing.
+ service.latch.countDown();
+ }
+ });
+ t.start();
CommonTestUtils.waitUntilCondition(() -> service.isBlocking, 100L);
// interrupt the submission
t.interrupt();
- // notify service return handle
- service.latch.countDown();
CommonTestUtils.waitUntilCondition(() -> service.isClosed, 100L);
}