This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit af7c775ab57b571cc827f83005f9ebf5c3034e89
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 5 14:34:23 2022 +0800

    [hotfix] Migrate PipelinedRegionSchedulingStrategyTest and StrategyTestUtil 
to Junit5 and AssertJ.
---
 .../PipelinedRegionSchedulingStrategyTest.java     | 92 +++++++++++-----------
 .../scheduler/strategy/StrategyTestUtil.java       | 13 ++-
 2 files changed, 49 insertions(+), 56 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
index f827c481ff9..34395e02999 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
@@ -29,12 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,16 +45,13 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link PipelinedRegionSchedulingStrategy}. */
-public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+class PipelinedRegionSchedulingStrategyTest {
+    @RegisterExtension
+    public static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private TestingSchedulerOperations testingSchedulerOperation;
 
@@ -71,8 +67,8 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
 
     private List<TestingSchedulingExecutionVertex> sink;
 
-    @Before
-    public void setUp() {
+    @BeforeEach
+    void setUp() {
         testingSchedulerOperation = new TestingSchedulerOperations();
 
         buildTopology();
@@ -121,7 +117,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
     }
 
     @Test
-    public void testStartScheduling() {
+    void testStartScheduling() {
         startScheduling(testingSchedulingTopology);
 
         final List<List<TestingSchedulingExecutionVertex>> 
expectedScheduledVertices =
@@ -135,7 +131,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
     }
 
     @Test
-    public void testRestartTasks() {
+    void testRestartTasks() {
         final PipelinedRegionSchedulingStrategy schedulingStrategy =
                 startScheduling(testingSchedulingTopology);
 
@@ -158,7 +154,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
     }
 
     @Test
-    public void testNotifyingBlockingResultPartitionProducerFinished() {
+    void testNotifyingBlockingResultPartitionProducerFinished() {
         final PipelinedRegionSchedulingStrategy schedulingStrategy =
                 startScheduling(testingSchedulingTopology);
 
@@ -167,13 +163,13 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         schedulingStrategy.onExecutionStateChange(upstream1.getId(), 
ExecutionState.FINISHED);
 
         // sinks' inputs are not all consumable yet so they are not scheduled
-        assertThat(testingSchedulerOperation.getScheduledVertices(), 
hasSize(4));
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
 
         final TestingSchedulingExecutionVertex upstream2 = map2.get(1);
         upstream2.getProducedResults().iterator().next().markFinished();
         schedulingStrategy.onExecutionStateChange(upstream2.getId(), 
ExecutionState.FINISHED);
 
-        assertThat(testingSchedulerOperation.getScheduledVertices(), 
hasSize(6));
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6);
 
         final List<List<TestingSchedulingExecutionVertex>> 
expectedScheduledVertices =
                 new ArrayList<>();
@@ -184,7 +180,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
     }
 
     @Test
-    public void testSchedulingTopologyWithPersistentBlockingEdges() {
+    void testSchedulingTopologyWithPersistentBlockingEdges() {
         final TestingSchedulingTopology topology = new 
TestingSchedulingTopology();
 
         final List<TestingSchedulingExecutionVertex> v1 =
@@ -207,7 +203,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
     }
 
     @Test
-    public void testComputingCrossRegionConsumedPartitionGroupsCorrectly() 
throws Exception {
+    void testComputingCrossRegionConsumedPartitionGroupsCorrectly() throws 
Exception {
         final JobVertex v1 = createJobVertex("v1", 4);
         final JobVertex v2 = createJobVertex("v2", 3);
         final JobVertex v3 = createJobVertex("v3", 2);
@@ -236,7 +232,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
         final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
                 schedulingStrategy.getCrossRegionConsumedPartitionGroups();
 
-        assertEquals(1, crossRegionConsumedPartitionGroups.size());
+        assertThat(crossRegionConsumedPartitionGroups).hasSize(1);
 
         final ConsumedPartitionGroup expected =
                 executionGraph
@@ -245,11 +241,11 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
                         .getAllConsumedPartitionGroups()
                         .get(0);
 
-        assertTrue(crossRegionConsumedPartitionGroups.contains(expected));
+        assertThat(crossRegionConsumedPartitionGroups).contains(expected);
     }
 
     @Test
-    public void 
testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() {
+    void testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() {
         final TestingSchedulingTopology topology = new 
TestingSchedulingTopology();
 
         final List<TestingSchedulingExecutionVertex> producer =
@@ -267,11 +263,11 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
                 schedulingStrategy.getCrossRegionConsumedPartitionGroups();
 
-        assertEquals(0, crossRegionConsumedPartitionGroups.size());
+        assertThat(crossRegionConsumedPartitionGroups).isEmpty();
     }
 
     @Test
-    public void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() 
throws Exception {
+    void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() throws 
Exception {
         final JobVertex v1 = createJobVertex("v1", 4);
         final JobVertex v2 = createJobVertex("v2", 3);
         final JobVertex v3 = createJobVertex("v3", 2);
@@ -296,7 +292,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
         // Test whether the topology is built correctly
         final List<SchedulingPipelinedRegion> regions = new ArrayList<>();
         schedulingTopology.getAllPipelinedRegions().forEach(regions::add);
-        assertEquals(2, regions.size());
+        assertThat(regions).hasSize(2);
 
         final ExecutionVertex v31 = 
executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0];
 
@@ -305,7 +301,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
                 .getPipelinedRegionOfVertex(v31.getID())
                 .getVertices()
                 .forEach(vertex -> region1.add(vertex.getId()));
-        assertEquals(5, region1.size());
+        assertThat(region1).hasSize(5);
 
         final ExecutionVertex v32 = 
executionGraph.getJobVertex(v3.getID()).getTaskVertices()[1];
 
@@ -314,18 +310,18 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
                 .getPipelinedRegionOfVertex(v32.getID())
                 .getVertices()
                 .forEach(vertex -> region2.add(vertex.getId()));
-        assertEquals(4, region2.size());
+        assertThat(region2).hasSize(4);
 
         // Test whether region 1 is scheduled correctly
         PipelinedRegionSchedulingStrategy schedulingStrategy = 
startScheduling(schedulingTopology);
 
-        assertEquals(1, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1);
         final List<ExecutionVertexID> scheduledVertices1 =
                 testingSchedulerOperation.getScheduledVertices().get(0);
-        assertEquals(5, scheduledVertices1.size());
+        assertThat(scheduledVertices1).hasSize(5);
 
         for (ExecutionVertexID vertexId : scheduledVertices1) {
-            assertTrue(region1.contains(vertexId));
+            assertThat(region1).contains(vertexId);
         }
 
         // Test whether the region 2 is scheduled correctly when region 1 is 
finished
@@ -333,18 +329,18 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         v22.finishAllBlockingPartitions();
 
         schedulingStrategy.onExecutionStateChange(v22.getID(), 
ExecutionState.FINISHED);
-        assertEquals(2, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
         final List<ExecutionVertexID> scheduledVertices2 =
                 testingSchedulerOperation.getScheduledVertices().get(1);
-        assertEquals(4, scheduledVertices2.size());
+        assertThat(scheduledVertices2).hasSize(4);
 
         for (ExecutionVertexID vertexId : scheduledVertices2) {
-            assertTrue(region2.contains(vertexId));
+            assertThat(region2).contains(vertexId);
         }
     }
 
     @Test
-    public void testScheduleBlockingDownstreamTaskIndividually() throws 
Exception {
+    void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
         final JobVertex v1 = createJobVertex("v1", 2);
         final JobVertex v2 = createJobVertex("v2", 2);
 
@@ -364,17 +360,17 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         final PipelinedRegionSchedulingStrategy schedulingStrategy =
                 startScheduling(schedulingTopology);
 
-        assertEquals(2, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
 
         final ExecutionVertex v11 = 
executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
         v11.finishAllBlockingPartitions();
 
         schedulingStrategy.onExecutionStateChange(v11.getID(), 
ExecutionState.FINISHED);
-        assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
     }
 
     @Test
-    public void testFinishHybridPartitionWillNotRescheduleDownstream() throws 
Exception {
+    void testFinishHybridPartitionWillNotRescheduleDownstream() throws 
Exception {
         final JobVertex v1 = createJobVertex("v1", 1);
         final JobVertex v2 = createJobVertex("v2", 1);
 
@@ -393,17 +389,17 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         PipelinedRegionSchedulingStrategy schedulingStrategy = 
startScheduling(schedulingTopology);
 
         // all regions will be scheduled
-        assertEquals(2, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
 
         final ExecutionVertex v11 = 
executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
         schedulingStrategy.onExecutionStateChange(v11.getID(), 
ExecutionState.FINISHED);
 
-        assertEquals(2, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
     }
 
     /** Inner non-pipelined edge will not affect it's region be scheduled. */
     @Test
-    public void testSchedulingRegionWithInnerNonPipelinedEdge() throws 
Exception {
+    void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception {
         final JobVertex v1 = createJobVertex("v1", 1);
         final JobVertex v2 = createJobVertex("v2", 1);
         final JobVertex v3 = createJobVertex("v3", 1);
@@ -431,10 +427,10 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
 
         startScheduling(schedulingTopology);
 
-        assertEquals(1, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1);
         List<ExecutionVertexID> executionVertexIds =
                 testingSchedulerOperation.getScheduledVertices().get(0);
-        assertEquals(4, executionVertexIds.size());
+        assertThat(executionVertexIds).hasSize(4);
     }
 
     /**
@@ -442,7 +438,7 @@ public class PipelinedRegionSchedulingStrategyTest extends 
TestLogger {
      * after it's all blocking edge finished, non-blocking edge don't block 
scheduling.
      */
     @Test
-    public void testDownstreamRegionWillBeBlockedByBlockingEdge() throws 
Exception {
+    void testDownstreamRegionWillBeBlockedByBlockingEdge() throws Exception {
         final JobVertex v1 = createJobVertex("v1", 1);
         final JobVertex v2 = createJobVertex("v2", 1);
         final JobVertex v3 = createJobVertex("v3", 1);
@@ -464,12 +460,12 @@ public class PipelinedRegionSchedulingStrategyTest 
extends TestLogger {
         final PipelinedRegionSchedulingStrategy schedulingStrategy =
                 startScheduling(schedulingTopology);
 
-        assertEquals(2, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
 
         final ExecutionVertex v11 = 
executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
         v11.finishAllBlockingPartitions();
         schedulingStrategy.onExecutionStateChange(v11.getID(), 
ExecutionState.FINISHED);
-        assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
     }
 
     private static JobVertex createJobVertex(String vertexName, int 
parallelism) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
index 38ab94f30e3..664780acdc9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
@@ -21,12 +21,10 @@ package org.apache.flink.runtime.scheduler.strategy;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Strategy test utilities. */
-public class StrategyTestUtil {
+class StrategyTestUtil {
 
     static void assertLatestScheduledVerticesAreEqualTo(
             final List<List<TestingSchedulingExecutionVertex>> expected,
@@ -34,11 +32,10 @@ public class StrategyTestUtil {
         final List<List<ExecutionVertexID>> allScheduledVertices =
                 testingSchedulerOperation.getScheduledVertices();
         final int expectedScheduledBulks = expected.size();
-        assertThat(expectedScheduledBulks, 
lessThanOrEqualTo(allScheduledVertices.size()));
+        
assertThat(expectedScheduledBulks).isLessThanOrEqualTo(allScheduledVertices.size());
         for (int i = 0; i < expectedScheduledBulks; i++) {
-            assertEquals(
-                    idsFromVertices(expected.get(expectedScheduledBulks - i - 
1)),
-                    allScheduledVertices.get(allScheduledVertices.size() - i - 
1));
+            assertThat(allScheduledVertices.get(allScheduledVertices.size() - 
i - 1))
+                    
.isEqualTo(idsFromVertices(expected.get(expectedScheduledBulks - i - 1)));
         }
     }
 

Reply via email to