This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d7b1663296ef6f85572314a2b26a716981d8bff
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Jun 30 13:18:00 2023 +0800

    [FLINK-32495][connectors/common] Limit the CoordinatorExecutorThreadFactory 
to be a one-off
---
 .../source/coordinator/SourceCoordinatorProvider.java    | 11 ++++++++++-
 .../coordinator/SourceCoordinatorProviderTest.java       | 16 ++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index c76a3ec1910..d238e28be5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -35,6 +35,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** The provider of {@link SourceCoordinator}. */
 public class SourceCoordinatorProvider<SplitT extends SourceSplit>
         extends RecreateOnResetOperatorCoordinator.Provider {
@@ -94,7 +96,10 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
                 coordinatorListeningID);
     }
 
-    /** A thread factory class that provides some helper methods. */
+    /**
+     * A thread factory class that provides some helper methods. Because it is 
used to check the
+     * current thread, it is a one-off, do not use this ThreadFactory to 
create multiple threads.
+     */
     public static class CoordinatorExecutorThreadFactory
             implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
@@ -130,6 +135,10 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
 
         @Override
         public synchronized Thread newThread(Runnable r) {
+            checkState(
+                    t == null,
+                    "Please using the new CoordinatorExecutorThreadFactory,"
+                            + " this factory cannot new multiple threads.");
             t = new Thread(r, coordinatorThreadName);
             t.setContextClassLoader(cl);
             t.setUncaughtExceptionHandler(this);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 348e3e048c7..11fefa5b981 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -36,6 +36,7 @@ import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link SourceCoordinatorProvider}. */
 class SourceCoordinatorProviderTest {
@@ -121,4 +122,19 @@ class SourceCoordinatorProviderTest {
                 Duration.ofSeconds(10L),
                 "The job did not fail before timeout.");
     }
+
+    @Test
+    void testCoordinatorExecutorThreadFactoryNewMultipleThread() {
+        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
+                coordinatorExecutorThreadFactory =
+                        new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                "test_coordinator_thread",
+                                new MockOperatorCoordinatorContext(
+                                        new OperatorID(1234L, 5678L), 3));
+
+        coordinatorExecutorThreadFactory.newThread(() -> {});
+        // coordinatorExecutorThreadFactory cannot create multiple threads.
+        assertThatThrownBy(() -> coordinatorExecutorThreadFactory.newThread(() 
-> {}))
+                .isInstanceOf(IllegalStateException.class);
+    }
 }

Reply via email to