This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ad7d97c758622bd095303c8bc5b793f20c7612e Author: Jiabao Sun <[email protected]> AuthorDate: Thu Sep 21 15:30:45 2023 +0800 [FLINK-33000][sql-gateway] SqlGatewayServiceITCase should utilize TestExecutorExtension instead of using a ThreadFactory --- .../gateway/service/SqlGatewayServiceITCase.java | 59 +++++++++++++--------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index fafdcb30857..64b91ff9df7 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -66,6 +66,7 @@ import org.apache.flink.table.planner.utils.TableFunc0; import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.test.util.TestUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.UserClassLoaderJarTestUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -96,9 +97,10 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -125,7 +127,7 @@ public class SqlGatewayServiceITCase { @RegisterExtension @Order(1) - public static final MiniClusterExtension MINI_CLUSTER = + static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(2) @@ -133,9 +135,19 @@ public class SqlGatewayServiceITCase { @RegisterExtension @Order(2) - public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration); + @RegisterExtension + @Order(3) + static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>( + () -> + Executors.newCachedThreadPool( + new ExecutorThreadFactory( + "SqlGatewayService Test Pool", + IgnoreExceptionHandler.INSTANCE))); + private static SessionManagerImpl sessionManager; private static SqlGatewayServiceImpl service; @@ -143,9 +155,6 @@ public class SqlGatewayServiceITCase { SessionEnvironment.newBuilder() .setSessionEndpointVersion(MockedEndpointVersion.V1) .build(); - private final ThreadFactory threadFactory = - new ExecutorThreadFactory( - "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE); @BeforeAll public static void setUp() { @@ -786,12 +795,10 @@ public class SqlGatewayServiceITCase { service.getSession(sessionHandle) .getOperationManager() .getOperation(operationHandle)); - threadFactory - .newThread(() -> service.cancelOperation(sessionHandle, operationHandle)) - .start(); - threadFactory - .newThread(() -> service.closeOperation(sessionHandle, operationHandle)) - .start(); + + ExecutorService executor = EXECUTOR_EXTENSION.getExecutor(); + executor.submit(() -> service.cancelOperation(sessionHandle, operationHandle)); + executor.submit(() -> service.closeOperation(sessionHandle, operationHandle)); } CommonTestUtils.waitUtil( @@ -813,16 +820,16 @@ public class SqlGatewayServiceITCase { int submitThreadsNum = 100; CountDownLatch latch = new CountDownLatch(submitThreadsNum); for (int i = 0; i < submitThreadsNum; i++) { - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { try { submitDefaultOperation(sessionHandle, () -> {}); } finally { latch.countDown(); } - }) - .start(); + }); } manager.close(); latch.await(); @@ -836,8 +843,9 @@ public class SqlGatewayServiceITCase { CountDownLatch terminateRunning = new CountDownLatch(1); SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); for (int i = 0; i < count; i++) { - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> service.submitOperation( sessionHandle, @@ -845,8 +853,7 @@ public class SqlGatewayServiceITCase { startRunning.countDown(); terminateRunning.await(); return getDefaultResultSet(); - })) - .start(); + })); } startRunning.await(); service.getSession(sessionHandle).getOperationManager().close(); @@ -1047,7 +1054,8 @@ public class SqlGatewayServiceITCase { schemaFetcherIsRunning.countDown(); return service.getOperationResultSchema(sessionHandle, operationHandle); }); - threadFactory.newThread(task).start(); + + EXECUTOR_EXTENSION.getExecutor().submit(task); schemaFetcherIsRunning.await(); operationIsRunning.countDown(); @@ -1061,16 +1069,17 @@ public class SqlGatewayServiceITCase { Condition<String> condition) { List<RowData> actual = new ArrayList<>(); - threadFactory - .newThread( + + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { try { cancelOrClose.run(); } catch (Exception e) { // ignore } - }) - .start(); + }); assertThatThrownBy( () -> {
