This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 35498b29836b0704ef092637d9728ce27f42d277 Author: Lijie Wang <[email protected]> AuthorDate: Wed Oct 19 17:19:47 2022 +0800 [hotfix][runtime] Migrate some related tests to JUnit5 --- .../DefaultExecutionGraphDeploymentTest.java | 302 +++++++++------------ ...tExecutionGraphDeploymentWithBlobCacheTest.java | 20 +- ...ExecutionGraphDeploymentWithBlobServerTest.java | 61 +++-- ...hDeploymentWithSmallBlobCacheSizeLimitTest.java | 33 +-- .../partition/SortMergeResultPartitionTest.java | 186 ++++++------- .../metrics/groups/TaskIOMetricGroupTest.java | 68 +++-- .../DefaultVertexParallelismDeciderTest.java | 47 ++-- 7 files changed, 334 insertions(+), 383 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index 05237422a71..32e6302ec88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -66,14 +66,11 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.function.FunctionUtils; -import org.hamcrest.Description; -import org.hamcrest.TypeSafeMatcher; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.Arrays; @@ -87,19 +84,14 @@ import 
java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link DefaultExecutionGraph} deployment. */ -public class DefaultExecutionGraphDeploymentTest extends TestLogger { +class DefaultExecutionGraphDeploymentTest { - @ClassRule - public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); + @RegisterExtension + static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = + TestingUtils.defaultExecutorExtension(); /** BLOB server instance to use for the job graph. */ protected BlobWriter blobWriter = VoidBlobWriter.getInstance(); @@ -117,7 +109,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { * @param eg the execution graph that was created */ protected void checkJobOffloaded(DefaultExecutionGraph eg) throws Exception { - assertTrue(eg.getJobInformationOrBlobKey().isLeft()); + assertThat(eg.getJobInformationOrBlobKey().isLeft()).isTrue(); } /** @@ -128,11 +120,11 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { * @param jobVertexId job vertex ID */ protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception { - assertTrue(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft()); + assertThat(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft()).isTrue(); } @Test - public void testBuildDeploymentDescriptor() throws Exception { + void testBuildDeploymentDescriptor() throws Exception { final JobVertexID jid1 = new JobVertexID(); final 
JobVertexID jid2 = new JobVertexID(); @@ -194,7 +186,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { .setTaskManagerGateway(taskManagerGateway) .createTestingLogicalSlot(); - assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); + assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED); vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED); vertex.getCurrentExecutionAttempt() @@ -202,128 +194,111 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { .get(); vertex.deployToSlot(slot); - assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); + assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING); checkTaskOffloaded(eg, vertex.getJobvertexId()); TaskDeploymentDescriptor descr = tdd.get(); - assertNotNull(descr); + assertThat(descr).isNotNull(); JobInformation jobInformation = descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); TaskInformation taskInformation = descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); - assertEquals(jobId, descr.getJobId()); - assertEquals(jobId, jobInformation.getJobId()); - assertEquals(jid2, taskInformation.getJobVertexId()); - assertEquals(3, descr.getSubtaskIndex()); - assertEquals(10, taskInformation.getNumberOfSubtasks()); - assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName()); - assertEquals("v2", taskInformation.getTaskName()); + assertThat(descr.getJobId()).isEqualTo(jobId); + assertThat(jobInformation.getJobId()).isEqualTo(jobId); + assertThat(taskInformation.getJobVertexId()).isEqualTo(jid2); + assertThat(descr.getSubtaskIndex()).isEqualTo(3); + assertThat(taskInformation.getNumberOfSubtasks()).isEqualTo(10); + assertThat(taskInformation.getInvokableClassName()).isEqualTo(BatchTask.class.getName()); + assertThat(taskInformation.getTaskName()).isEqualTo("v2"); Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions = descr.getProducedPartitions(); Collection<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates(); - assertEquals(2, producedPartitions.size()); - assertEquals(1, consumedPartitions.size()); + assertThat(producedPartitions.size()).isEqualTo(2); + assertThat(consumedPartitions.size()).isEqualTo(1); Iterator<ResultPartitionDeploymentDescriptor> iteratorProducedPartitions = producedPartitions.iterator(); Iterator<InputGateDeploymentDescriptor> iteratorConsumedPartitions = consumedPartitions.iterator(); - assertEquals(10, iteratorProducedPartitions.next().getNumberOfSubpartitions()); - assertEquals(10, iteratorProducedPartitions.next().getNumberOfSubpartitions()); + assertThat(iteratorProducedPartitions.next().getNumberOfSubpartitions()).isEqualTo(10); + assertThat(iteratorProducedPartitions.next().getNumberOfSubpartitions()).isEqualTo(10); ShuffleDescriptor[] shuffleDescriptors = iteratorConsumedPartitions.next().getShuffleDescriptors(); - assertEquals(10, shuffleDescriptors.length); + assertThat(shuffleDescriptors.length).isEqualTo(10); Iterator<ConsumedPartitionGroup> iteratorConsumedPartitionGroup = vertex.getAllConsumedPartitionGroups().iterator(); int idx = 0; for (IntermediateResultPartitionID partitionId : iteratorConsumedPartitionGroup.next()) { - assertEquals( - partitionId, shuffleDescriptors[idx++].getResultPartitionID().getPartitionId()); + assertThat(shuffleDescriptors[idx++].getResultPartitionID().getPartitionId()) + .isEqualTo(partitionId); } } @Test - public void testRegistrationOfExecutionsFinishing() { - try { - final JobVertexID jid1 = new JobVertexID(); - final JobVertexID jid2 = new JobVertexID(); + void testRegistrationOfExecutionsFinishing() throws Exception { - JobVertex v1 = new JobVertex("v1", jid1); - JobVertex v2 = new JobVertex("v2", jid2); + final JobVertexID jid1 = new JobVertexID(); + final JobVertexID jid2 = new JobVertexID(); - SchedulerBase scheduler = setupScheduler(v1, 7650, v2, 2350); - 
Collection<Execution> executions = - new ArrayList<>( - scheduler.getExecutionGraph().getRegisteredExecutions().values()); + JobVertex v1 = new JobVertex("v1", jid1); + JobVertex v2 = new JobVertex("v2", jid2); - for (Execution e : executions) { - e.markFinished(); - } + SchedulerBase scheduler = setupScheduler(v1, 7650, v2, 2350); + Collection<Execution> executions = + new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values()); - assertEquals(0, scheduler.getExecutionGraph().getRegisteredExecutions().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + for (Execution e : executions) { + e.markFinished(); } + + assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty(); } @Test - public void testRegistrationOfExecutionsFailing() { - try { - - final JobVertexID jid1 = new JobVertexID(); - final JobVertexID jid2 = new JobVertexID(); + void testRegistrationOfExecutionsFailing() throws Exception { - JobVertex v1 = new JobVertex("v1", jid1); - JobVertex v2 = new JobVertex("v2", jid2); + final JobVertexID jid1 = new JobVertexID(); + final JobVertexID jid2 = new JobVertexID(); - SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6); - Collection<Execution> executions = - new ArrayList<>( - scheduler.getExecutionGraph().getRegisteredExecutions().values()); + JobVertex v1 = new JobVertex("v1", jid1); + JobVertex v2 = new JobVertex("v2", jid2); - for (Execution e : executions) { - e.markFailed(null); - } + SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6); + Collection<Execution> executions = + new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values()); - assertEquals(0, scheduler.getExecutionGraph().getRegisteredExecutions().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + for (Execution e : executions) { + e.markFailed(null); } + + assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty(); } @Test - public void 
testRegistrationOfExecutionsFailedExternally() { - try { + void testRegistrationOfExecutionsFailedExternally() throws Exception { - final JobVertexID jid1 = new JobVertexID(); - final JobVertexID jid2 = new JobVertexID(); - - JobVertex v1 = new JobVertex("v1", jid1); - JobVertex v2 = new JobVertex("v2", jid2); + final JobVertexID jid1 = new JobVertexID(); + final JobVertexID jid2 = new JobVertexID(); - SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6); - Collection<Execution> executions = - new ArrayList<>( - scheduler.getExecutionGraph().getRegisteredExecutions().values()); + JobVertex v1 = new JobVertex("v1", jid1); + JobVertex v2 = new JobVertex("v2", jid2); - for (Execution e : executions) { - e.fail(null); - } + SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6); + Collection<Execution> executions = + new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values()); - assertEquals(0, scheduler.getExecutionGraph().getRegisteredExecutions().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + for (Execution e : executions) { + e.fail(null); } + + assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty(); } /** @@ -331,7 +306,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { * accumulators and metrics for an execution that failed or was canceled. 
*/ @Test - public void testAccumulatorsAndMetricsForwarding() throws Exception { + void testAccumulatorsAndMetricsForwarding() throws Exception { final JobVertexID jid1 = new JobVertexID(); final JobVertexID jid2 = new JobVertexID(); @@ -361,9 +336,9 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { scheduler.updateTaskExecutionState(state); - assertEquals(ioMetrics, execution1.getIOMetrics()); - assertNotNull(execution1.getUserAccumulators()); - assertEquals(4, execution1.getUserAccumulators().get("acc").getLocalValue()); + assertThat(execution1.getIOMetrics()).isEqualTo(ioMetrics); + assertThat(execution1.getUserAccumulators()).isNotNull(); + assertThat(execution1.getUserAccumulators().get("acc").getLocalValue()).isEqualTo(4); // verify behavior for failed executions Execution execution2 = executions.values().iterator().next(); @@ -384,9 +359,9 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { scheduler.updateTaskExecutionState(state2); - assertEquals(ioMetrics2, execution2.getIOMetrics()); - assertNotNull(execution2.getUserAccumulators()); - assertEquals(8, execution2.getUserAccumulators().get("acc").getLocalValue()); + assertThat(execution2.getIOMetrics()).isEqualTo(ioMetrics2); + assertThat(execution2.getUserAccumulators()).isNotNull(); + assertThat(execution2.getUserAccumulators().get("acc").getLocalValue()).isEqualTo(8); } /** @@ -395,7 +370,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { * accumulators and metrics correctly. 
*/ @Test - public void testAccumulatorsAndMetricsStorage() throws Exception { + void testAccumulatorsAndMetricsStorage() throws Exception { final JobVertexID jid1 = new JobVertexID(); final JobVertexID jid2 = new JobVertexID(); @@ -413,41 +388,35 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { execution1.cancel(); execution1.completeCancelling(accumulators, ioMetrics, false); - assertEquals(ioMetrics, execution1.getIOMetrics()); - assertEquals(accumulators, execution1.getUserAccumulators()); + assertThat(execution1.getIOMetrics()).isEqualTo(ioMetrics); + assertThat(execution1.getUserAccumulators()).isEqualTo(accumulators); Execution execution2 = executions.values().iterator().next(); execution2.markFailed(new Throwable(), false, accumulators, ioMetrics, false, true); - assertEquals(ioMetrics, execution2.getIOMetrics()); - assertEquals(accumulators, execution2.getUserAccumulators()); + assertThat(execution2.getIOMetrics()).isEqualTo(ioMetrics); + assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators); } @Test - public void testRegistrationOfExecutionsCanceled() { - try { - - final JobVertexID jid1 = new JobVertexID(); - final JobVertexID jid2 = new JobVertexID(); + void testRegistrationOfExecutionsCanceled() throws Exception { - JobVertex v1 = new JobVertex("v1", jid1); - JobVertex v2 = new JobVertex("v2", jid2); + final JobVertexID jid1 = new JobVertexID(); + final JobVertexID jid2 = new JobVertexID(); - SchedulerBase scheduler = setupScheduler(v1, 19, v2, 37); - Collection<Execution> executions = - new ArrayList<>( - scheduler.getExecutionGraph().getRegisteredExecutions().values()); + JobVertex v1 = new JobVertex("v1", jid1); + JobVertex v2 = new JobVertex("v2", jid2); - for (Execution e : executions) { - e.cancel(); - e.completeCancelling(); - } + SchedulerBase scheduler = setupScheduler(v1, 19, v2, 37); + Collection<Execution> executions = + new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values()); 
- assertEquals(0, scheduler.getExecutionGraph().getRegisteredExecutions().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + for (Execution e : executions) { + e.cancel(); + e.completeCancelling(); } + + assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty(); } /** @@ -456,7 +425,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { * swallow the fail exception when scheduling a consumer task. */ @Test - public void testNoResourceAvailableFailure() throws Exception { + void testNoResourceAvailableFailure() throws Exception { JobVertex v1 = new JobVertex("source"); JobVertex v2 = new JobVertex("sink"); @@ -481,7 +450,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { new DefaultSchedulerBuilder( graph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), - EXECUTOR_RESOURCE.getExecutor()) + EXECUTOR_EXTENSION.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( TestingPhysicalSlotProvider @@ -507,7 +476,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { scheduler.updateTaskExecutionState( new TaskExecutionState(attemptID, ExecutionState.FINISHED, null)); - assertEquals(JobStatus.FAILED, eg.getState()); + assertThat(eg.getState()).isEqualTo(JobStatus.FAILED); } // ------------------------------------------------------------------------ @@ -515,16 +484,16 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { // ------------------------------------------------------------------------ @Test - public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception { + void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception { final Configuration jobManagerConfig = new Configuration(); final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); - assertEquals( - 
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(), - eg.getCheckpointCoordinator() - .getCheckpointStore() - .getMaxNumberOfRetainedCheckpoints()); + assertThat( + eg.getCheckpointCoordinator() + .getCheckpointStore() + .getMaxNumberOfRetainedCheckpoints()) + .isEqualTo(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue()); } private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2) @@ -542,7 +511,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(v1, v2), ComponentMainThreadExecutorServiceAdapter.forMainThread(), - EXECUTOR_RESOURCE.getExecutor()) + EXECUTOR_EXTENSION.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory()) .setFutureExecutor(executorService) @@ -556,13 +525,13 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { scheduler.startScheduling(); Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions(); - assertEquals(dop1 + dop2, executions.size()); + assertThat(executions.size()).isEqualTo(dop1 + dop2); return scheduler; } @Test - public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception { + void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception { final int negativeMaxNumberOfCheckpointsToRetain = -10; @@ -573,22 +542,22 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); - assertNotEquals( - negativeMaxNumberOfCheckpointsToRetain, - eg.getCheckpointCoordinator() - .getCheckpointStore() - .getMaxNumberOfRetainedCheckpoints()); - - assertEquals( - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(), - eg.getCheckpointCoordinator() - .getCheckpointStore() - .getMaxNumberOfRetainedCheckpoints()); + assertThat( + eg.getCheckpointCoordinator() + 
.getCheckpointStore() + .getMaxNumberOfRetainedCheckpoints()) + .isNotEqualTo(negativeMaxNumberOfCheckpointsToRetain); + + assertThat( + eg.getCheckpointCoordinator() + .getCheckpointStore() + .getMaxNumberOfRetainedCheckpoints()) + .isEqualTo(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue()); } /** Tests that the {@link ExecutionGraph} is deployed in topological order. */ @Test - public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception { + void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception { final int sourceParallelism = 2; final int sinkParallelism = 1; @@ -628,7 +597,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), - EXECUTOR_RESOURCE.getExecutor()) + EXECUTOR_EXTENSION.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( physicalSlotProvider)) @@ -670,7 +639,9 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { } assertThat( - submittedTasks, new ExecutionStageMatcher(Arrays.asList(firstStage, secondStage))); + isDeployedInTopologicalOrder( + submittedTasks, Arrays.asList(firstStage, secondStage))) + .isTrue(); } private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception { @@ -693,41 +664,28 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { .setJobGraph(jobGraph) .setJobMasterConfig(configuration) .setBlobWriter(blobWriter) - .build(EXECUTOR_RESOURCE.getExecutor()); + .build(EXECUTOR_EXTENSION.getExecutor()); } - private static final class ExecutionStageMatcher - extends TypeSafeMatcher<List<ExecutionAttemptID>> { - private final List<Collection<ExecutionAttemptID>> executionStages; - - private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> executionStages) { - this.executionStages = executionStages; - } - - @Override - 
protected boolean matchesSafely(List<ExecutionAttemptID> submissionOrder) { - final Iterator<ExecutionAttemptID> submissionIterator = submissionOrder.iterator(); + private boolean isDeployedInTopologicalOrder( + List<ExecutionAttemptID> submissionOrder, + List<Collection<ExecutionAttemptID>> executionStages) { + final Iterator<ExecutionAttemptID> submissionIterator = submissionOrder.iterator(); - for (Collection<ExecutionAttemptID> stage : executionStages) { - final Collection<ExecutionAttemptID> currentStage = new ArrayList<>(stage); + for (Collection<ExecutionAttemptID> stage : executionStages) { + final Collection<ExecutionAttemptID> currentStage = new ArrayList<>(stage); - while (!currentStage.isEmpty() && submissionIterator.hasNext()) { - if (!currentStage.remove(submissionIterator.next())) { - return false; - } - } - - if (!currentStage.isEmpty()) { + while (!currentStage.isEmpty() && submissionIterator.hasNext()) { + if (!currentStage.remove(submissionIterator.next())) { return false; } } - return !submissionIterator.hasNext(); + if (!currentStage.isEmpty()) { + return false; + } } - @Override - public void describeTo(Description description) { - description.appendValueList("<[", ", ", "]>", executionStages); - } + return !submissionIterator.hasNext(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java index 2d2509cf028..8ff9b0af5ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java @@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import 
org.apache.flink.runtime.blob.PermanentBlobCache; import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.testutils.junit.utils.TempDirUtils; -import org.junit.After; -import org.junit.Before; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import java.io.IOException; import java.net.InetSocketAddress; @@ -34,26 +35,31 @@ import java.net.InetSocketAddress; * Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB * server. */ -public class DefaultExecutionGraphDeploymentWithBlobCacheTest +class DefaultExecutionGraphDeploymentWithBlobCacheTest extends DefaultExecutionGraphDeploymentWithBlobServerTest { - @Before + @BeforeEach @Override public void setupBlobServer() throws IOException { Configuration config = new Configuration(); // always offload the serialized job and task information config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); - blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore()); + blobServer = + new BlobServer( + config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore()); blobServer.start(); blobWriter = blobServer; InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); blobCache = new PermanentBlobCache( - config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore(), serverAddress); + config, + TempDirUtils.newFolder(temporaryFolder), + new VoidBlobStore(), + serverAddress); } - @After + @AfterEach @Override public void shutdownBlobServer() throws IOException { if (blobServer != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java index a5ed097272b..41391484c04 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java @@ -22,71 +22,57 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.types.Either; import org.apache.flink.util.SerializedValue; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.mockito.Matchers; -import org.mockito.Mockito; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB * server. 
*/ -public class DefaultExecutionGraphDeploymentWithBlobServerTest +class DefaultExecutionGraphDeploymentWithBlobServerTest extends DefaultExecutionGraphDeploymentTest { - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @TempDir Path temporaryFolder; private Set<byte[]> seenHashes = Collections.newSetFromMap(new ConcurrentHashMap<byte[], Boolean>()); protected BlobServer blobServer = null; - @Before + @BeforeEach public void setupBlobServer() throws IOException { Configuration config = new Configuration(); // always offload the serialized job and task information config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); blobServer = - Mockito.spy( - new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore())); + new AssertBlobServer( + config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore()); blobWriter = blobServer; blobCache = blobServer; seenHashes.clear(); - - // verify that we do not upload the same content more than once - doAnswer( - invocation -> { - PermanentBlobKey key = (PermanentBlobKey) invocation.callRealMethod(); - - assertTrue(seenHashes.add(key.getHash())); - - return key; - }) - .when(blobServer) - .putPermanent(any(JobID.class), Matchers.<byte[]>any()); - blobServer.start(); } - @After + @AfterEach public void shutdownBlobServer() throws IOException { if (blobServer != null) { blobServer.close(); @@ -98,7 +84,7 @@ public class DefaultExecutionGraphDeploymentWithBlobServerTest Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = eg.getJobInformationOrBlobKey(); - assertTrue(jobInformationOrBlobKey.isRight()); + assertThat(jobInformationOrBlobKey.isRight()).isTrue(); // must not throw: blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right()); @@ -109,9 +95,24 @@ public class DefaultExecutionGraphDeploymentWithBlobServerTest Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = 
eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey(); - assertTrue(taskInformationOrBlobKey.isRight()); + assertThat(taskInformationOrBlobKey.isRight()).isTrue(); // must not throw: blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right()); } + + private class AssertBlobServer extends BlobServer { + public AssertBlobServer(Configuration config, File storageDir, BlobStore blobStore) + throws IOException { + super(config, storageDir, blobStore); + } + + @Override + public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException { + PermanentBlobKey key = super.putPermanent(jobId, value); + // verify that we do not upload the same content more than once + assertThat(seenHashes.add(key.getHash())).isTrue(); + return key; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java index 2ea30d6df44..4b6da6f72b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java @@ -43,10 +43,11 @@ import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.FunctionUtils; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.InetSocketAddress; @@ -55,8 +56,7 @@ 
import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests {@link ExecutionGraph} deployment when job and task information are offloaded into the BLOB @@ -65,17 +65,19 @@ import static org.junit.Assert.assertNotNull; * even the size limit of {@link BlobCacheSizeTracker} in {@link PermanentBlobCache} is set to the * minimum value. */ -public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest +class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest extends DefaultExecutionGraphDeploymentWithBlobCacheTest { - @Before + @BeforeEach @Override public void setupBlobServer() throws IOException { Configuration config = new Configuration(); // Always offload the serialized JobInformation, TaskInformation and cached // ShuffleDescriptors config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); - blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore()); + blobServer = + new BlobServer( + config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore()); blobServer.start(); blobWriter = blobServer; @@ -85,7 +87,7 @@ public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest blobCache = new PermanentBlobCache( config, - TEMPORARY_FOLDER.newFolder(), + TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore(), serverAddress, blobCacheSizeTracker); @@ -103,7 +105,7 @@ public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest * larger than 1 and the deletion won't happen so frequently. 
*/ @Test - public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception { + void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception { final int numberOfVertices = 4; final int parallelism = 10; @@ -124,7 +126,7 @@ public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) { for (ExecutionVertex ev : ejv.getTaskVertices()) { - assertEquals(ExecutionState.CREATED, ev.getExecutionState()); + assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.CREATED); LogicalSlot slot = new TestingLogicalSlotBuilder() @@ -134,13 +136,13 @@ public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest execution.transitionState(ExecutionState.SCHEDULED); execution.registerProducedPartitions(slot.getTaskManagerLocation()).get(); ev.deployToSlot(slot); - assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState()); + assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING); TaskDeploymentDescriptor tdd = tdds.take(); - assertNotNull(tdd); + assertThat(tdd).isNotNull(); List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates(); - assertEquals(ev.getAllConsumedPartitionGroups().size(), igdds.size()); + assertThat(igdds).hasSize(ev.getAllConsumedPartitionGroups().size()); if (igdds.size() > 0) { checkShuffleDescriptors(igdds.get(0), ev.getConsumedPartitionGroup(0)); @@ -188,9 +190,8 @@ public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup consumedPartitionGroup) { int idx = 0; for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) { - assertEquals( - consumedPartitionId, - igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId()); + assertThat(igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId()) + .isEqualTo(consumedPartitionId); } } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java index f4650e75df1..1403b60dff2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java @@ -30,40 +30,38 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.testutils.junit.utils.TempDirUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Path; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Collection; import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import 
static org.apache.flink.runtime.io.network.buffer.Buffer.DataType; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link SortMergeResultPartition}. */ -@RunWith(Parameterized.class) -public class SortMergeResultPartitionTest extends TestLogger { +@ExtendWith(ParameterizedTestExtension.class) +public class SortMergeResultPartitionTest { private static final int bufferSize = 1024; @@ -73,7 +71,7 @@ public class SortMergeResultPartitionTest extends TestLogger { private static final int numThreads = 4; - private final boolean useHashDataBuffer; + @Parameter public boolean useHashDataBuffer; private final TestBufferAvailabilityListener listener = new TestBufferAvailabilityListener(); @@ -85,38 +83,33 @@ public class SortMergeResultPartitionTest extends TestLogger { private ExecutorService readIOExecutor; - @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); + @TempDir private Path tmpFolder; - @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS); - - @Before - public void setUp() { + @BeforeEach + void setUp() throws IOException { fileChannelManager = - new FileChannelManagerImpl(new String[] {tmpFolder.getRoot().getPath()}, "testing"); + new FileChannelManagerImpl( + new String[] {TempDirUtils.newFolder(tmpFolder).toString()}, "testing"); globalPool = new NetworkBufferPool(totalBuffers, bufferSize); readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize); readIOExecutor = Executors.newFixedThreadPool(numThreads); } - @After - public void shutdown() throws Exception { + @AfterEach + void shutdown() throws Exception { fileChannelManager.close(); 
globalPool.destroy(); readBufferPool.destroy(); readIOExecutor.shutdown(); } - @Parameterized.Parameters(name = "UseHashDataBuffer = {0}") - public static Object[] parameters() { - return new Object[] {true, false}; - } - - public SortMergeResultPartitionTest(boolean useHashDataBuffer) { - this.useHashDataBuffer = useHashDataBuffer; + @Parameters(name = "useHashDataBuffer={0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); } - @Test - public void testWriteAndRead() throws Exception { + @TestTemplate + void testWriteAndRead() throws Exception { int numBuffers = useHashDataBuffer ? 100 : 15; int numSubpartitions = 10; int numRecords = 1000; @@ -232,8 +225,8 @@ public class SortMergeResultPartitionTest extends TestLogger { if (!buffer.isBuffer()) { ++numEndOfPartitionEvents; - assertFalse( - view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable()); + assertThat(view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable()) + .isFalse(); view.releaseAllResources(); } bufferAndBacklog = view.getNextBuffer(); @@ -252,16 +245,16 @@ public class SortMergeResultPartitionTest extends TestLogger { return views; } - @Test - public void testWriteLargeRecord() throws Exception { + @TestTemplate + void testWriteLargeRecord() throws Exception { int numBuffers = useHashDataBuffer ? 100 : 15; BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); ByteBuffer recordWritten = generateRandomData(bufferSize * numBuffers, new Random()); partition.emitRecord(recordWritten, 0); - assertEquals( - useHashDataBuffer ? numBuffers : 0, bufferPool.bestEffortGetNumOfUsedBuffers()); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()) + .isEqualTo(useHashDataBuffer ? 
numBuffers : 0); partition.finish(); partition.close(); @@ -286,11 +279,11 @@ public class SortMergeResultPartitionTest extends TestLogger { }); recordWritten.rewind(); recordRead.flip(); - assertEquals(recordWritten, recordRead); + assertThat(recordRead).isEqualTo(recordWritten); } - @Test - public void testDataBroadcast() throws Exception { + @TestTemplate + void testDataBroadcast() throws Exception { int numSubpartitions = 10; int numBuffers = useHashDataBuffer ? 100 : 15; int numRecords = 10000; @@ -308,11 +301,11 @@ public class SortMergeResultPartitionTest extends TestLogger { int eventSize = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining(); long dataSize = numSubpartitions * numRecords * bufferSize + numSubpartitions * eventSize; - assertNotNull(partition.getResultFile()); - assertEquals(2, checkNotNull(fileChannelManager.getPaths()[0].list()).length); + assertThat(partition.getResultFile()).isNotNull(); + assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(2); for (File file : checkNotNull(fileChannelManager.getPaths()[0].listFiles())) { if (file.getName().endsWith(PartitionedFile.DATA_FILE_SUFFIX)) { - assertTrue(file.length() < numSubpartitions * numRecords * bufferSize); + assertThat(file.length()).isLessThan(numSubpartitions * numRecords * bufferSize); } } @@ -323,49 +316,47 @@ public class SortMergeResultPartitionTest extends TestLogger { bufferWithChannel -> { bufferWithChannel.getBuffer().recycleBuffer(); }); - assertEquals(dataSize, dataRead); + assertThat(dataRead).isEqualTo(dataSize); } - @Test(expected = IllegalStateException.class) - public void testReleaseWhileWriting() throws Exception { + @TestTemplate + void testReleaseWhileWriting() throws Exception { int numBuffers = useHashDataBuffer ? 
100 : 15; BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); - assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers()); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0); partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 0); partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 1); partition.emitRecord(ByteBuffer.allocate(bufferSize), 2); - assertNull(partition.getResultFile()); - assertEquals(2, fileChannelManager.getPaths()[0].list().length); + assertThat(partition.getResultFile()).isNull(); + assertThat(fileChannelManager.getPaths()[0].list().length).isEqualTo(2); partition.release(); - try { - partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 2); - } catch (IllegalStateException exception) { - assertEquals(0, fileChannelManager.getPaths()[0].list().length); - throw exception; - } + assertThatThrownBy( + () -> partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 2)) + .isInstanceOf(IllegalStateException.class); + assertThat(fileChannelManager.getPaths()[0].list().length).isEqualTo(0); } - @Test - public void testRelease() throws Exception { + @TestTemplate + void testRelease() throws Exception { int numBuffers = useHashDataBuffer ? 
100 : 15; BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); - assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers()); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0); partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 0); partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 1); partition.finish(); partition.close(); - assertEquals(3, partition.getResultFile().getNumRegions()); - assertEquals(2, checkNotNull(fileChannelManager.getPaths()[0].list()).length); + assertThat(partition.getResultFile().getNumRegions()).isEqualTo(3); + assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(2); ResultSubpartitionView view = partition.createSubpartitionView(0, listener); partition.release(); @@ -381,57 +372,54 @@ public class SortMergeResultPartitionTest extends TestLogger { while (partition.getResultFile() != null) { Thread.sleep(100); } - assertEquals(0, checkNotNull(fileChannelManager.getPaths()[0].list()).length); + assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(0); } - @Test - public void testCloseReleasesAllBuffers() throws Exception { + @TestTemplate + void testCloseReleasesAllBuffers() throws Exception { int numBuffers = useHashDataBuffer ? 100 : 15; BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); - assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers()); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0); partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 5); - assertEquals( - useHashDataBuffer ? numBuffers : 0, bufferPool.bestEffortGetNumOfUsedBuffers()); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()) + .isEqualTo(useHashDataBuffer ? 
numBuffers : 0); partition.close(); - assertTrue(bufferPool.isDestroyed()); - assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments()); + assertThat(bufferPool.isDestroyed()).isTrue(); + assertThat(globalPool.getNumberOfAvailableMemorySegments()).isEqualTo(totalBuffers); } - @Test(expected = IllegalStateException.class) - public void testReadUnfinishedPartition() throws Exception { + @TestTemplate + void testReadUnfinishedPartition() throws Exception { BufferPool bufferPool = globalPool.createBufferPool(10, 10); - try { - SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); - partition.createSubpartitionView(0, listener); - } finally { - bufferPool.lazyDestroy(); - } + SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); + assertThatThrownBy(() -> partition.createSubpartitionView(0, listener)) + .isInstanceOf(IllegalStateException.class); + bufferPool.lazyDestroy(); } - @Test(expected = IllegalStateException.class) - public void testReadReleasedPartition() throws Exception { + @TestTemplate + void testReadReleasedPartition() throws Exception { BufferPool bufferPool = globalPool.createBufferPool(10, 10); - try { - SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); - partition.finish(); - partition.release(); - partition.createSubpartitionView(0, listener); - } finally { - bufferPool.lazyDestroy(); - } + SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool); + partition.finish(); + partition.release(); + + assertThatThrownBy(() -> partition.createSubpartitionView(0, listener)) + .isInstanceOf(IllegalStateException.class); + bufferPool.lazyDestroy(); } - @Test - public void testNumBytesProducedCounterForUnicast() throws IOException { + @TestTemplate + void testNumBytesProducedCounterForUnicast() throws IOException { testNumBytesProducedCounter(false); } - @Test - public void testNumBytesProducedCounterForBroadcast() throws IOException { + 
@TestTemplate + void testNumBytesProducedCounterForBroadcast() throws IOException { testNumBytesProducedCounter(true); } @@ -447,14 +435,16 @@ public class SortMergeResultPartitionTest extends TestLogger { partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); partition.finish(); - assertEquals(bufferSize + 4, partition.numBytesProduced.getCount()); - assertEquals(numSubpartitions * (bufferSize + 4), partition.numBytesOut.getCount()); + assertThat(partition.numBytesProduced.getCount()).isEqualTo(bufferSize + 4); + assertThat(partition.numBytesOut.getCount()) + .isEqualTo(numSubpartitions * (bufferSize + 4)); } else { partition.emitRecord(ByteBuffer.allocate(bufferSize), 0); partition.finish(); - assertEquals(bufferSize + 4, partition.numBytesProduced.getCount()); - assertEquals(bufferSize + numSubpartitions * 4, partition.numBytesOut.getCount()); + assertThat(partition.numBytesProduced.getCount()).isEqualTo(bufferSize + 4); + assertThat(partition.numBytesOut.getCount()) + .isEqualTo(bufferSize + numSubpartitions * 4); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index 71a978636bd..0faab8c1815 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -23,27 +23,24 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Map; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; +import static 
org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link TaskIOMetricGroup}. */ -public class TaskIOMetricGroupTest { +class TaskIOMetricGroupTest { @Test - public void testTaskIOMetricGroup() throws InterruptedException { + void testTaskIOMetricGroup() throws InterruptedException { TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); taskIO.setEnableBusyTime(true); final long startTime = System.currentTimeMillis(); // test counter forwarding - assertNotNull(taskIO.getNumRecordsInCounter()); - assertNotNull(taskIO.getNumRecordsOutCounter()); + assertThat(taskIO.getNumRecordsInCounter()).isNotNull(); + assertThat(taskIO.getNumRecordsOutCounter()).isNotNull(); Counter c1 = new SimpleCounter(); c1.inc(32L); @@ -52,8 +49,9 @@ public class TaskIOMetricGroupTest { taskIO.reuseRecordsInputCounter(c1); taskIO.reuseRecordsOutputCounter(c2); - assertEquals(32L, taskIO.getNumRecordsInCounter().getCount()); - assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount()); + + assertThat(taskIO.getNumRecordsInCounter().getCount()).isEqualTo(32L); + assertThat(taskIO.getNumRecordsOutCounter().getCount()).isEqualTo(64L); // test IOMetrics instantiation taskIO.getNumBytesInCounter().inc(100L); @@ -72,35 +70,33 @@ public class TaskIOMetricGroupTest { taskIO.getHardBackPressuredTimePerSecond().markEnd(); IOMetrics io = taskIO.createSnapshot(); - assertEquals(32L, io.getNumRecordsIn()); - assertEquals(64L, io.getNumRecordsOut()); - assertEquals(100L, io.getNumBytesIn()); - assertEquals(250L, io.getNumBytesOut()); - assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount()); - assertEquals( - taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), io.getAccumulateIdleTime()); - assertEquals( - taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount() - + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(), - io.getAccumulateBackPressuredTime()); + 
assertThat(io.getNumRecordsIn()).isEqualTo(32L); + assertThat(io.getNumRecordsOut()).isEqualTo(64L); + assertThat(io.getNumBytesIn()).isEqualTo(100L); + assertThat(io.getNumBytesOut()).isEqualTo(250L); + assertThat(taskIO.getNumBuffersOutCounter().getCount()).isEqualTo(3L); + assertThat(taskIO.getIdleTimeMsPerSecond().getAccumulatedCount()) + .isEqualTo(io.getAccumulateIdleTime()); assertThat( - io.getAccumulateBusyTime(), - greaterThanOrEqualTo( + taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount() + + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount()) + .isEqualTo(io.getAccumulateBackPressuredTime()); + assertThat(io.getAccumulateBusyTime()) + .isGreaterThanOrEqualTo( (double) System.currentTimeMillis() - startTime - io.getAccumulateIdleTime() - - io.getAccumulateBackPressuredTime())); - assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(softSleepTime)); - assertThat( - taskIO.getSoftBackPressuredTimePerSecond().getCount(), - greaterThanOrEqualTo(softSleepTime)); - assertThat( - taskIO.getHardBackPressuredTimePerSecond().getCount(), - greaterThanOrEqualTo(hardSleepTime)); + - io.getAccumulateBackPressuredTime()); + assertThat(taskIO.getIdleTimeMsPerSecond().getCount()) + .isGreaterThanOrEqualTo(softSleepTime); + assertThat(taskIO.getSoftBackPressuredTimePerSecond().getCount()) + .isGreaterThanOrEqualTo(softSleepTime); + assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount()) + .isGreaterThanOrEqualTo(hardSleepTime); } @Test - public void testNumBytesProducedOfPartitionsMetrics() { + void testNumBytesProducedOfPartitionsMetrics() { TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); @@ -118,8 +114,8 @@ public class TaskIOMetricGroupTest { Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions = taskIO.createSnapshot().getNumBytesProducedOfPartitions(); - assertEquals(2, 
numBytesProducedOfPartitions.size()); - assertEquals(32L, numBytesProducedOfPartitions.get(resultPartitionID1).longValue()); - assertEquals(64L, numBytesProducedOfPartitions.get(resultPartitionID2).longValue()); + assertThat(numBytesProducedOfPartitions.size()).isEqualTo(2); + assertThat(numBytesProducedOfPartitions.get(resultPartitionID1).longValue()).isEqualTo(32L); + assertThat(numBytesProducedOfPartitions.get(resultPartitionID2).longValue()).isEqualTo(64L); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java index 2591a9fead0..387e2bdc051 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java @@ -22,17 +22,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link DefaultVertexParallelismDecider}. 
*/ -public class DefaultVertexParallelismDeciderTest { +class DefaultVertexParallelismDeciderTest { private static final long BYTE_256_MB = 256 * 1024 * 1024L; private static final long BYTE_512_MB = 512 * 1024 * 1024L; @@ -47,8 +46,8 @@ public class DefaultVertexParallelismDeciderTest { private DefaultVertexParallelismDecider decider; - @Before - public void before() throws Exception { + @BeforeEach + void before() throws Exception { Configuration configuration = new Configuration(); configuration.setInteger( @@ -66,19 +65,19 @@ public class DefaultVertexParallelismDeciderTest { } @Test - public void testNormalizedMaxAndMinParallelism() { - assertThat(decider.getMaxParallelism(), is(64)); - assertThat(decider.getMinParallelism(), is(4)); + void testNormalizedMaxAndMinParallelism() { + assertThat(decider.getMaxParallelism()).isEqualTo(64); + assertThat(decider.getMinParallelism()).isEqualTo(4); } @Test - public void testSourceJobVertex() { + void testSourceJobVertex() { int parallelism = decider.decideParallelismForVertex(Collections.emptyList()); - assertThat(parallelism, is(DEFAULT_SOURCE_PARALLELISM)); + assertThat(parallelism).isEqualTo(DEFAULT_SOURCE_PARALLELISM); } @Test - public void testNormalizeParallelismDownToPowerOf2() { + void testNormalizeParallelismDownToPowerOf2() { BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB)); BlockingResultInfo resultInfo2 = @@ -88,11 +87,11 @@ public class DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(8)); + assertThat(parallelism).isEqualTo(8); } @Test - public void testNormalizeParallelismUpToPowerOf2() { + void testNormalizeParallelismUpToPowerOf2() { BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB)); BlockingResultInfo resultInfo2 = @@ -102,11 +101,11 @@ public class 
DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(16)); + assertThat(parallelism).isEqualTo(16); } @Test - public void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() { + void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() { BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB)); BlockingResultInfo resultInfo2 = @@ -116,11 +115,11 @@ public class DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(64)); + assertThat(parallelism).isEqualTo(64); } @Test - public void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() { + void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() { BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB)); BlockingResultInfo resultInfo2 = @@ -129,11 +128,11 @@ public class DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(4)); + assertThat(parallelism).isEqualTo(4); } @Test - public void testBroadcastRatioExceedsCapRatio() { + void testBroadcastRatioExceedsCapRatio() { BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_1_GB)); BlockingResultInfo resultInfo2 = @@ -142,11 +141,11 @@ public class DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(16)); + assertThat(parallelism).isEqualTo(16); } @Test - public void testNonBroadcastBytesCanNotDividedEvenly() { + void testNonBroadcastBytesCanNotDividedEvenly() { BlockingResultInfo resultInfo1 = 
BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_512_MB)); BlockingResultInfo resultInfo2 = @@ -156,6 +155,6 @@ public class DefaultVertexParallelismDeciderTest { int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); - assertThat(parallelism, is(16)); + assertThat(parallelism).isEqualTo(16); } }
