This is an automated email from the ASF dual-hosted git repository.

lihaosky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d47db1d541 [FLINK-39914][runtime] Fix flaky 
TaskDeploymentDescriptorFactoryTest#testHybridVertexFinish caused by async TDD 
creation (#28398)
6d47db1d541 is described below

commit 6d47db1d54147eb0da70d5fd39697910ae3aea9f
Author: Chan hae OH <[email protected]>
AuthorDate: Sun Jun 14 04:10:14 2026 +0900

    [FLINK-39914][runtime] Fix flaky 
TaskDeploymentDescriptorFactoryTest#testHybridVertexFinish caused by async TDD 
creation (#28398)
---
 .../TaskDeploymentDescriptorFactoryTest.java         | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 6960ec3ff67..e34760696c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
 import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.utils.ExecutionUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.Preconditions;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -58,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
@@ -221,18 +224,25 @@ class TaskDeploymentDescriptorFactoryTest {
                 ResultPartitionType.HYBRID_FULL);
 
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, 
consumer);
+
         SchedulerBase scheduler =
                 new DefaultSchedulerBuilder(
                                 jobGraph,
-                                
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                        EXECUTOR_RESOURCE.getExecutor()),
                                 EXECUTOR_RESOURCE.getExecutor())
                         
.setHybridPartitionDataConsumeConstraint(ONLY_FINISHED_PRODUCERS)
                         .buildAdaptiveBatchJobScheduler();
-        scheduler.startScheduling();
+        CompletableFuture.runAsync(scheduler::startScheduling, 
EXECUTOR_RESOURCE.getExecutor())
+                .join();
+        // Barrier: ensure the async vertex-initialization action queued 
inside startScheduling
+        // has completed before we access getTaskVertices().
+        CompletableFuture.runAsync(() -> {}, 
EXECUTOR_RESOURCE.getExecutor()).join();
         ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-        return Tuple2.of(
-                executionGraph.getJobVertex(producer.getID()),
-                executionGraph.getJobVertex(consumer.getID()));
+        ExecutionJobVertex producerVertex =
+                
Preconditions.checkNotNull(executionGraph.getJobVertex(producer.getID()));
+        
ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(producerVertex.getTaskVertices());
+        return Tuple2.of(producerVertex, 
executionGraph.getJobVertex(consumer.getID()));
     }
 
     // ============== Utils ==============

Reply via email to