This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit af7c775ab57b571cc827f83005f9ebf5c3034e89 Author: Weijie Guo <[email protected]> AuthorDate: Fri Aug 5 14:34:23 2022 +0800 [hotfix] Migrate PipelinedRegionSchedulingStrategyTest and StrategyTestUtil to Junit5 and AssertJ. --- .../PipelinedRegionSchedulingStrategyTest.java | 92 +++++++++++----------- .../scheduler/strategy/StrategyTestUtil.java | 13 ++- 2 files changed, 49 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java index f827c481ff9..34395e02999 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java @@ -29,12 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.executor.TestExecutorExtension; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.Arrays; @@ -46,16 +45,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link PipelinedRegionSchedulingStrategy}. */ -public class PipelinedRegionSchedulingStrategyTest extends TestLogger { - @ClassRule - public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); +class PipelinedRegionSchedulingStrategyTest { + @RegisterExtension + public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); private TestingSchedulerOperations testingSchedulerOperation; @@ -71,8 +67,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { private List<TestingSchedulingExecutionVertex> sink; - @Before - public void setUp() { + @BeforeEach + void setUp() { testingSchedulerOperation = new TestingSchedulerOperations(); buildTopology(); @@ -121,7 +117,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { } @Test - public void testStartScheduling() { + void testStartScheduling() { startScheduling(testingSchedulingTopology); final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = @@ -135,7 +131,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { } @Test - public void testRestartTasks() { + void testRestartTasks() { final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); @@ -158,7 +154,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { } @Test - public void testNotifyingBlockingResultPartitionProducerFinished() { + void testNotifyingBlockingResultPartitionProducerFinished() { final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); @@ -167,13 +163,13 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { schedulingStrategy.onExecutionStateChange(upstream1.getId(), ExecutionState.FINISHED); // sinks' inputs are not all consumable yet so they are not scheduled - assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4)); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4); final TestingSchedulingExecutionVertex upstream2 = map2.get(1); upstream2.getProducedResults().iterator().next().markFinished(); schedulingStrategy.onExecutionStateChange(upstream2.getId(), ExecutionState.FINISHED); - assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(6)); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6); final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>(); @@ -184,7 +180,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { } @Test - public void testSchedulingTopologyWithPersistentBlockingEdges() { + void testSchedulingTopologyWithPersistentBlockingEdges() { final TestingSchedulingTopology topology = new TestingSchedulingTopology(); final List<TestingSchedulingExecutionVertex> v1 = @@ -207,7 +203,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { } @Test - public void testComputingCrossRegionConsumedPartitionGroupsCorrectly() throws Exception { + void testComputingCrossRegionConsumedPartitionGroupsCorrectly() throws Exception { final JobVertex v1 = createJobVertex("v1", 4); final JobVertex v2 = createJobVertex("v2", 3); final JobVertex v3 = createJobVertex("v3", 2); @@ -236,7 +232,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups = schedulingStrategy.getCrossRegionConsumedPartitionGroups(); - assertEquals(1, crossRegionConsumedPartitionGroups.size()); + assertThat(crossRegionConsumedPartitionGroups).hasSize(1); final ConsumedPartitionGroup expected = executionGraph @@ -245,11 +241,11 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { .getAllConsumedPartitionGroups() .get(0); - assertTrue(crossRegionConsumedPartitionGroups.contains(expected)); + assertThat(crossRegionConsumedPartitionGroups).contains(expected); } @Test - public void testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() { + void testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() { final TestingSchedulingTopology topology = new TestingSchedulingTopology(); final List<TestingSchedulingExecutionVertex> producer = @@ -267,11 +263,11 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups = schedulingStrategy.getCrossRegionConsumedPartitionGroups(); - assertEquals(0, crossRegionConsumedPartitionGroups.size()); + assertThat(crossRegionConsumedPartitionGroups).isEmpty(); } @Test - public void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() throws Exception { + void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() throws Exception { final JobVertex v1 = createJobVertex("v1", 4); final JobVertex v2 = createJobVertex("v2", 3); final JobVertex v3 = createJobVertex("v3", 2); @@ -296,7 +292,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { // Test whether the topology is built correctly final List<SchedulingPipelinedRegion> regions = new ArrayList<>(); schedulingTopology.getAllPipelinedRegions().forEach(regions::add); - assertEquals(2, regions.size()); + assertThat(regions).hasSize(2); final ExecutionVertex v31 = executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0]; @@ -305,7 +301,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { .getPipelinedRegionOfVertex(v31.getID()) .getVertices() .forEach(vertex -> region1.add(vertex.getId())); - assertEquals(5, region1.size()); + assertThat(region1).hasSize(5); final ExecutionVertex v32 = executionGraph.getJobVertex(v3.getID()).getTaskVertices()[1]; @@ -314,18 +310,18 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { .getPipelinedRegionOfVertex(v32.getID()) .getVertices() .forEach(vertex -> region2.add(vertex.getId())); - assertEquals(4, region2.size()); + assertThat(region2).hasSize(4); // Test whether region 1 is scheduled correctly PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology); - assertEquals(1, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1); final List<ExecutionVertexID> scheduledVertices1 = testingSchedulerOperation.getScheduledVertices().get(0); - assertEquals(5, scheduledVertices1.size()); + assertThat(scheduledVertices1).hasSize(5); for (ExecutionVertexID vertexId : scheduledVertices1) { - assertTrue(region1.contains(vertexId)); + assertThat(region1).contains(vertexId); } // Test whether the region 2 is scheduled correctly when region 1 is finished @@ -333,18 +329,18 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { v22.finishAllBlockingPartitions(); schedulingStrategy.onExecutionStateChange(v22.getID(), ExecutionState.FINISHED); - assertEquals(2, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); final List<ExecutionVertexID> scheduledVertices2 = testingSchedulerOperation.getScheduledVertices().get(1); - assertEquals(4, scheduledVertices2.size()); + assertThat(scheduledVertices2).hasSize(4); for (ExecutionVertexID vertexId : scheduledVertices2) { - assertTrue(region2.contains(vertexId)); + assertThat(region2).contains(vertexId); } } @Test - public void testScheduleBlockingDownstreamTaskIndividually() throws Exception { + void testScheduleBlockingDownstreamTaskIndividually() throws Exception { final JobVertex v1 = createJobVertex("v1", 2); final JobVertex v2 = createJobVertex("v2", 2); @@ -364,17 +360,17 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology); - assertEquals(2, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0]; v11.finishAllBlockingPartitions(); schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED); - assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3); } @Test - public void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception { + void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception { final JobVertex v1 = createJobVertex("v1", 1); final JobVertex v2 = createJobVertex("v2", 1); @@ -393,17 +389,17 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology); // all regions will be scheduled - assertEquals(2, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0]; schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED); - assertEquals(2, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); } /** Inner non-pipelined edge will not affect it's region be scheduled. */ @Test - public void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception { + void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception { final JobVertex v1 = createJobVertex("v1", 1); final JobVertex v2 = createJobVertex("v2", 1); final JobVertex v3 = createJobVertex("v3", 1); @@ -431,10 +427,10 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { startScheduling(schedulingTopology); - assertEquals(1, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1); List<ExecutionVertexID> executionVertexIds = testingSchedulerOperation.getScheduledVertices().get(0); - assertEquals(4, executionVertexIds.size()); + assertThat(executionVertexIds).hasSize(4); } /** @@ -442,7 +438,7 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { * after it's all blocking edge finished, non-blocking edge don't block scheduling. */ @Test - public void testDownstreamRegionWillBeBlockedByBlockingEdge() throws Exception { + void testDownstreamRegionWillBeBlockedByBlockingEdge() throws Exception { final JobVertex v1 = createJobVertex("v1", 1); final JobVertex v2 = createJobVertex("v2", 1); final JobVertex v3 = createJobVertex("v3", 1); @@ -464,12 +460,12 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger { final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology); - assertEquals(2, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0]; v11.finishAllBlockingPartitions(); schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED); - assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3); } private static JobVertex createJobVertex(String vertexName, int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java index 38ab94f30e3..664780acdc9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java @@ -21,12 +21,10 @@ package org.apache.flink.runtime.scheduler.strategy; import java.util.List; import java.util.stream.Collectors; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** Strategy test utilities. */ -public class StrategyTestUtil { +class StrategyTestUtil { static void assertLatestScheduledVerticesAreEqualTo( final List<List<TestingSchedulingExecutionVertex>> expected, @@ -34,11 +32,10 @@ public class StrategyTestUtil { final List<List<ExecutionVertexID>> allScheduledVertices = testingSchedulerOperation.getScheduledVertices(); final int expectedScheduledBulks = expected.size(); - assertThat(expectedScheduledBulks, lessThanOrEqualTo(allScheduledVertices.size())); + assertThat(expectedScheduledBulks).isLessThanOrEqualTo(allScheduledVertices.size()); for (int i = 0; i < expectedScheduledBulks; i++) { - assertEquals( - idsFromVertices(expected.get(expectedScheduledBulks - i - 1)), - allScheduledVertices.get(allScheduledVertices.size() - i - 1)); + assertThat(allScheduledVertices.get(allScheduledVertices.size() - i - 1)) + .isEqualTo(idsFromVertices(expected.get(expectedScheduledBulks - i - 1))); } }
