This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 594010624f8084efd99d6d405b5310ab24013aeb Author: Shengkai <[email protected]> AuthorDate: Thu Mar 2 17:57:55 2023 +0800 [FLINK-31092][sql-gateway] Fix OperationManager can not kill the running task by force --- .../flink/table/gateway/api/utils/ThreadUtils.java | 4 +- .../service/operation/OperationManager.java | 47 +++++++++++++++++++++- .../service/operation/OperationManagerTest.java | 29 +++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java index 330d8ed9eb9..9408c61ecbf 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.gateway.api.utils; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class ThreadUtils { poolQueueSize, keepAliveMs, TimeUnit.MILLISECONDS, - new SynchronousQueue<>()); + new SynchronousQueue<>(), + new ExecutorThreadFactory(threadPoolName)); } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java index 4e16cb5dd6c..8b239abc771 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation; import 
org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.operation.OperationStatus; @@ -34,8 +35,11 @@ import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; @@ -366,7 +370,6 @@ public class OperationManager { String.format( "Failed to convert the Operation Status from %s to %s for %s.", currentStatus, toStatus, operationHandle); - LOG.error(message); throw new SqlGatewayException(message); } } while (!status.compareAndSet(currentStatus, toStatus)); @@ -384,6 +387,7 @@ public class OperationManager { private void closeResources() { if (invocation != null && !invocation.isDone()) { invocation.cancel(true); + stopExecutionByForce(invocation); LOG.debug(String.format("Cancel the operation %s.", operationHandle)); } @@ -400,6 +404,47 @@ public class OperationManager { // when status is error. 
updateState(OperationStatus.ERROR); } + + private void stopExecutionByForce(FutureTask<?> invocation) { + // the runner thread is released asynchronously after cancel(true), so poll for up to 1s + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); + while (deadline.hasTimeLeft()) { + if (!getThreadInFuture(invocation).isPresent()) { + return; // thread has been cleaned up + } + try { + Thread.sleep(10); // back off instead of busy-spinning a CPU core + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; // stop waiting and fall through to the forced stop + } + } + Optional<Thread> threadOptional = getThreadInFuture(invocation); + if (threadOptional.isPresent()) { + // only Thread.stop() guarantees termination; we accept its potential consistency problems. + Thread thread = threadOptional.get(); + LOG.info( + "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.", + thread.getName()); + try { + thread.stop(); + } catch (Throwable e) { + // catch all errors to protect the SQL gateway + LOG.error("Failed to stop thread: " + thread.getName(), e); + } + } + } + + private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) { + try { + Field runnerField = FutureTask.class.getDeclaredField("runner"); + runnerField.setAccessible(true); + // "runner" is null once the task has finished, hence ofNullable + return Optional.ofNullable((Thread) runnerField.get(invocation)); + } catch (ReflectiveOperationException | RuntimeException e) { + return Optional.empty(); // can't get thread + } + } } // ------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java index b1a1863ef82..e4d95f1bae1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java +++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.gateway.service.operation; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ResultKind; @@ -38,10 +39,12 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER; import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD; @@ -125,6 +128,32 @@ public class OperationManagerTest { .isEqualTo(OperationStatus.CANCELED); } + @Test + public void testCancelOperationByForce() throws Exception { + AtomicReference<Throwable> exception = new AtomicReference<>(null); + OperationHandle operationHandle = + operationManager.submitOperation( + () -> { + try { + // simulate a CPU-busy task that never checks its interrupt flag, so cancel(true) alone cannot stop it + while (true) {} + } catch (Throwable t) { + exception.set(t); + throw t; + } + }); + + threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start(); + operationManager.awaitOperationTermination(operationHandle); + + assertThat(operationManager.getOperationInfo(operationHandle).getStatus()) + .isEqualTo(OperationStatus.CANCELED); + CommonTestUtils.waitUtil( + () -> exception.get() != null, + Duration.ofSeconds(10), + "Failed to kill the task with infinite loop."); + } + @Test public void testCloseOperation() throws Exception { CountDownLatch endRunningLatch = new CountDownLatch(1);
