zhuzhurk commented on a change in pull request #13641:
URL: https://github.com/apache/flink/pull/13641#discussion_r543395440



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
##########
@@ -82,202 +82,12 @@
        private final TestingComponentMainThreadExecutor testMainThreadUtil =
                EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
 
-       /**
-        * Tests that slots are released if we cannot assign the allocated 
resource to the
-        * Execution.
-        */
-       @Test
-       public void testSlotReleaseOnFailedResourceAssignment() throws 
Exception {
-               final JobVertex jobVertex = createNoOpJobVertex();
-               final JobVertexID jobVertexId = jobVertex.getID();
-
-               final CompletableFuture<LogicalSlot> slotFuture = new 
CompletableFuture<>();
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
-               slotProvider.addSlot(jobVertexId, 0, slotFuture);
-
-               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
-                       slotProvider,
-                       new NoRestartStrategy(),
-                       jobVertex);
-
-               
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
-
-               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
-               final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
-
-               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
-               final LogicalSlot otherSlot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
-
-               CompletableFuture<Execution> allocationFuture = 
execution.allocateResourcesForExecution(
-                       executionGraph.getSlotProviderStrategy(),
-                       LocationPreferenceConstraint.ALL,
-                       Collections.emptySet());
-
-               assertFalse(allocationFuture.isDone());
-
-               assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
-               // assign a different resource to the execution
-               assertTrue(execution.tryAssignResource(otherSlot));
-
-               // completing now the future should cause the slot to be 
released
-               slotFuture.complete(slot);
-
-               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
-       }
-
        private TestingLogicalSlot createTestingLogicalSlot(SlotOwner 
slotOwner) {
                return new TestingLogicalSlotBuilder()
                        .setSlotOwner(slotOwner)
                        .createTestingLogicalSlot();
        }
 
-       /**
-        * Tests that the slot is released in case of a execution cancellation 
when having
-        * a slot assigned and being in state SCHEDULED.
-        */
-       @Test
-       public void testSlotReleaseOnExecutionCancellationInScheduled() throws 
Exception {
-               final JobVertex jobVertex = createNoOpJobVertex();
-               final JobVertexID jobVertexId = jobVertex.getID();
-
-               final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
-
-               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
-               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
-
-               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
-                       slotProvider,
-                       new NoRestartStrategy(),
-                       jobVertex);
-
-               
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
-
-               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
-               CompletableFuture<Execution> allocationFuture = 
execution.allocateResourcesForExecution(
-                       executionGraph.getSlotProviderStrategy(),
-                       LocationPreferenceConstraint.ALL,
-                       Collections.emptySet());
-
-               assertTrue(allocationFuture.isDone());
-
-               assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
-               assertEquals(slot, execution.getAssignedResource());
-
-               // cancelling the execution should move it into state CANCELED
-               execution.cancel();
-               assertEquals(ExecutionState.CANCELED, execution.getState());
-
-               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
-       }
-
-       /**
-        * Tests that the slot is released in case of a execution cancellation 
when being in state
-        * RUNNING.
-        */
-       @Test
-       public void testSlotReleaseOnExecutionCancellationInRunning() throws 
Exception {
-               final JobVertex jobVertex = createNoOpJobVertex();
-               final JobVertexID jobVertexId = jobVertex.getID();
-
-               final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
-
-               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
-               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
-
-               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
-                       slotProvider,
-                       new NoRestartStrategy(),
-                       jobVertex);
-
-               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
-
-               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
-               CompletableFuture<Execution> allocationFuture = 
execution.allocateResourcesForExecution(
-                       executionGraph.getSlotProviderStrategy(),
-                       LocationPreferenceConstraint.ALL,
-                       Collections.emptySet());
-
-               assertTrue(allocationFuture.isDone());
-
-               assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
-               assertEquals(slot, execution.getAssignedResource());
-
-               execution.deploy();
-
-               execution.switchToRunning();
-
-               // cancelling the execution should move it into state CANCELING
-               execution.cancel();
-               assertEquals(ExecutionState.CANCELING, execution.getState());
-
-               execution.completeCancelling();
-
-               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
-       }
-
-       /**
-        * Tests that a slot allocation from a {@link SlotProvider} is 
cancelled if the
-        * {@link Execution} is cancelled.
-        */
-       @Test
-       public void testSlotAllocationCancellationWhenExecutionCancelled() 
throws Exception {

Review comment:
       This test is outdated because it is testing the implementation of legacy 
method `allocateResourcesForExecution()`, while the DefaultScheduler has taken 
over the responsibility to allocate a slot for an execution. The cancellation 
of pending requests on vertex cancellation now locates in 
`DefaultScheduler#cancelExecutionVertex()`.
   But I think you are right that now we lacks a test to verify that 
`ExecutionSlotAllocator#cancel()` is invoked when an execution vertex is 
canceled. I will add one in `DefaultSchedulerTest`.
   
   Regarding `SchedulerNG.cancel()`, currently canceling a job does not 
directly cancel all pending requests in SlotPool. Instead, after the entire job 
is canceled and JobMaster is shutting down, all pending requests as well as 
allocated slots will be released when the SlotPool is closed. So I think the 
test needed is `SlotPoolImplTest#testShutdownCancelsAllPendingRequests()` which 
is missing at the moment. I will add it as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to