This is an automated email from the ASF dual-hosted git repository.
lihaosky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6d47db1d541 [FLINK-39914][runtime] Fix flaky TaskDeploymentDescriptorFactoryTest#testHybridVertexFinish caused by async TDD creation (#28398)
6d47db1d541 is described below
commit 6d47db1d54147eb0da70d5fd39697910ae3aea9f
Author: Chan hae OH <[email protected]>
AuthorDate: Sun Jun 14 04:10:14 2026 +0900
[FLINK-39914][runtime] Fix flaky TaskDeploymentDescriptorFactoryTest#testHybridVertexFinish caused by async TDD creation (#28398)
---
.../TaskDeploymentDescriptorFactoryTest.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 6960ec3ff67..e34760696c3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -35,6 +35,7 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.utils.ExecutionUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.Preconditions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -58,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
@@ -221,18 +224,25 @@ class TaskDeploymentDescriptorFactoryTest {
ResultPartitionType.HYBRID_FULL);
JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer,
consumer);
+
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph,
-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ EXECUTOR_RESOURCE.getExecutor()),
EXECUTOR_RESOURCE.getExecutor())
.setHybridPartitionDataConsumeConstraint(ONLY_FINISHED_PRODUCERS)
.buildAdaptiveBatchJobScheduler();
- scheduler.startScheduling();
+ CompletableFuture.runAsync(scheduler::startScheduling,
EXECUTOR_RESOURCE.getExecutor())
+ .join();
+ // Barrier: ensure the async vertex-initialization action queued inside startScheduling
+ // has completed before we access getTaskVertices().
+ CompletableFuture.runAsync(() -> {},
EXECUTOR_RESOURCE.getExecutor()).join();
ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- return Tuple2.of(
- executionGraph.getJobVertex(producer.getID()),
- executionGraph.getJobVertex(consumer.getID()));
+ ExecutionJobVertex producerVertex =
+
Preconditions.checkNotNull(executionGraph.getJobVertex(producer.getID()));
+
ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(producerVertex.getTaskVertices());
+ return Tuple2.of(producerVertex,
executionGraph.getJobVertex(consumer.getID()));
}
// ============== Utils ==============