This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 73717520cc63df8bd08fd008ac004659b210cfd1 Author: Jiabao Sun <[email protected]> AuthorDate: Thu Sep 21 17:43:38 2023 +0800 [FLINK-33000][sql-gateway] OperationManagerTest should utilize TestExecutorExtension instead of using a ThreadFactory --- .../service/operation/OperationManagerTest.java | 43 +++++++++++----------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java index d734ad8afa3..fec988939c6 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java @@ -30,22 +30,22 @@ import org.apache.flink.table.gateway.api.operation.OperationStatus; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.results.ResultSetImpl; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import org.apache.flink.table.gateway.api.utils.ThreadUtils; import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; import org.apache.flink.table.gateway.service.utils.SqlCancelException; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER; @@ -56,19 +56,21 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link OperationManager}. */ class OperationManagerTest { - private static final ExecutorService EXECUTOR_SERVICE = - ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test"); - private static OperationManager operationManager; private static ResultSet defaultResultSet; - private final ThreadFactory threadFactory = - new ExecutorThreadFactory( - "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE); + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>( + () -> + Executors.newCachedThreadPool( + new ExecutorThreadFactory( + "SqlGatewayService Test Pool", + IgnoreExceptionHandler.INSTANCE))); @BeforeEach void setUp() { - operationManager = new OperationManager(EXECUTOR_SERVICE); + operationManager = new OperationManager(EXECUTOR_EXTENSION.getExecutor()); defaultResultSet = new ResultSetImpl( PAYLOAD, @@ -86,11 +88,6 @@ class OperationManagerTest { operationManager.close(); } - @AfterAll - static void cleanUp() { - EXECUTOR_SERVICE.shutdown(); - } - @Test void testRunOperationAsynchronously() throws Exception { OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet); @@ -127,7 +124,9 @@ class OperationManagerTest { return defaultResultSet; }); - threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start(); + EXECUTOR_EXTENSION + .getExecutor() + .submit(() -> operationManager.cancelOperation(operationHandle)); operationManager.awaitOperationTermination(operationHandle); assertThat(operationManager.getOperationInfo(operationHandle).getStatus()) @@ -164,8 +163,9 @@ class OperationManagerTest { void testCloseUninterruptedOperation() throws Exception { AtomicReference<Boolean> isRunning = new AtomicReference<>(false); for (int i = 0; i < 10; i++) { - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { operationManager.submitOperation( () -> { @@ -174,8 +174,7 @@ class OperationManagerTest { isRunning.compareAndSet(false, true); } }); - }) - .start(); + }); } CommonTestUtils.waitUtil( isRunning::get, Duration.ofSeconds(10), "Failed to start up the task."); @@ -195,7 +194,9 @@ class OperationManagerTest { return defaultResultSet; }); - threadFactory.newThread(() -> operationManager.closeOperation(operationHandle)).start(); + EXECUTOR_EXTENSION + .getExecutor() + .submit(() -> operationManager.closeOperation(operationHandle)); assertThatThrownBy( () -> {
