zentol commented on a change in pull request #8060: [FLINK-12021] Deploy
execution in topological sorted order
URL: https://github.com/apache/flink/pull/8060#discussion_r269603626
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
##########
@@ -662,6 +674,91 @@ public void
testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Except
}
}
+ /**
+ * Tests that the {@link ExecutionGraph} is deployed in topological
order.
+ */
+ @Test
+ public void testExecutionGraphIsDeployedInTopologicalOrder() throws
Exception {
+ final int sourceParallelism = 2;
+ final int sinkParallelism = 1;
+
+ final JobVertex sourceVertex = new JobVertex("source");
+ sourceVertex.setInvokableClass(NoOpInvokable.class);
+ sourceVertex.setParallelism(sourceParallelism);
+
+ final JobVertex sinkVertex = new JobVertex("sink");
+ sinkVertex.setInvokableClass(NoOpInvokable.class);
+ sinkVertex.setParallelism(sinkParallelism);
+
+ sinkVertex.connectNewDataSetAsInput(sourceVertex,
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobID jobId = new JobID();
+ final int numberTasks = sourceParallelism + sinkParallelism;
+ final ArrayBlockingQueue<ExecutionAttemptID>
submittedTasksQueue = new ArrayBlockingQueue<>(numberTasks);
+ TestingTaskExecutorGatewayBuilder
testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
+
testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor,
jobMasterId) -> {
+
submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ });
+
+ final TestingTaskExecutorGateway taskExecutorGateway =
testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
+ final RpcTaskManagerGateway taskManagerGateway = new
RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate());
+
+ final Collection<CompletableFuture<LogicalSlot>> slotFutures =
new ArrayList<>(numberTasks);
+ for (int i = 0; i < numberTasks; i++) {
+ slotFutures.add(new CompletableFuture<>());
+ }
+
+ final Iterator<CompletableFuture<LogicalSlot>> slotIterator =
slotFutures.iterator();
+ final SlotProvider slotProvider = new
TestingSlotProvider(slotRequestId -> {
Review comment:
This looks like a basic use-case that we could move into a reusable
`FixedSizeTestingSlotProvider extends TestingSlotProvider` class with a
`Collection<CompletableFuture<LogicalSlot>> getSlotFutures()` method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services