This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new b4e02ab2a27 [FLINK-32495][connectors/common] Fix the bug that the
shared thread factory causes the source alignment unit test to fail
b4e02ab2a27 is described below
commit b4e02ab2a27ed8621f0fa0c759b2f58f14d770d4
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Jun 30 13:12:49 2023 +0800
[FLINK-32495][connectors/common] Fix the bug that the shared thread factory
causes the source alignment unit test to fail
---
.../source/coordinator/SourceCoordinatorContextTest.java | 3 ++-
.../source/coordinator/SourceCoordinatorTestBase.java | 15 ++++++---------
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index dc760aca544..70e2ffc33a4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -165,7 +165,8 @@ class SourceCoordinatorContextTest extends
SourceCoordinatorTestBase {
new SourceCoordinatorContext<>(
coordinatorExecutorWithExceptionHandler,
manualWorkerExecutor,
- coordinatorThreadFactory,
+ new
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+ coordinatorThreadName,
operatorCoordinatorContext),
operatorCoordinatorContext,
new MockSourceSplitSerializer(),
splitSplitAssignmentTracker,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fb2886f8fb0..8fba35df0d3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -65,7 +65,7 @@ abstract class SourceCoordinatorTestBase {
protected MockOperatorCoordinatorContext operatorCoordinatorContext;
// ---- Mocks for the Source Coordinator Context ----
- protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
coordinatorThreadFactory;
+ protected String coordinatorThreadName;
protected SplitAssignmentTracker<MockSourceSplit>
splitSplitAssignmentTracker;
protected SourceCoordinatorContext<MockSourceSplit> context;
@@ -82,10 +82,7 @@ abstract class SourceCoordinatorTestBase {
operatorCoordinatorContext =
new MockOperatorCoordinatorContext(TEST_OPERATOR_ID,
NUM_SUBTASKS);
splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
- String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
- coordinatorThreadFactory =
- new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
- coordinatorThreadName, operatorCoordinatorContext);
+ coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
sourceCoordinator = getNewSourceCoordinator();
context = sourceCoordinator.getContext();
@@ -215,14 +212,14 @@ abstract class SourceCoordinatorTestBase {
protected SourceCoordinatorContext<MockSourceSplit>
getNewSourceCoordinatorContext()
throws Exception {
+ SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
coordinatorThreadFactory =
+ new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+ coordinatorThreadName, operatorCoordinatorContext);
SourceCoordinatorContext<MockSourceSplit> coordinatorContext =
new SourceCoordinatorContext<>(
Executors.newScheduledThreadPool(1,
coordinatorThreadFactory),
Executors.newScheduledThreadPool(
- 1,
- new ExecutorThreadFactory(
-
coordinatorThreadFactory.getCoordinatorThreadName()
- + "-worker")),
+ 1, new
ExecutorThreadFactory(coordinatorThreadName + "-worker")),
coordinatorThreadFactory,
operatorCoordinatorContext,
new MockSourceSplitSerializer(),