zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r935083583
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java:
##########
@@ -106,11 +95,11 @@ public void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception
eg1.attachJobGraph(ordered);
eg2.attachJobGraph(ordered);
- assertThat(
- Sets.intersection(
- eg1.getRegisteredExecutions().keySet(),
- eg2.getRegisteredExecutions().keySet()),
- is(empty()));
+ Assertions.assertThat(
Review Comment:
It's better to statically import `Assertions.assertThat`.
This also applies to other test files.
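For illustration, a minimal sketch of the statically imported form, reusing the intersection check from the removed Hamcrest assertion (the surrounding test class and its Guava `Sets` import are assumed from context, not shown in this hunk):
```
// in the import section of the test class
import static org.assertj.core.api.Assertions.assertThat;

// the assertion can then drop the `Assertions.` prefix:
assertThat(
                Sets.intersection(
                        eg1.getRegisteredExecutions().keySet(),
                        eg2.getRegisteredExecutions().keySet()))
        .isEmpty();
```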
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -57,31 +58,24 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
-import static junit.framework.TestCase.assertSame;
-import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionGraph;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
/** Unit tests for {@link DefaultExecutionTopology}. */
-public class DefaultExecutionTopologyTest extends TestLogger {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+class DefaultExecutionTopologyTest extends TestLogger {
Review Comment:
JUnit5 test should not extend TestLogger
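A rough sketch of the suggested JUnit5-style class header; the `@RegisterExtension` part mirrors the migration shown in the other hunks of this PR, and whether log output is still captured depends on a project-wide JUnit5 logging extension (an assumption here, not something shown in this diff):
```
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;

import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.concurrent.ScheduledExecutorService;

/** Unit tests for {@link DefaultExecutionTopology}. */
class DefaultExecutionTopologyTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    // ... test methods unchanged ...
}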
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -35,55 +35,48 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
/** Tests for {@link IntermediateResultPartition}. */
public class IntermediateResultPartitionTest extends TestLogger {
Review Comment:
Now the test class no longer needs to be public, nor does it need to extend TestLogger.
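That is, roughly (a sketch of just the class declaration; the body stays as in the diff):
```
/** Tests for {@link IntermediateResultPartition}. */
class IntermediateResultPartitionTest {
    // ...
}
```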
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java:
##########
@@ -29,164 +29,132 @@
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
/** Test for {@link ExecutionJobVertex} */
-public class ExecutionJobVertexTest {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+class ExecutionJobVertexTest {
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
@Test
- public void testParallelismGreaterThanMaxParallelism() {
+ void testParallelismGreaterThanMaxParallelism() {
JobVertex jobVertex = new JobVertex("testVertex");
jobVertex.setInvokableClass(AbstractInvokable.class);
// parallelism must be smaller than the max parallelism
jobVertex.setParallelism(172);
jobVertex.setMaxParallelism(4);
- assertThrows(
- "higher than the max parallelism",
- JobException.class,
- () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
+ Assertions.assertThatThrownBy(
+ () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex))
+ .isInstanceOf(JobException.class)
+ .hasMessageContaining("higher than the max parallelism");
}
@Test
- public void testLazyInitialization() throws Exception {
+ void testLazyInitialization() throws Exception {
final int parallelism = 3;
final int configuredMaxParallelism = 12;
final ExecutionJobVertex ejv =
createDynamicExecutionJobVertex(parallelism, configuredMaxParallelism, -1);
- assertThat(ejv.getParallelism(), is(parallelism));
- assertThat(ejv.getMaxParallelism(), is(configuredMaxParallelism));
- assertThat(ejv.isInitialized(), is(false));
+ Assertions.assertThat(ejv.getParallelism()).isEqualTo(parallelism);
+ Assertions.assertThat(ejv.getMaxParallelism()).isEqualTo(configuredMaxParallelism);
+ Assertions.assertThat(ejv.isInitialized()).isFalse();
- assertThat(ejv.getTaskVertices().length, is(0));
+ Assertions.assertThat(ejv.getTaskVertices()).isEmpty();
- try {
- ejv.getInputs();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getInputs).isInstanceOf(IllegalStateException.class);
- try {
- ejv.getProducedDataSets();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getProducedDataSets)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.getSplitAssigner();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getSplitAssigner)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.getOperatorCoordinators();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getOperatorCoordinators)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.connectToPredecessors(Collections.emptyMap());
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(() -> ejv.connectToPredecessors(Collections.emptyMap()))
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.executionVertexFinished();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::executionVertexFinished)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.executionVertexUnFinished();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::executionVertexUnFinished)
+ .isInstanceOf(IllegalStateException.class);
initializeVertex(ejv);
- assertThat(ejv.isInitialized(), is(true));
- assertThat(ejv.getTaskVertices().length, is(3));
- assertThat(ejv.getInputs().size(), is(0));
- assertThat(ejv.getProducedDataSets().length, is(1));
- assertThat(ejv.getOperatorCoordinators().size(), is(0));
+ Assertions.assertThat(ejv.isInitialized()).isTrue();
+ Assertions.assertThat(ejv.getTaskVertices().length).isEqualTo(3);
+ Assertions.assertThat(ejv.getInputs().size()).isEqualTo(0);
Review Comment:
nit: can use `isEmpty()`
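For example (a sketch, reusing the variables from the diff above):
```
Assertions.assertThat(ejv.getInputs()).isEmpty();
Assertions.assertThat(ejv.getOperatorCoordinators()).isEmpty();
```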
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java:
##########
@@ -365,23 +327,21 @@ public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
IntermediateResultPartition partition1 = result.getPartitions()[0];
IntermediateResultPartition partition2 = result.getPartitions()[1];
- assertEquals(
- partition1.getConsumedPartitionGroups().get(0),
- partition2.getConsumedPartitionGroups().get(0));
+ Assertions.assertThat(partition2.getConsumedPartitionGroups().get(0))
+ .isEqualTo(partition1.getConsumedPartitionGroups().get(0));
ConsumedPartitionGroup consumedPartitionGroup =
partition1.getConsumedPartitionGroups().get(0);
Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
partitionIds.add(partitionId);
}
- assertThat(
- partitionIds,
- containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()));
+ Assertions.assertThat(partitionIds)
+ .contains(partition1.getPartitionId(), partition2.getPartitionId());
Review Comment:
contains -> containsExactlyInAnyOrder
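i.e., roughly (a sketch based on the diff above):
```
Assertions.assertThat(partitionIds)
        .containsExactlyInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId());
```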
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -191,18 +181,17 @@ public void testUpdateTopology() throws Exception {
executionGraph.initializeJobVertex(ejv1, 0L);
adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
- assertThat(IterableUtils.toStream(adapter.getVertices()).count(), is(3L));
+ Assertions.assertThat(IterableUtils.toStream(adapter.getVertices()).count()).isEqualTo(3L);
Review Comment:
can be
```
assertThat(adapter.getVertices()).hasSize(3);
```
This also applies to the verifications below.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java:
##########
@@ -52,50 +53,48 @@
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSchedulerAndDeploy;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTaskToFinished;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTasksToFinished;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
/**
* Tests for removing cached {@link ShuffleDescriptor}s when the related partitions are no longer
* valid. Currently, there are two scenarios as illustrated in {@link
* IntermediateResult#clearCachedInformationForPartitionGroup}.
*/
-public class RemoveCachedShuffleDescriptorTest extends TestLogger {
+class RemoveCachedShuffleDescriptorTest extends TestLogger {
Review Comment:
No need to extend TestLogger (and it would not work anyway, because TestLogger relies on JUnit 4 rules).
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -75,25 +71,26 @@ public void setUp() {
}
@Test
- public void testGetPartitionState() {
+ void testGetPartitionState() {
for (ResultPartitionState state : ResultPartitionState.values()) {
resultPartitionState.setResultPartitionState(state);
- assertEquals(state, resultPartition.getState());
+ Assertions.assertThat(resultPartition.getState()).isEqualTo(state);
}
}
@Test
- public void testGetConsumerVertexGroup() {
+ void testGetConsumerVertexGroup() {
- assertTrue(resultPartition.getConsumerVertexGroups().isEmpty());
+ Assertions.assertThat(resultPartition.getConsumerVertexGroups()).isEmpty();
// test update consumers
ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
consumerVertexGroups.put(
resultPartition.getId(),
Collections.singletonList(ConsumerVertexGroup.fromSingleVertex(executionVertexId)));
- assertFalse(resultPartition.getConsumerVertexGroups().isEmpty());
- assertThat(resultPartition.getConsumerVertexGroups().get(0), contains(executionVertexId));
+ Assertions.assertThat(resultPartition.getConsumerVertexGroups().isEmpty()).isFalse();
Review Comment:
nit: can use `isNotEmpty()`
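For example (a sketch, reusing the variables from the diff above):
```
Assertions.assertThat(resultPartition.getConsumerVertexGroups()).isNotEmpty();
Assertions.assertThat(resultPartition.getConsumerVertexGroups().get(0)).contains(executionVertexId);
```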
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java:
##########
@@ -29,19 +29,19 @@
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.junit.Assert.assertEquals;
/** Unit tests for {@link DefaultExecutionVertex}. */
-public class DefaultExecutionVertexTest extends TestLogger {
+class DefaultExecutionVertexTest extends TestLogger {
Review Comment:
should no longer extend TestLogger
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java:
##########
@@ -83,14 +81,14 @@ public void testIsolatedVertices() throws Exception {
* </pre>
*/
@Test
- public void testVariousResultPartitionTypesBetweenVertices() throws Exception {
+ void testVariousResultPartitionTypesBetweenVertices() throws Exception {
testThreeVerticesConnectSequentially(false, true, 1, 2);
testThreeVerticesConnectSequentially(false, false, 0);
testThreeVerticesConnectSequentially(true, true, 1, 3);
}
private void testThreeVerticesConnectSequentially(
- boolean isForward1, boolean isForward2, int numOfGroups, int... groupSizes)
+ boolean isForward1, boolean isForward2, int numOfGroups, Integer... groupSizes)
Review Comment:
Why is this change needed?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -37,14 +38,9 @@
import java.util.function.Supplier;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
/** Unit tests for {@link DefaultResultPartition}. */
-public class DefaultResultPartitionTest extends TestLogger {
+class DefaultResultPartitionTest extends TestLogger {
Review Comment:
This also applies to the other test classes
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -122,50 +116,46 @@ public void testResultPartitionStateSupplier() {
final DefaultResultPartition schedulingResultPartition =
adapter.getResultPartition(intermediateResultPartition.getPartitionId());
- assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState());
+ Assertions.assertThat(schedulingResultPartition.getState())
+ .isEqualTo(ResultPartitionState.CREATED);
intermediateResultPartition.markDataProduced();
- assertEquals(ResultPartitionState.CONSUMABLE, schedulingResultPartition.getState());
+ Assertions.assertThat(schedulingResultPartition.getState())
+ .isEqualTo(ResultPartitionState.CONSUMABLE);
}
@Test
- public void testGetVertexOrThrow() {
- try {
- adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
- fail("get not exist vertex");
- } catch (IllegalArgumentException exception) {
- // expected
- }
+ void testGetVertexOrThrow() {
+ Assertions.assertThatThrownBy(
+ () -> adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0)))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testResultPartitionOrThrow() {
- try {
- adapter.getResultPartition(new IntermediateResultPartitionID());
- fail("get not exist result partition");
- } catch (IllegalArgumentException exception) {
- // expected
- }
+ void testResultPartitionOrThrow() {
+ Assertions.assertThatThrownBy(
+ () -> adapter.getResultPartition(new IntermediateResultPartitionID()))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testGetAllPipelinedRegions() {
+ void testGetAllPipelinedRegions() {
final Iterable<DefaultSchedulingPipelinedRegion> allPipelinedRegions =
adapter.getAllPipelinedRegions();
- assertEquals(1, Iterables.size(allPipelinedRegions));
+ Assertions.assertThat(Iterables.size(allPipelinedRegions)).isEqualTo(1);
Review Comment:
can be
```
assertThat(allPipelinedRegions).hasSize(1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -37,14 +38,9 @@
import java.util.function.Supplier;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
/** Unit tests for {@link DefaultResultPartition}. */
-public class DefaultResultPartitionTest extends TestLogger {
+class DefaultResultPartitionTest extends TestLogger {
Review Comment:
should no longer extend TestLogger
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java:
##########
@@ -86,101 +80,76 @@ public void testMultipleConsumersVertices() {
}
@Test
- public void testConnectDirectly() {
+ void testConnectDirectly() {
JobVertex source = new JobVertex("source");
JobVertex target = new JobVertex("target");
target.connectNewDataSetAsInput(
source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
- assertTrue(source.isInputVertex());
- assertFalse(source.isOutputVertex());
- assertFalse(target.isInputVertex());
- assertTrue(target.isOutputVertex());
+ Assertions.assertThat(source.isInputVertex()).isTrue();
+ Assertions.assertThat(source.isOutputVertex()).isFalse();
+ Assertions.assertThat(target.isInputVertex()).isFalse();
+ Assertions.assertThat(target.isOutputVertex()).isTrue();
- assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
- assertEquals(1, target.getNumberOfInputs());
+ Assertions.assertThat(source.getNumberOfProducedIntermediateDataSets()).isEqualTo(1);
+ Assertions.assertThat(target.getNumberOfInputs()).isEqualTo(1);
- assertEquals(target.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+ Assertions.assertThat(source.getProducedDataSets().get(0))
+ .isEqualTo(target.getInputs().get(0).getSource());
- assertEquals(target, source.getProducedDataSets().get(0).getConsumers().get(0).getTarget());
+ Assertions.assertThat(source.getProducedDataSets().get(0).getConsumers().get(0).getTarget())
+ .isEqualTo(target);
}
@Test
- public void testOutputFormat() {
- try {
- final InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
-
- OperatorID operatorID = new OperatorID();
- Configuration parameters = new Configuration();
- parameters.setString("test_key", "test_value");
- new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
- .addOutputFormat(operatorID, new TestingOutputFormat(parameters))
- .addParameters(operatorID, parameters)
- .write(new TaskConfig(vertex.getConfiguration()));
-
- final ClassLoader cl = new TestClassLoader();
-
- try {
- vertex.initializeOnMaster(cl);
- fail("Did not throw expected exception.");
- } catch (TestException e) {
- // all good
- }
+ void testOutputFormat() throws Exception {
+ final InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
- InputOutputFormatVertex copy = InstantiationUtil.clone(vertex);
- ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
- try {
- copy.initializeOnMaster(cl);
- fail("Did not throw expected exception.");
- } catch (TestException e) {
- // all good
- }
- assertEquals(
- "Previous classloader was not restored.",
- ctxCl,
- Thread.currentThread().getContextClassLoader());
-
- try {
- copy.finalizeOnMaster(cl);
- fail("Did not throw expected exception.");
- } catch (TestException e) {
- // all good
- }
- assertEquals(
- "Previous classloader was not restored.",
- ctxCl,
- Thread.currentThread().getContextClassLoader());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ OperatorID operatorID = new OperatorID();
+ Configuration parameters = new Configuration();
+ parameters.setString("test_key", "test_value");
+ new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
+ .addOutputFormat(operatorID, new TestingOutputFormat(parameters))
+ .addParameters(operatorID, parameters)
+ .write(new TaskConfig(vertex.getConfiguration()));
+
+ final ClassLoader cl = new TestClassLoader();
+
+ Assertions.assertThatThrownBy(() -> vertex.initializeOnMaster(cl))
+ .isInstanceOf(TestException.class);
+
+ InputOutputFormatVertex copy = InstantiationUtil.clone(vertex);
+ ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
+ Assertions.assertThatThrownBy(() -> copy.initializeOnMaster(cl))
+ .isInstanceOf(TestException.class);
+
+ Assertions.assertThat(Thread.currentThread().getContextClassLoader()).isEqualTo(ctxCl);
+
+ Assertions.assertThatThrownBy(() -> copy.finalizeOnMaster(cl))
+ .isInstanceOf(TestException.class);
+ Assertions.assertThat(Thread.currentThread().getContextClassLoader()).isEqualTo(ctxCl);
Review Comment:
Assertions.assertThat(Thread.currentThread().getContextClassLoader()).as("Previous classloader was not restored.").isEqualTo(ctxCl);
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -335,17 +327,20 @@ private static void assertPartitionsEquals(
}
}
List<ConsumerVertexGroup> adaptedConsumers =
adaptedPartition.getConsumerVertexGroups();
- assertFalse(adaptedConsumers.isEmpty());
+ Assertions.assertThat(adaptedConsumers).isNotEmpty();
for (ExecutionVertexID originalId : originalConsumerIds) {
// it is sufficient to verify that some vertex exists with the correct ID here,
// since deep equality is verified later in the main loop
// this DOES rely on an implicit assumption that the vertices objects returned by
// the topology are
// identical to those stored in the partition
- assertTrue(
- adaptedConsumers.stream()
- .flatMap(IterableUtils::toStream)
- .anyMatch(adaptedConsumer -> adaptedConsumer.equals(originalId)));
+ Assertions.assertThat(
Review Comment:
Looks to me like we can simplify it to:
```
Assertions.assertThat(adaptedConsumers).anyMatch(adaptedConsumer -> adaptedConsumer.equals(originalId));
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java:
##########
@@ -29,164 +29,132 @@
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
/** Test for {@link ExecutionJobVertex} */
-public class ExecutionJobVertexTest {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+class ExecutionJobVertexTest {
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
@Test
- public void testParallelismGreaterThanMaxParallelism() {
+ void testParallelismGreaterThanMaxParallelism() {
JobVertex jobVertex = new JobVertex("testVertex");
jobVertex.setInvokableClass(AbstractInvokable.class);
// parallelism must be smaller than the max parallelism
jobVertex.setParallelism(172);
jobVertex.setMaxParallelism(4);
- assertThrows(
- "higher than the max parallelism",
- JobException.class,
- () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
+ Assertions.assertThatThrownBy(
+ () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex))
+ .isInstanceOf(JobException.class)
+ .hasMessageContaining("higher than the max parallelism");
}
@Test
- public void testLazyInitialization() throws Exception {
+ void testLazyInitialization() throws Exception {
final int parallelism = 3;
final int configuredMaxParallelism = 12;
final ExecutionJobVertex ejv =
createDynamicExecutionJobVertex(parallelism, configuredMaxParallelism, -1);
- assertThat(ejv.getParallelism(), is(parallelism));
- assertThat(ejv.getMaxParallelism(), is(configuredMaxParallelism));
- assertThat(ejv.isInitialized(), is(false));
+ Assertions.assertThat(ejv.getParallelism()).isEqualTo(parallelism);
+ Assertions.assertThat(ejv.getMaxParallelism()).isEqualTo(configuredMaxParallelism);
+ Assertions.assertThat(ejv.isInitialized()).isFalse();
- assertThat(ejv.getTaskVertices().length, is(0));
+ Assertions.assertThat(ejv.getTaskVertices()).isEmpty();
- try {
- ejv.getInputs();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getInputs).isInstanceOf(IllegalStateException.class);
- try {
- ejv.getProducedDataSets();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getProducedDataSets)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.getSplitAssigner();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getSplitAssigner)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.getOperatorCoordinators();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::getOperatorCoordinators)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.connectToPredecessors(Collections.emptyMap());
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(() -> ejv.connectToPredecessors(Collections.emptyMap()))
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.executionVertexFinished();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::executionVertexFinished)
+ .isInstanceOf(IllegalStateException.class);
- try {
- ejv.executionVertexUnFinished();
- Assert.fail("failure is expected");
- } catch (IllegalStateException e) {
- // ignore
- }
+ Assertions.assertThatThrownBy(ejv::executionVertexUnFinished)
+ .isInstanceOf(IllegalStateException.class);
initializeVertex(ejv);
- assertThat(ejv.isInitialized(), is(true));
- assertThat(ejv.getTaskVertices().length, is(3));
- assertThat(ejv.getInputs().size(), is(0));
- assertThat(ejv.getProducedDataSets().length, is(1));
- assertThat(ejv.getOperatorCoordinators().size(), is(0));
+ Assertions.assertThat(ejv.isInitialized()).isTrue();
+ Assertions.assertThat(ejv.getTaskVertices().length).isEqualTo(3);
Review Comment:
nit: can use `hasSize`
This also applies to the lines below.
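For example, the count-style assertions from the removed lines could read (a sketch):
```
Assertions.assertThat(ejv.getTaskVertices()).hasSize(3);
Assertions.assertThat(ejv.getInputs()).isEmpty();
Assertions.assertThat(ejv.getProducedDataSets()).hasSize(1);
Assertions.assertThat(ejv.getOperatorCoordinators()).isEmpty();
```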
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]