This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 594010624f8084efd99d6d405b5310ab24013aeb
Author: Shengkai <[email protected]>
AuthorDate: Thu Mar 2 17:57:55 2023 +0800

    [FLINK-31092][sql-gateway] Fix OperationManager being unable to kill the running task by force
---
 .../flink/table/gateway/api/utils/ThreadUtils.java |  4 +-
 .../service/operation/OperationManager.java        | 47 +++++++++++++++++++++-
 .../service/operation/OperationManagerTest.java    | 29 +++++++++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java
index 330d8ed9eb9..9408c61ecbf 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.api.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ public class ThreadUtils {
                 poolQueueSize,
                 keepAliveMs,
                 TimeUnit.MILLISECONDS,
-                new SynchronousQueue<>());
+                new SynchronousQueue<>(),
+                new ExecutorThreadFactory(threadPoolName));
     }
 }
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 4e16cb5dd6c..8b239abc771 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
@@ -34,8 +35,11 @@ import 
org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
@@ -366,7 +370,6 @@ public class OperationManager {
                             String.format(
                                     "Failed to convert the Operation Status 
from %s to %s for %s.",
                                     currentStatus, toStatus, operationHandle);
-                    LOG.error(message);
                     throw new SqlGatewayException(message);
                 }
             } while (!status.compareAndSet(currentStatus, toStatus));
@@ -384,6 +387,7 @@ public class OperationManager {
         private void closeResources() {
             if (invocation != null && !invocation.isDone()) {
                 invocation.cancel(true);
+                stopExecutionByForce(invocation);
                 LOG.debug(String.format("Cancel the operation %s.", 
operationHandle));
             }
 
@@ -400,6 +404,47 @@ public class OperationManager {
             // when status is error.
             updateState(OperationStatus.ERROR);
         }
+
+        /**
+         * Makes sure the thread running {@code invocation} goes away after {@code
+         * Future.cancel(true)} has been called.
+         *
+         * <p>Interruption alone cannot stop a task that never checks the interrupt flag (e.g. a
+         * CPU-bound loop). This method polls the future's runner thread for up to one second and,
+         * if a thread is still attached afterwards, falls back to {@link Thread#stop()}.
+         *
+         * @param invocation the already-cancelled future whose runner thread should be stopped
+         */
+        private void stopExecutionByForce(FutureTask<?> invocation) {
+            // The runner thread is detached from the future asynchronously, so wait for a while.
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+            while (deadline.hasTimeLeft()) {
+                if (!getThreadInFuture(invocation).isPresent()) {
+                    // thread has been cleaned up
+                    return;
+                }
+                try {
+                    // Back off between polls instead of busy-spinning a CPU core.
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt status and go straight to the final check below.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            Optional<Thread> threadOptional = getThreadInFuture(invocation);
+            if (threadOptional.isPresent()) {
+                // We have to use Thread.stop() here, because it is the only way to stop a thread
+                // that ignores interruption. Even though this can cause consistency problems, we
+                // are fine with it.
+                // NOTE(review): the captured thread may finish this task between the check above
+                // and stop() below; confirm that this window is acceptable.
+                Thread thread = threadOptional.get();
+                LOG.info(
+                        "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.",
+                        thread.getName());
+                try {
+                    thread.stop();
+                } catch (RuntimeException e) {
+                    // Never let a failed stop propagate: protect the SQL gateway itself.
+                    LOG.error("Failed to stop thread: {}", thread.getName(), e);
+                }
+            }
+        }
+
+        /**
+         * Reads the thread attached to {@code invocation} through reflection on {@link
+         * FutureTask}'s private {@code runner} field.
+         *
+         * @param invocation the future to inspect
+         * @return the runner thread, or empty if no thread is attached or the field cannot be
+         *     read reflectively (for example when the JDK blocks the access)
+         */
+        private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) {
+            try {
+                Field runnerField = FutureTask.class.getDeclaredField("runner");
+                runnerField.setAccessible(true);
+                // The field holds null when no thread is attached; map that to empty explicitly
+                // instead of relying on Optional.of(null) throwing.
+                return Optional.ofNullable((Thread) runnerField.get(invocation));
+            } catch (ReflectiveOperationException | RuntimeException e) {
+                // can't get thread: report it as absent rather than failing the cancellation
+                LOG.debug("Failed to get the runner thread of the operation future.", e);
+                return Optional.empty();
+            }
+        }
     }
 
     // 
-------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
index b1a1863ef82..e4d95f1bae1 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.service.operation;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
@@ -38,10 +39,12 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
 import static 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
@@ -125,6 +128,32 @@ public class OperationManagerTest {
                 .isEqualTo(OperationStatus.CANCELED);
     }
 
+    /**
+     * Verifies that cancelling an operation stuck in a loop that never observes interruption
+     * still terminates the task: the operation ends up CANCELED and the task's thread receives a
+     * throwable that breaks it out of the loop.
+     */
+    @Test
+    public void testCancelOperationByForce() throws Exception {
+        AtomicReference<Throwable> terminationCause = new AtomicReference<>();
+        OperationHandle handle =
+                operationManager.submitOperation(
+                        () -> {
+                            try {
+                                // mock cpu busy task that doesn't interrupt system call
+                                for (; ; ) {}
+                            } catch (Throwable cause) {
+                                // record whatever tore the thread out of the loop, then rethrow
+                                terminationCause.set(cause);
+                                throw cause;
+                            }
+                        });
+
+        Thread canceller =
+                threadFactory.newThread(() -> operationManager.cancelOperation(handle));
+        canceller.start();
+        operationManager.awaitOperationTermination(handle);
+
+        OperationStatus finalStatus = operationManager.getOperationInfo(handle).getStatus();
+        assertThat(finalStatus).isEqualTo(OperationStatus.CANCELED);
+        CommonTestUtils.waitUtil(
+                () -> terminationCause.get() != null,
+                Duration.ofSeconds(10),
+                "Failed to kill the task with infinite loop.");
+    }
+
     @Test
     public void testCloseOperation() throws Exception {
         CountDownLatch endRunningLatch = new CountDownLatch(1);

Reply via email to