wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938361767


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new 
ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, 
ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, 
mainThreadExecutor);
+        }
+        final TestingLogicalSlotBuilder slotBuilder = new 
TestingLogicalSlotBuilder();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                // Deploy upstream source vertices
+                                deployTasks(executionGraph, producer.getID(), 
slotBuilder);
+                                // Transition upstream vertices into FINISHED
+                                transitionTasksToFinished(executionGraph, 
producer.getID());
+                                // Deploy downstream sink vertices
+                                for (JobVertex consumer : consumers) {
+                                    if (isAdaptive) {
+                                        initializeExecutionJobVertex(
+                                                consumer.getID(), 
executionGraph);
+                                    }
+                                    deployTasks(executionGraph, 
consumer.getID(), slotBuilder);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Exceptions 
shouldn't happen here.", e);
+                            }
+                        },
+                        mainThreadExecutor)
+                .join();
+        return scheduler;
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex,
+            ExecutionGraph executionGraph,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        CompletableFuture.runAsync(
+                        () -> initializeExecutionJobVertex(jobVertex, 
executionGraph),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex, ExecutionGraph executionGraph) {
+        try {
+            executionGraph.initializeJobVertex(
+                    executionGraph.getJobVertex(jobVertex), 
System.currentTimeMillis());
+            executionGraph.notifyNewlyInitializedJobVertices(
+                    
Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
+        } catch (JobException exception) {
+            throw new RuntimeException(exception);
+        }
+    }
+
+    private static DefaultScheduler createScheduler(
+            boolean isAdaptive,
+            JobID jobId,
+            List<JobVertex> jobVertices,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder()
+                        .setJobId(jobId)
+                        .addJobVertices(jobVertices)
+                        .build();
+
+        final DefaultSchedulerBuilder builder =
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
scheduledExecutor)
+                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
+                        .setBlobWriter(blobWriter)
+                        .setIoExecutor(ioExecutor)
+                        .setPartitionTracker(partitionTracker);
+        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : 
builder.build();
+    }
+
+    private static void deployTasks(
+            ExecutionGraph executionGraph,
+            JobVertexID jobVertexID,
+            TestingLogicalSlotBuilder slotBuilder)
+            throws JobException, ExecutionException, InterruptedException {
+
+        for (ExecutionVertex vertex :
+                
Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
+                        .getTaskVertices()) {
+            LogicalSlot slot = slotBuilder.createTestingLogicalSlot();
+
+            Execution execution = vertex.getCurrentExecutionAttempt();
+            
execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
+            execution.transitionState(ExecutionState.SCHEDULED);
+
+            vertex.tryAssignResource(slot);
+            vertex.deploy();
+        }
+    }
+
+    public static void transitionTasksToFinished(

Review Comment:
   1. Moved and renamed the methods.
   2. Using a similar implementation will throw an exception and fail the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to