This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c75a242e94c8482a53214d523c7f204c18be1f4a Author: Zhu Zhu <[email protected]> AuthorDate: Mon Jul 11 18:02:18 2022 +0800 [hotfix] Migrate SlotSharingExecutionSlotAllocatorTest to JUnit5 --- .../SlotSharingExecutionSlotAllocatorTest.java | 196 +++++++++------------ 1 file changed, 86 insertions(+), 110 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java index aeac6d7cc11..8fbbf3adc91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java @@ -34,10 +34,9 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.BiConsumerWithException; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; @@ -56,18 +55,11 @@ import java.util.stream.Collectors; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test suite for {@link SlotSharingExecutionSlotAllocator}. */ -public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { +class SlotSharingExecutionSlotAllocatorTest { private static final Time ALLOCATION_TIMEOUT = Time.milliseconds(100L); private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3, 5); @@ -77,7 +69,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { private static final ExecutionVertexID EV4 = createRandomExecutionVertexId(); @Test - public void testSlotProfileRequestAskedBulkAndGroup() { + void testSlotProfileRequestAskedBulkAndGroup() { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build(); ExecutionSlotSharingGroup executionSlotSharingGroup = context.getSlotSharingStrategy().getExecutionSlotSharingGroup(EV1); @@ -86,15 +78,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { List<Set<ExecutionVertexID>> askedBulks = context.getSlotProfileRetrieverFactory().getAskedBulks(); - assertThat(askedBulks, hasSize(1)); - assertThat(askedBulks.get(0), containsInAnyOrder(EV1, EV2)); - assertThat( - context.getSlotProfileRetrieverFactory().getAskedGroups(), - containsInAnyOrder(executionSlotSharingGroup)); + assertThat(askedBulks).hasSize(1); + assertThat(askedBulks.get(0)).containsExactlyInAnyOrder(EV1, EV2); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups()) + .containsExactly(executionSlotSharingGroup); } @Test - public void testSlotRequestProfile() { + void testSlotRequestProfile() { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2, EV3).build(); ResourceProfile physicalsSlotResourceProfile = RESOURCE_PROFILE.multiply(3); @@ -102,16 +93,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); - assertThat(slotRequest.isPresent(), is(true)); - slotRequest.ifPresent( - r -> - assertThat( - r.getSlotProfile().getPhysicalSlotResourceProfile(), - is(physicalsSlotResourceProfile))); + assertThat(slotRequest).isPresent(); + assertThat(slotRequest.get().getSlotProfile().getPhysicalSlotResourceProfile()) + .isEqualTo(physicalsSlotResourceProfile); } @Test - public void testAllocatePhysicalSlotForNewSharedSlot() { + void testAllocatePhysicalSlotForNewSharedSlot() { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).addGroup(EV3, EV4).build(); @@ -119,12 +107,12 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { context.allocateSlotsFor(EV1, EV2, EV3, EV4); Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments); - assertThat(assignIds, containsInAnyOrder(EV1, EV2, EV3, EV4)); - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + assertThat(assignIds).containsExactlyInAnyOrder(EV1, EV2, EV3, EV4); + assertThat(context.getSlotProvider().getRequests()).hasSize(2); } @Test - public void testAllocateLogicalSlotFromAvailableSharedSlot() { + void testAllocateLogicalSlotFromAvailableSharedSlot() { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build(); context.allocateSlotsFor(EV1); @@ -134,26 +122,24 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { // execution 0 from the first allocateSlotsFor call and execution 1 from the second // allocateSlotsFor call // share a slot, therefore only one physical slot allocation should happen - assertThat(assignIds, containsInAnyOrder(EV2)); - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + assertThat(assignIds).containsExactly(EV2); + assertThat(context.getSlotProvider().getRequests()).hasSize(1); } @Test - public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() + void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1).build(); ExecutionSlotAssignment assignment1 = context.allocateSlotsFor(EV1).get(0); ExecutionSlotAssignment assignment2 = context.allocateSlotsFor(EV1).get(0); - assertThat( - assignment1.getLogicalSlotFuture().get() - == assignment2.getLogicalSlotFuture().get(), - is(true)); + assertThat(assignment1.getLogicalSlotFuture().get()) + .isSameAs(assignment2.getLogicalSlotFuture().get()); } @Test - public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { AllocationContext context = AllocationContext.newBuilder() .addGroup(EV1) @@ -166,27 +152,25 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); - assertThat(logicalSlotFuture.isDone(), is(false)); + assertThat(logicalSlotFuture).isNotDone(); context.getSlotProvider() .getResponses() .get(slotRequestId) .completeExceptionally(new Throwable()); - assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + assertThat(logicalSlotFuture).isCompletedExceptionally(); // next allocation allocates new shared slot context.allocateSlotsFor(EV1); - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + assertThat(context.getSlotProvider().getRequests()).hasSize(2); } @Test - public void testSlotWillBeOccupiedIndefinitelyFalse() - throws ExecutionException, InterruptedException { + void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { testSlotWillBeOccupiedIndefinitely(false); } @Test - public void testSlotWillBeOccupiedIndefinitelyTrue() - throws ExecutionException, InterruptedException { + void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { testSlotWillBeOccupiedIndefinitely(true); } @@ -200,19 +184,18 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { context.allocateSlotsFor(EV1); PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail(); - assertThat( - slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely()) + .isEqualTo(slotWillBeOccupiedIndefinitely); TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); - assertThat(physicalSlot.getPayload(), notNullValue()); - assertThat( - physicalSlot.getPayload().willOccupySlotIndefinitely(), - is(slotWillBeOccupiedIndefinitely)); + assertThat(physicalSlot.getPayload()).isNotNull(); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely()) + .isEqualTo(slotWillBeOccupiedIndefinitely); } @Test - public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception { + void testReturningLogicalSlotsRemovesSharedSlot() throws Exception { // physical slot request is completed and completes logical requests testLogicalSlotRequestCancellationOrRelease( false, @@ -221,24 +204,26 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { } @Test - public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception { + void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception { // physical slot request is not completed and does not complete logical requests testLogicalSlotRequestCancellationOrRelease( true, true, (context, assignment) -> { context.getAllocator().cancel(assignment.getExecutionAttemptId()); - try { - assignment.getLogicalSlotFuture().get(); - fail("The logical future must finish with the cancellation exception"); - } catch (InterruptedException | ExecutionException e) { - assertThat(e.getCause(), instanceOf(CancellationException.class)); - } + assertThatThrownBy( + () -> { + context.getAllocator() + .cancel(assignment.getExecutionAttemptId()); + assignment.getLogicalSlotFuture().get(); + }) + .as("The logical future must finish with the cancellation exception.") + .hasCauseInstanceOf(CancellationException.class); }); } @Test - public void + void testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot() throws Exception { // physical slot request is completed and completes logical requests @@ -266,7 +251,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { AllocationContext context = allocationContextBuilder.build(); List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2); - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + assertThat(context.getSlotProvider().getRequests()).hasSize(1); // cancel or release only one sharing logical slots cancelOrReleaseAction.accept(context, assignments.get(0)); @@ -274,7 +259,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { context.allocateSlotsFor(EV1, EV2); // there should be no more physical slot allocations, as the first logical slot reuses the // previous shared slot - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + assertThat(context.getSlotProvider().getRequests()).hasSize(1); // cancel or release all sharing logical slots for (ExecutionSlotAssignment assignment : assignmentsAfterOneCancellation) { @@ -282,22 +267,18 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { } SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); - assertThat( - context.getSlotProvider().getCancellations().containsKey(slotRequestId), - is(cancelsPhysicalSlotRequestAndRemovesSharedSlot)); + assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId)) + .isEqualTo(cancelsPhysicalSlotRequestAndRemovesSharedSlot); context.allocateSlotsFor(EV3); // there should be one more physical slot allocation if the first allocation should be // removed with all logical slots int expectedNumberOfRequests = cancelsPhysicalSlotRequestAndRemovesSharedSlot ? 2 : 1; - assertThat( - context.getSlotProvider().getRequests().keySet(), - hasSize(expectedNumberOfRequests)); + assertThat(context.getSlotProvider().getRequests()).hasSize(expectedNumberOfRequests); } @Test - public void testPhysicalSlotReleaseLogicalSlots() - throws ExecutionException, InterruptedException { + void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build(); List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2); List<TestingPayload> payloads = @@ -317,26 +298,23 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); TestingPhysicalSlot physicalSlot = context.getSlotProvider().getFirstResponseOrFail().get(); - assertThat( - payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), - is(false)); - assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone())) + .isFalse(); + assertThat(physicalSlot.getPayload()).isNotNull(); physicalSlot.getPayload().release(new Throwable()); - assertThat( - payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), - is(true)); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone())) + .isTrue(); - assertThat( - context.getSlotProvider().getCancellations().containsKey(slotRequestId), is(true)); + assertThat(context.getSlotProvider().getCancellations()).containsKey(slotRequestId); context.allocateSlotsFor(EV1, EV2); // there should be one more physical slot allocation, as the first allocation should be // removed after releasing all logical slots - assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + assertThat(context.getSlotProvider().getRequests()).hasSize(2); } @Test - public void testSchedulePendingRequestBulkTimeoutCheck() { + void testSchedulePendingRequestBulkTimeoutCheck() { TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); @@ -344,16 +322,15 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { context.allocateSlotsFor(EV1, EV3); PhysicalSlotRequestBulk bulk = bulkChecker.getBulk(); - assertThat(bulk.getPendingRequests(), hasSize(2)); - assertThat( - bulk.getPendingRequests(), - containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE)); - assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(0)); - assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT)); + assertThat(bulk.getPendingRequests()).hasSize(2); + assertThat(bulk.getPendingRequests()) + .containsExactlyInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE); + assertThat(bulk.getAllocationIdsOfFulfilledRequests()).isEmpty(); + assertThat(bulkChecker.getTimeout()).isEqualTo(ALLOCATION_TIMEOUT); } @Test - public void testRequestFulfilledInBulk() { + void testRequestFulfilledInBulk() { TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); @@ -364,14 +341,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId); PhysicalSlotRequestBulk bulk = bulkChecker.getBulk(); - assertThat(bulk.getPendingRequests(), hasSize(1)); - assertThat(bulk.getPendingRequests(), containsInAnyOrder(pendingSlotResourceProfile)); - assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(1)); - assertThat(bulk.getAllocationIdsOfFulfilledRequests(), containsInAnyOrder(allocationId)); + assertThat(bulk.getPendingRequests()).hasSize(1); + assertThat(bulk.getPendingRequests()).containsExactly(pendingSlotResourceProfile); + assertThat(bulk.getAllocationIdsOfFulfilledRequests()).hasSize(1); + assertThat(bulk.getAllocationIdsOfFulfilledRequests()).containsExactly(allocationId); } @Test - public void testRequestBulkCancel() { + void testRequestBulkCancel() { TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); @@ -397,11 +374,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { // EV3 needs again a physical slot, therefore there are 3 requests overall context.allocateSlotsFor(EV1, EV3); - assertThat(context.getSlotProvider().getRequests().values(), hasSize(3)); + assertThat(context.getSlotProvider().getRequests()).hasSize(3); // either EV1 or EV3 logical slot future is fulfilled before cancellation - assertThat(ev1failed != ev3failed, is(true)); - assertThat( - assignments2.get(0).getLogicalSlotFuture().isCompletedExceptionally(), is(false)); + assertThat(ev1failed).isNotEqualTo(ev3failed); + assertThat(assignments2.get(0).getLogicalSlotFuture()).isNotCompletedExceptionally(); } private static void releaseLogicalSlot(LogicalSlot slot) { @@ -410,7 +386,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { } @Test - public void testBulkClearIfPhysicalSlotRequestFails() { + void testBulkClearIfPhysicalSlotRequestFails() { TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); @@ -423,11 +399,11 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { .completeExceptionally(new Throwable()); PhysicalSlotRequestBulk bulk = bulkChecker.getBulk(); - assertThat(bulk.getPendingRequests(), hasSize(0)); + assertThat(bulk.getPendingRequests()).isEmpty(); } @Test - public void failLogicalSlotsIfPhysicalSlotIsFailed() { + void failLogicalSlotsIfPhysicalSlotIsFailed() { final TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); AllocationContext context = @@ -442,17 +418,17 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { final List<ExecutionSlotAssignment> allocatedSlots = context.allocateSlotsFor(EV1, EV2); for (ExecutionSlotAssignment allocatedSlot : allocatedSlots) { - assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally()); + assertThat(allocatedSlot.getLogicalSlotFuture()).isCompletedExceptionally(); } - assertThat(bulkChecker.getBulk().getPendingRequests(), is(empty())); + assertThat(bulkChecker.getBulk().getPendingRequests()).isEmpty(); final Set<SlotRequestId> requests = context.getSlotProvider().getRequests().keySet(); - assertThat(context.getSlotProvider().getCancellations().keySet(), is(requests)); + assertThat(context.getSlotProvider().getCancellations().keySet()).isEqualTo(requests); } @Test - public void testSlotRequestProfileFromExecutionSlotSharingGroup() { + void testSlotRequestProfileFromExecutionSlotSharingGroup() { final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); final AllocationContext context = @@ -462,14 +438,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { .build(); context.allocateSlotsFor(EV1, EV2); - assertThat(context.getSlotProvider().getRequests().values().size(), is(2)); + assertThat(context.getSlotProvider().getRequests()).hasSize(2); assertThat( - context.getSlotProvider().getRequests().values().stream() - .map(PhysicalSlotRequest::getSlotProfile) - .map(SlotProfile::getPhysicalSlotResourceProfile) - .collect(Collectors.toList()), - containsInAnyOrder(resourceProfile1, resourceProfile2)); + context.getSlotProvider().getRequests().values().stream() + .map(PhysicalSlotRequest::getSlotProfile) + .map(SlotProfile::getPhysicalSlotResourceProfile) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder(resourceProfile1, resourceProfile2); } private static List<ExecutionVertexID> getAssignIds( @@ -495,7 +471,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { AllocationContext context, AllocationID allocationId) { Map<SlotRequestId, PhysicalSlotRequest> requests = context.getSlotProvider().getRequests(); List<SlotRequestId> slotRequestIds = new ArrayList<>(requests.keySet()); - assertThat(slotRequestIds, hasSize(2)); + assertThat(slotRequestIds).hasSize(2); SlotRequestId slotRequestId1 = slotRequestIds.get(0); SlotRequestId slotRequestId2 = slotRequestIds.get(1); context.getSlotProvider()
