This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d7b1663296ef6f85572314a2b26a716981d8bff
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Jun 30 13:18:00 2023 +0800

    [FLINK-32495][connectors/common] Limit the CoordinatorExecutorThreadFactory to be a one-off
---
 .../source/coordinator/SourceCoordinatorProvider.java | 11 ++++++++++-
 .../coordinator/SourceCoordinatorProviderTest.java    | 16 ++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index c76a3ec1910..d238e28be5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -35,6 +35,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** The provider of {@link SourceCoordinator}. */
 public class SourceCoordinatorProvider<SplitT extends SourceSplit>
         extends RecreateOnResetOperatorCoordinator.Provider {
@@ -94,7 +96,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
                 coordinatorListeningID);
     }
 
-    /** A thread factory class that provides some helper methods. */
+    /**
+     * A thread factory class that provides some helper methods. Because it is used to check the
+     * current thread, it is a one-off, do not use this ThreadFactory to create multiple threads.
+     */
     public static class CoordinatorExecutorThreadFactory
             implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
@@ -130,6 +135,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
 
         @Override
         public synchronized Thread newThread(Runnable r) {
+            checkState(
+                    t == null,
+                    "Please using the new CoordinatorExecutorThreadFactory,"
+                            + " this factory cannot new multiple threads.");
             t = new Thread(r, coordinatorThreadName);
             t.setContextClassLoader(cl);
             t.setUncaughtExceptionHandler(this);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 348e3e048c7..11fefa5b981 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -36,6 +36,7 @@ import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link SourceCoordinatorProvider}. */
 class SourceCoordinatorProviderTest {
@@ -121,4 +122,19 @@ class SourceCoordinatorProviderTest {
                 Duration.ofSeconds(10L),
                 "The job did not fail before timeout.");
     }
+
+    @Test
+    void testCoordinatorExecutorThreadFactoryNewMultipleThread() {
+        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
+                coordinatorExecutorThreadFactory =
+                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                "test_coordinator_thread",
+                                new MockOperatorCoordinatorContext(
+                                        new OperatorID(1234L, 5678L), 3));
+
+        coordinatorExecutorThreadFactory.newThread(() -> {});
+        // coordinatorExecutorThreadFactory cannot create multiple threads.
+        assertThatThrownBy(() -> coordinatorExecutorThreadFactory.newThread(() -> {}))
+                .isInstanceOf(IllegalStateException.class);
+    }
 }
