This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54b67844e7cc1a0b572eef8315b104fbcf2b7837 Author: Weihua Hu <[email protected]> AuthorDate: Thu Mar 23 18:13:25 2023 +0800 [hotfix] Migrate TaskExecutorManagerTest to Junit5 and Assertj --- .../slotmanager/TaskExecutorManagerTest.java | 131 +++++++++------------ 1 file changed, 58 insertions(+), 73 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java index f8ef9b61530..a5e9dfc5158 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java @@ -30,12 +30,11 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; @@ -46,22 +45,18 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link TaskExecutorManager}. */ -public class TaskExecutorManagerTest extends TestLogger { +class TaskExecutorManagerTest { - @ClassRule - public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); + @RegisterExtension + static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); /** Tests that a pending slot is only fulfilled by an exactly matching received slot. */ @Test - public void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() { + void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() { final int numWorkerCpuCores = 3; final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build(); @@ -81,20 +76,20 @@ public class TaskExecutorManagerTest extends TestLogger { // create pending slot taskExecutorManager.allocateWorker(requestedSlotProfile); - assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1)); + assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1); createAndRegisterTaskExecutor(taskExecutorManager, 1, offeredSlotProfile); // the slot from the task executor should be accepted, but we should still be waiting // for the originally requested slot - assertThat(taskExecutorManager.getNumberRegisteredSlots(), is(1)); - assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1)); + assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1); + assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1); } } /** Tests that a pending slot is not fulfilled by an already allocated slot. */ @Test - public void testPendingSlotNotFulfilledByAllocatedSlot() { + void testPendingSlotNotFulfilledByAllocatedSlot() { final int numWorkerCpuCores = 3; final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build(); @@ -112,7 +107,7 @@ public class TaskExecutorManagerTest extends TestLogger { // create pending slot taskExecutorManager.allocateWorker(requestedSlotProfile); - assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1)); + assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1); final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); final SlotReport slotReport = @@ -127,8 +122,8 @@ public class TaskExecutorManagerTest extends TestLogger { // the slot from the task executor should be accepted, but we should still be waiting // for the originally requested slot - assertThat(taskExecutorManager.getNumberRegisteredSlots(), is(1)); - assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1)); + assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1); + assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1); } } @@ -140,7 +135,7 @@ public class TaskExecutorManagerTest extends TestLogger { * <p>See FLINK-7793 */ @Test - public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { + void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { final Time taskManagerTimeout = Time.milliseconds(10L); final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>(); @@ -148,12 +143,11 @@ public class TaskExecutorManagerTest extends TestLogger { createResourceAllocatorBuilder() .setDeclareResourceNeededConsumer( (resourceDeclarations) -> { - assertThat(resourceDeclarations.size(), is(1)); + assertThat(resourceDeclarations).hasSize(1); ResourceDeclaration resourceDeclaration = resourceDeclarations.iterator().next(); - assertThat(resourceDeclaration.getNumNeeded(), is(0)); - assertThat( - resourceDeclaration.getUnwantedWorkers().size(), is(1)); + assertThat(resourceDeclaration.getNumNeeded()).isZero(); + assertThat(resourceDeclaration.getUnwantedWorkers()).hasSize(1); releaseResourceFuture.complete( resourceDeclaration .getUnwantedWorkers() @@ -176,7 +170,8 @@ public class TaskExecutorManagerTest extends TestLogger { InstanceID newTaskExecutorId = createAndRegisterTaskExecutor( taskExecutorManager, 1, ResourceProfile.ANY); - assertEquals(1, taskExecutorManager.getNumberRegisteredSlots()); + assertThat(taskExecutorManager.getNumberRegisteredSlots()) + .isEqualTo(1); return newTaskExecutorId; }, mainThreadExecutor) @@ -184,14 +179,15 @@ public class TaskExecutorManagerTest extends TestLogger { .thenCombine( releaseResourceFuture, (registeredInstance, releasedInstance) -> { - assertThat(registeredInstance, is(releasedInstance)); - assertEquals(1, taskExecutorManager.getNumberRegisteredSlots()); + assertThat(registeredInstance).isEqualTo(releasedInstance); + assertThat(taskExecutorManager.getNumberRegisteredSlots()) + .isEqualTo(1); return registeredInstance; }) .thenAccept( taskExecutorId -> { taskExecutorManager.unregisterTaskExecutor(taskExecutorId); - assertEquals(0, taskExecutorManager.getNumberRegisteredSlots()); + assertThat(taskExecutorManager.getNumberRegisteredSlots()).isZero(); }) .get(); } @@ -201,7 +197,7 @@ public class TaskExecutorManagerTest extends TestLogger { * Tests that formerly used task managers can timeout after all of their slots have been freed. */ @Test - public void testTimeoutForUnusedTaskManager() throws Exception { + void testTimeoutForUnusedTaskManager() throws Exception { WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(1).build(); final ResourceProfile resourceProfile = ResourceProfile.newBuilder().setCpuCores(1).build(); @@ -213,19 +209,17 @@ public class TaskExecutorManagerTest extends TestLogger { new TestingResourceAllocatorBuilder() .setDeclareResourceNeededConsumer( (resourceDeclarations) -> { - assertThat(resourceDeclarations.size(), is(1)); + assertThat(resourceDeclarations.size()).isEqualTo(1); ResourceDeclaration resourceDeclaration = resourceDeclarations.iterator().next(); if (declareResourceCount.getAndIncrement() == 0) { - assertThat(resourceDeclaration.getNumNeeded(), is(1)); - assertThat( - resourceDeclaration.getUnwantedWorkers().size(), - is(0)); + assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(1); + assertThat(resourceDeclaration.getUnwantedWorkers()) + .isEmpty(); } else { - assertThat(resourceDeclaration.getNumNeeded(), is(0)); - assertThat( - resourceDeclaration.getUnwantedWorkers().size(), - is(1)); + assertThat(resourceDeclaration.getNumNeeded()).isZero(); + assertThat(resourceDeclaration.getUnwantedWorkers()) + .hasSize(1); releaseResourceFuture.complete( resourceDeclaration .getUnwantedWorkers() @@ -262,7 +256,7 @@ public class TaskExecutorManagerTest extends TestLogger { .thenAcceptBoth( releaseResourceFuture, (registeredInstance, releasedInstance) -> - assertThat(registeredInstance, is(releasedInstance))) + assertThat(registeredInstance).isEqualTo(releasedInstance)) .get(); } } @@ -272,7 +266,7 @@ public class TaskExecutorManagerTest extends TestLogger { * fulfill the requested resource profile. */ @Test - public void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() { + void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() { final int numCoresPerWorker = 1; final WorkerResourceSpec workerResourceSpec = @@ -296,9 +290,8 @@ public class TaskExecutorManagerTest extends TestLogger { .setResourceAllocator(resourceAllocator) .createTaskExecutorManager()) { - assertThat( - taskExecutorManager.allocateWorker(requestedProfile).orElse(null), nullValue()); - assertThat(declareResourceCount.get(), is(0)); + assertThat(taskExecutorManager.allocateWorker(requestedProfile)).isNotPresent(); + assertThat(declareResourceCount).hasValue(0); } } @@ -307,7 +300,7 @@ public class TaskExecutorManagerTest extends TestLogger { * allocating new workers. */ @Test - public void testMaxSlotLimitAllocateWorker() { + void testMaxSlotLimitAllocateWorker() { final int numberSlots = 1; final int maxSlotNum = 1; @@ -316,7 +309,7 @@ public class TaskExecutorManagerTest extends TestLogger { createResourceAllocatorBuilder() .setDeclareResourceNeededConsumer( (resourceDeclarations) -> { - assertThat(resourceDeclarations.size(), is(1)); + assertThat(resourceDeclarations).hasSize(1); ResourceDeclaration resourceDeclaration = resourceDeclarations.iterator().next(); resourceRequestNumber.add(resourceDeclaration.getNumNeeded()); @@ -330,15 +323,13 @@ public class TaskExecutorManagerTest extends TestLogger { .setResourceAllocator(resourceAllocator) .createTaskExecutorManager()) { - assertThat(resourceRequestNumber.size(), is(0)); + assertThat(resourceRequestNumber).isEmpty(); taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN); - assertThat(resourceRequestNumber.size(), is(1)); - assertThat(resourceRequestNumber.get(0), is(1)); + assertThat(resourceRequestNumber).containsExactly(1); taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN); - assertThat(resourceRequestNumber.size(), is(1)); - assertThat(resourceRequestNumber.get(0), is(1)); + assertThat(resourceRequestNumber).containsExactly(1); } } @@ -347,7 +338,7 @@ public class TaskExecutorManagerTest extends TestLogger { * new TaskExecutor registered. */ @Test - public void testMaxSlotLimitRegisterWorker() throws Exception { + void testMaxSlotLimitRegisterWorker() throws Exception { final int numberSlots = 1; final int maxSlotNum = 1; @@ -360,12 +351,12 @@ public class TaskExecutorManagerTest extends TestLogger { createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY); createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY); - assertThat(taskExecutorManager.getNumberRegisteredSlots(), is(1)); + assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1); } } @Test - public void testGetResourceOverview() { + void testGetResourceOverview() { final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); @@ -378,24 +369,18 @@ public class TaskExecutorManagerTest extends TestLogger { taskExecutorManager.occupySlot(instanceId1); taskExecutorManager.occupySlot(instanceId2); - assertThat( - taskExecutorManager.getTotalFreeResources(), - equalTo(resourceProfile1.merge(resourceProfile2))); - assertThat( - taskExecutorManager.getTotalFreeResourcesOf(instanceId1), - equalTo(resourceProfile1)); - assertThat( - taskExecutorManager.getTotalFreeResourcesOf(instanceId2), - equalTo(resourceProfile2)); - assertThat( - taskExecutorManager.getTotalRegisteredResources(), - equalTo(resourceProfile1.merge(resourceProfile2).multiply(2))); - assertThat( - taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1), - equalTo(resourceProfile1.multiply(2))); - assertThat( - taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2), - equalTo(resourceProfile2.multiply(2))); + assertThat(taskExecutorManager.getTotalFreeResources()) + .isEqualTo(resourceProfile1.merge(resourceProfile2)); + assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId1)) + .isEqualTo(resourceProfile1); + assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId2)) + .isEqualTo(resourceProfile2); + assertThat(taskExecutorManager.getTotalRegisteredResources()) + .isEqualTo(resourceProfile1.merge(resourceProfile2).multiply(2)); + assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1)) + .isEqualTo(resourceProfile1.multiply(2)); + assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2)) + .isEqualTo(resourceProfile2.multiply(2)); } }
