This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73717520cc63df8bd08fd008ac004659b210cfd1
Author: Jiabao Sun <[email protected]>
AuthorDate: Thu Sep 21 17:43:38 2023 +0800

    [FLINK-33000][sql-gateway] OperationManagerTest should utilize TestExecutorExtension instead of using a ThreadFactory
---
 .../service/operation/OperationManagerTest.java    | 43 +++++++++++-----------
 1 file changed, 22 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
index d734ad8afa3..fec988939c6 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -30,22 +30,22 @@ import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.ResultSetImpl;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
-import org.apache.flink.table.gateway.api.utils.ThreadUtils;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlCancelException;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
@@ -56,19 +56,21 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test for {@link OperationManager}. */
 class OperationManagerTest {
 
-    private static final ExecutorService EXECUTOR_SERVICE =
-            ThreadUtils.newThreadPool(5, 500, 60_0000, 
"operation-manager-test");
-
     private static OperationManager operationManager;
     private static ResultSet defaultResultSet;
 
-    private final ThreadFactory threadFactory =
-            new ExecutorThreadFactory(
-                    "SqlGatewayService Test Pool", 
IgnoreExceptionHandler.INSTANCE);
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> 
EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "SqlGatewayService Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
 
     @BeforeEach
     void setUp() {
-        operationManager = new OperationManager(EXECUTOR_SERVICE);
+        operationManager = new 
OperationManager(EXECUTOR_EXTENSION.getExecutor());
         defaultResultSet =
                 new ResultSetImpl(
                         PAYLOAD,
@@ -86,11 +88,6 @@ class OperationManagerTest {
         operationManager.close();
     }
 
-    @AfterAll
-    static void cleanUp() {
-        EXECUTOR_SERVICE.shutdown();
-    }
-
     @Test
     void testRunOperationAsynchronously() throws Exception {
         OperationHandle operationHandle = operationManager.submitOperation(() 
-> defaultResultSet);
@@ -127,7 +124,9 @@ class OperationManagerTest {
                             return defaultResultSet;
                         });
 
-        threadFactory.newThread(() -> 
operationManager.cancelOperation(operationHandle)).start();
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(() -> 
operationManager.cancelOperation(operationHandle));
         operationManager.awaitOperationTermination(operationHandle);
 
         
assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
@@ -164,8 +163,9 @@ class OperationManagerTest {
     void testCloseUninterruptedOperation() throws Exception {
         AtomicReference<Boolean> isRunning = new AtomicReference<>(false);
         for (int i = 0; i < 10; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 operationManager.submitOperation(
                                         () -> {
@@ -174,8 +174,7 @@ class OperationManagerTest {
                                                 isRunning.compareAndSet(false, 
true);
                                             }
                                         });
-                            })
-                    .start();
+                            });
         }
         CommonTestUtils.waitUtil(
                 isRunning::get, Duration.ofSeconds(10), "Failed to start up 
the task.");
@@ -195,7 +194,9 @@ class OperationManagerTest {
                             return defaultResultSet;
                         });
 
-        threadFactory.newThread(() -> 
operationManager.closeOperation(operationHandle)).start();
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(() -> 
operationManager.closeOperation(operationHandle));
 
         assertThatThrownBy(
                         () -> {

Reply via email to