zhuzhurk commented on a change in pull request #13641:
URL: https://github.com/apache/flink/pull/13641#discussion_r543395440
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
##########
@@ -82,202 +82,12 @@
private final TestingComponentMainThreadExecutor testMainThreadUtil =
EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
- /**
- * Tests that slots are released if we cannot assign the allocated
resource to the
- * Execution.
- */
- @Test
- public void testSlotReleaseOnFailedResourceAssignment() throws
Exception {
- final JobVertex jobVertex = createNoOpJobVertex();
- final JobVertexID jobVertexId = jobVertex.getID();
-
- final CompletableFuture<LogicalSlot> slotFuture = new
CompletableFuture<>();
- final ProgrammedSlotProvider slotProvider = new
ProgrammedSlotProvider(1);
- slotProvider.addSlot(jobVertexId, 0, slotFuture);
-
- ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
- slotProvider,
- new NoRestartStrategy(),
- jobVertex);
-
-
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
- ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
-
- final Execution execution =
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
- final SingleSlotTestingSlotOwner slotOwner = new
SingleSlotTestingSlotOwner();
-
- final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
- final LogicalSlot otherSlot = new
TestingLogicalSlotBuilder().createTestingLogicalSlot();
-
- CompletableFuture<Execution> allocationFuture =
execution.allocateResourcesForExecution(
- executionGraph.getSlotProviderStrategy(),
- LocationPreferenceConstraint.ALL,
- Collections.emptySet());
-
- assertFalse(allocationFuture.isDone());
-
- assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
- // assign a different resource to the execution
- assertTrue(execution.tryAssignResource(otherSlot));
-
- // completing now the future should cause the slot to be
released
- slotFuture.complete(slot);
-
- assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
- }
-
private TestingLogicalSlot createTestingLogicalSlot(SlotOwner
slotOwner) {
return new TestingLogicalSlotBuilder()
.setSlotOwner(slotOwner)
.createTestingLogicalSlot();
}
- /**
- * Tests that the slot is released in case of a execution cancellation
when having
- * a slot assigned and being in state SCHEDULED.
- */
- @Test
- public void testSlotReleaseOnExecutionCancellationInScheduled() throws
Exception {
- final JobVertex jobVertex = createNoOpJobVertex();
- final JobVertexID jobVertexId = jobVertex.getID();
-
- final SingleSlotTestingSlotOwner slotOwner = new
SingleSlotTestingSlotOwner();
-
- final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
- final ProgrammedSlotProvider slotProvider = new
ProgrammedSlotProvider(1);
- slotProvider.addSlot(jobVertexId, 0,
CompletableFuture.completedFuture(slot));
-
- ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
- slotProvider,
- new NoRestartStrategy(),
- jobVertex);
-
-
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
- ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
-
- final Execution execution =
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
- CompletableFuture<Execution> allocationFuture =
execution.allocateResourcesForExecution(
- executionGraph.getSlotProviderStrategy(),
- LocationPreferenceConstraint.ALL,
- Collections.emptySet());
-
- assertTrue(allocationFuture.isDone());
-
- assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
- assertEquals(slot, execution.getAssignedResource());
-
- // cancelling the execution should move it into state CANCELED
- execution.cancel();
- assertEquals(ExecutionState.CANCELED, execution.getState());
-
- assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
- }
-
- /**
- * Tests that the slot is released in case of a execution cancellation
when being in state
- * RUNNING.
- */
- @Test
- public void testSlotReleaseOnExecutionCancellationInRunning() throws
Exception {
- final JobVertex jobVertex = createNoOpJobVertex();
- final JobVertexID jobVertexId = jobVertex.getID();
-
- final SingleSlotTestingSlotOwner slotOwner = new
SingleSlotTestingSlotOwner();
-
- final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
-
- final ProgrammedSlotProvider slotProvider = new
ProgrammedSlotProvider(1);
- slotProvider.addSlot(jobVertexId, 0,
CompletableFuture.completedFuture(slot));
-
- ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
- slotProvider,
- new NoRestartStrategy(),
- jobVertex);
-
- ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
-
- final Execution execution =
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-
- CompletableFuture<Execution> allocationFuture =
execution.allocateResourcesForExecution(
- executionGraph.getSlotProviderStrategy(),
- LocationPreferenceConstraint.ALL,
- Collections.emptySet());
-
- assertTrue(allocationFuture.isDone());
-
- assertEquals(ExecutionState.SCHEDULED, execution.getState());
-
- assertEquals(slot, execution.getAssignedResource());
-
- execution.deploy();
-
- execution.switchToRunning();
-
- // cancelling the execution should move it into state CANCELING
- execution.cancel();
- assertEquals(ExecutionState.CANCELING, execution.getState());
-
- execution.completeCancelling();
-
- assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
- }
-
- /**
- * Tests that a slot allocation from a {@link SlotProvider} is
cancelled if the
- * {@link Execution} is cancelled.
- */
- @Test
- public void testSlotAllocationCancellationWhenExecutionCancelled()
throws Exception {
Review comment:
This test is outdated because it tests the implementation of the legacy
method `allocateResourcesForExecution()`, while the DefaultScheduler has taken
over the responsibility of allocating a slot for an execution. The cancellation
of pending requests on vertex cancellation is now located in
`DefaultScheduler#cancelExecutionVertex()`.
But I think you are right that we now lack a test to verify that
`ExecutionSlotAllocator#cancel()` is invoked when an execution vertex is
canceled. I will add one in `DefaultSchedulerTest`.
Regarding `SchedulerNG.cancel()`, canceling a job currently does not
directly cancel all pending requests in the SlotPool. After the entire job is
canceled and the JobMaster is shutting down, all pending requests as well as
allocated slots will be released when the SlotPool is closed. So I think the
test needed is `SlotPoolImplTest#testShutdownCancelsAllPendingRequests()`, which
is missing at the moment. I will add it as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]