This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24e479571facd07ff129c57cda86c7e34d4637bd
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Jun 30 13:12:49 2023 +0800

    [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail
---
 .../source/coordinator/SourceCoordinatorContextTest.java  |  3 ++-
 .../source/coordinator/SourceCoordinatorTestBase.java     | 15 ++++++---------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index dc760aca544..70e2ffc33a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -165,7 +165,8 @@ class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
                 new SourceCoordinatorContext<>(
                         coordinatorExecutorWithExceptionHandler,
                         manualWorkerExecutor,
-                        coordinatorThreadFactory,
+                        new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                coordinatorThreadName, 
operatorCoordinatorContext),
                         operatorCoordinatorContext,
                         new MockSourceSplitSerializer(),
                         splitSplitAssignmentTracker,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fb2886f8fb0..8fba35df0d3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -65,7 +65,7 @@ abstract class SourceCoordinatorTestBase {
     protected MockOperatorCoordinatorContext operatorCoordinatorContext;
 
     // ---- Mocks for the Source Coordinator Context ----
-    protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory;
+    protected String coordinatorThreadName;
     protected SplitAssignmentTracker<MockSourceSplit> 
splitSplitAssignmentTracker;
     protected SourceCoordinatorContext<MockSourceSplit> context;
 
@@ -82,10 +82,7 @@ abstract class SourceCoordinatorTestBase {
         operatorCoordinatorContext =
                 new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 
NUM_SUBTASKS);
         splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
-        String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
-        coordinatorThreadFactory =
-                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
-                        coordinatorThreadName, operatorCoordinatorContext);
+        coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
 
         sourceCoordinator = getNewSourceCoordinator();
         context = sourceCoordinator.getContext();
@@ -215,14 +212,14 @@ abstract class SourceCoordinatorTestBase {
 
     protected SourceCoordinatorContext<MockSourceSplit> 
getNewSourceCoordinatorContext()
             throws Exception {
+        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory =
+                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                        coordinatorThreadName, operatorCoordinatorContext);
         SourceCoordinatorContext<MockSourceSplit> coordinatorContext =
                 new SourceCoordinatorContext<>(
                         Executors.newScheduledThreadPool(1, 
coordinatorThreadFactory),
                         Executors.newScheduledThreadPool(
-                                1,
-                                new ExecutorThreadFactory(
-                                        
coordinatorThreadFactory.getCoordinatorThreadName()
-                                                + "-worker")),
+                                1, new 
ExecutorThreadFactory(coordinatorThreadName + "-worker")),
                         coordinatorThreadFactory,
                         operatorCoordinatorContext,
                         new MockSourceSplitSerializer(),

Reply via email to