This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24e479571facd07ff129c57cda86c7e34d4637bd
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Jun 30 13:12:49 2023 +0800

    [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail
---
 .../source/coordinator/SourceCoordinatorContextTest.java |  3 ++-
 .../source/coordinator/SourceCoordinatorTestBase.java    | 15 ++++++---------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index dc760aca544..70e2ffc33a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -165,7 +165,8 @@ class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
                 new SourceCoordinatorContext<>(
                         coordinatorExecutorWithExceptionHandler,
                         manualWorkerExecutor,
-                        coordinatorThreadFactory,
+                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                coordinatorThreadName, operatorCoordinatorContext),
                         operatorCoordinatorContext,
                         new MockSourceSplitSerializer(),
                         splitSplitAssignmentTracker,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fb2886f8fb0..8fba35df0d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -65,7 +65,7 @@ abstract class SourceCoordinatorTestBase {
     protected MockOperatorCoordinatorContext operatorCoordinatorContext;
 
     // ---- Mocks for the Source Coordinator Context ----
-    protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+    protected String coordinatorThreadName;
     protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
     protected SourceCoordinatorContext<MockSourceSplit> context;
 
@@ -82,10 +82,7 @@ abstract class SourceCoordinatorTestBase {
         operatorCoordinatorContext =
                 new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
         splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
-        String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
-        coordinatorThreadFactory =
-                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
-                        coordinatorThreadName, operatorCoordinatorContext);
+        coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
 
         sourceCoordinator = getNewSourceCoordinator();
         context = sourceCoordinator.getContext();
@@ -215,14 +212,14 @@ abstract class SourceCoordinatorTestBase {
 
     protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorContext()
             throws Exception {
+        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                        coordinatorThreadName, operatorCoordinatorContext);
         SourceCoordinatorContext<MockSourceSplit> coordinatorContext =
                 new SourceCoordinatorContext<>(
                         Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                         Executors.newScheduledThreadPool(
-                                1,
-                                new ExecutorThreadFactory(
-                                        coordinatorThreadFactory.getCoordinatorThreadName()
-                                                + "-worker")),
+                                1, new ExecutorThreadFactory(coordinatorThreadName + "-worker")),
                         coordinatorThreadFactory,
                         operatorCoordinatorContext,
                         new MockSourceSplitSerializer(),
