zentol commented on a change in pull request #8060: [FLINK-12021] Deploy 
execution in topological sorted order
URL: https://github.com/apache/flink/pull/8060#discussion_r269603626
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 ##########
 @@ -662,6 +674,91 @@ public void 
testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Except
                }
        }
 
+       /**
+        * Tests that the {@link ExecutionGraph} is deployed in topological 
order.
+        */
+       @Test
+       public void testExecutionGraphIsDeployedInTopologicalOrder() throws 
Exception {
+               final int sourceParallelism = 2;
+               final int sinkParallelism = 1;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+               sourceVertex.setParallelism(sourceParallelism);
+
+               final JobVertex sinkVertex = new JobVertex("sink");
+               sinkVertex.setInvokableClass(NoOpInvokable.class);
+               sinkVertex.setParallelism(sinkParallelism);
+
+               sinkVertex.connectNewDataSetAsInput(sourceVertex, 
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final int numberTasks = sourceParallelism + sinkParallelism;
+               final ArrayBlockingQueue<ExecutionAttemptID> 
submittedTasksQueue = new ArrayBlockingQueue<>(numberTasks);
+               TestingTaskExecutorGatewayBuilder 
testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
+               
testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor,
 jobMasterId) -> {
+                       
submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               });
+
+               final TestingTaskExecutorGateway taskExecutorGateway = 
testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
+               final RpcTaskManagerGateway taskManagerGateway = new 
RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate());
+
+               final Collection<CompletableFuture<LogicalSlot>> slotFutures = 
new ArrayList<>(numberTasks);
+               for (int i = 0; i < numberTasks; i++) {
+                       slotFutures.add(new CompletableFuture<>());
+               }
+
+               final Iterator<CompletableFuture<LogicalSlot>> slotIterator = 
slotFutures.iterator();
+               final SlotProvider slotProvider = new 
TestingSlotProvider(slotRequestId -> {
 
 Review comment:
   This looks like a basic use-case that we could move into a reusable 
`FixedSizeTestingSlotProvider extends TestingSlotProvider` class with a 
`Collection<CompletableFuture<LogicalSlot>> getSlotFutures()` method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to