zhuzhurk commented on a change in pull request #13641:
URL: https://github.com/apache/flink/pull/13641#discussion_r512792107



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
##########
@@ -566,174 +552,6 @@ public void 
testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception
                        
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        }
 
-       /**
-        * Tests that eager scheduling will wait until all input locations have 
been set before
-        * scheduling a task.
-        */
-       @Test
-       public void testEagerSchedulingWaitsOnAllInputPreferredLocations() 
throws Exception {
-               final int parallelism = 2;
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
-
-               final Time timeout = Time.hours(1L);
-               final JobVertexID sourceVertexId = new JobVertexID();
-               final JobVertex sourceVertex = new JobVertex("Test source", 
sourceVertexId);
-               sourceVertex.setInvokableClass(NoOpInvokable.class);
-               sourceVertex.setParallelism(parallelism);
-
-               final JobVertexID sinkVertexId = new JobVertexID();
-               final JobVertex sinkVertex = new JobVertex("Test sink", 
sinkVertexId);
-               sinkVertex.setInvokableClass(NoOpInvokable.class);
-               sinkVertex.setParallelism(parallelism);
-
-               sinkVertex.connectNewDataSetAsInput(sourceVertex, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-               final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> 
slotFutures = new HashMap<>(2);
-
-               for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, 
sinkVertexId)) {
-                       CompletableFuture<LogicalSlot>[] slotFutureArray = new 
CompletableFuture[parallelism];
-
-                       for (int i = 0; i < parallelism; i++) {
-                               slotFutureArray[i] = new CompletableFuture<>();
-                       }
-
-                       slotFutures.put(jobVertexID, slotFutureArray);
-                       slotProvider.addSlots(jobVertexID, slotFutureArray);
-               }
-
-               final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(3);
-
-               final JobGraph jobGraph = new JobGraph(sourceVertex, 
sinkVertex);
-               jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
-               final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder
-                       .newBuilder()
-                       .setJobGraph(jobGraph)
-                       .setSlotProvider(slotProvider)
-                       .setIoExecutor(scheduledExecutorService)
-                       .setFutureExecutor(scheduledExecutorService)
-                       .setAllocationTimeout(timeout)
-                       .setRpcTimeout(timeout)
-                       .build();
-
-               
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               executionGraph.scheduleForExecution();
-
-               // all tasks should be in state SCHEDULED
-               for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
-                       assertEquals(ExecutionState.SCHEDULED, 
executionVertex.getCurrentExecutionAttempt().getState());
-               }
-
-               // wait until the source vertex slots have been requested
-               assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 
0).get());
-               assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 
1).get());
-
-               // check that the sinks have not requested their slots because 
they need the location
-               // information of the sources
-               assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 
0).isDone());
-               assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 
1).isDone());
-
-               final TaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
-
-               final LogicalSlot sourceSlot1 = 
createSlot(localTaskManagerLocation, 0);
-               final LogicalSlot sourceSlot2 = 
createSlot(localTaskManagerLocation, 1);
-
-               final LogicalSlot sinkSlot1 = 
createSlot(localTaskManagerLocation, 0);
-               final LogicalSlot sinkSlot2 = 
createSlot(localTaskManagerLocation, 1);
-
-               slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
-               slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
-
-               // wait until the sink vertex slots have been requested after 
we completed the source slots
-               assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 
0).get());
-               assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 
1).get());
-
-               slotFutures.get(sinkVertexId)[0].complete(sinkSlot1);
-               slotFutures.get(sinkVertexId)[1].complete(sinkSlot2);
-
-               for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
-                       
ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(),
 ExecutionState.DEPLOYING, 5000L);
-               }
-       }
-
-       /**
-        * Tests that the {@link ExecutionGraph} is deployed in topological 
order.
-        */
-       @Test
-       public void testExecutionGraphIsDeployedInTopologicalOrder() throws 
Exception {

Review comment:
       `testExecutionGraphIsDeployedInTopologicalOrder()` can be removed because we already have 
`DefaultSchedulerTest#scheduledVertexOrderFromSchedulingStrategyIsRespected()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to