This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5f665bc297a [FLINK-32848][tests][JUnit5 migration] Migrate
flink-runtime/shuffle tests to JUnit5 (#23302)
5f665bc297a is described below
commit 5f665bc297a40bfc56f1ccfaf52154d92e0c71ec
Author: Zhanghao Chen <[email protected]>
AuthorDate: Mon Sep 18 13:52:27 2023 +0800
[FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/shuffle tests
to JUnit5 (#23302)
Co-authored-by: Shammon FY <[email protected]>
---
.../runtime/shuffle/NettyShuffleUtilsTest.java | 11 ++--
.../flink/runtime/shuffle/ShuffleMasterTest.java | 70 ++++++++++------------
.../runtime/shuffle/ShuffleServiceLoaderTest.java | 36 +++++------
3 files changed, 53 insertions(+), 64 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java
index fcfdfb5092e..0c82ec3254a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java
@@ -35,11 +35,10 @@ import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collection;
@@ -51,10 +50,10 @@ import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED_BOUNDED;
import static
org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link NettyShuffleUtils}. */
-public class NettyShuffleUtilsTest extends TestLogger {
+class NettyShuffleUtilsTest {
/**
* This test verifies that the {@link NettyShuffleEnvironment} requires
buffers as expected, so
@@ -62,7 +61,7 @@ public class NettyShuffleUtilsTest extends TestLogger {
*
ShuffleMaster#computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor)} is
correct.
*/
@Test
- public void testComputeRequiredNetworkBuffers() throws Exception {
+ void testComputeRequiredNetworkBuffers() throws Exception {
int numBuffersPerChannel = 5;
int numBuffersPerGate = 8;
Optional<Integer> maxRequiredBuffersPerGate =
Optional.of(Integer.MAX_VALUE);
@@ -134,7 +133,7 @@ public class NettyShuffleUtilsTest extends TestLogger {
+ calculateBuffersConsumption(resultPartition1)
+ calculateBuffersConsumption(resultPartition2)
+ calculateBuffersConsumption(resultPartition3);
- assertEquals(expected, numTotalBuffers);
+ assertThat(numTotalBuffers).isEqualTo(expected);
inputGate1.close();
inputGate2.close();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
index 0cf23b71108..adff276258c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
@@ -32,10 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.TestLogger;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
@@ -45,13 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ShuffleMaster}. */
-public class ShuffleMasterTest extends TestLogger {
+class ShuffleMasterTest {
private static final String STOP_TRACKING_PARTITION_KEY =
"stop_tracking_partition_key";
@@ -59,47 +55,41 @@ public class ShuffleMasterTest extends TestLogger {
private static final String EXTERNAL_PARTITION_RELEASE_EVENT =
"releasePartitionExternally";
- @Before
- public void before() {
+ @BeforeEach
+ void before() {
TestShuffleMaster.partitionEvents.clear();
}
@Test
- public void testShuffleMasterLifeCycle() throws Exception {
+ void testShuffleMasterLifeCycle() throws Exception {
try (MiniCluster cluster = new
MiniCluster(createClusterConfiguration(false))) {
cluster.start();
cluster.executeJobBlocking(createJobGraph());
}
- assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
-
- String[] expectedPartitionEvents =
- new String[] {
- PARTITION_REGISTRATION_EVENT,
- PARTITION_REGISTRATION_EVENT,
- EXTERNAL_PARTITION_RELEASE_EVENT,
- EXTERNAL_PARTITION_RELEASE_EVENT,
- };
- assertArrayEquals(expectedPartitionEvents,
TestShuffleMaster.partitionEvents.toArray());
+ assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
+ assertThat(TestShuffleMaster.partitionEvents)
+ .containsExactly(
+ PARTITION_REGISTRATION_EVENT,
+ PARTITION_REGISTRATION_EVENT,
+ EXTERNAL_PARTITION_RELEASE_EVENT,
+ EXTERNAL_PARTITION_RELEASE_EVENT);
}
@Test
- public void testStopTrackingPartition() throws Exception {
+ void testStopTrackingPartition() throws Exception {
try (MiniCluster cluster = new
MiniCluster(createClusterConfiguration(true))) {
cluster.start();
cluster.executeJobBlocking(createJobGraph());
}
- assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
-
- String[] expectedPartitionEvents =
- new String[] {
- PARTITION_REGISTRATION_EVENT,
- PARTITION_REGISTRATION_EVENT,
- PARTITION_REGISTRATION_EVENT,
- PARTITION_REGISTRATION_EVENT,
- EXTERNAL_PARTITION_RELEASE_EVENT,
- EXTERNAL_PARTITION_RELEASE_EVENT,
- };
- assertArrayEquals(expectedPartitionEvents,
TestShuffleMaster.partitionEvents.toArray());
+ assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
+ assertThat(TestShuffleMaster.partitionEvents)
+ .containsExactly(
+ PARTITION_REGISTRATION_EVENT,
+ PARTITION_REGISTRATION_EVENT,
+ PARTITION_REGISTRATION_EVENT,
+ PARTITION_REGISTRATION_EVENT,
+ EXTERNAL_PARTITION_RELEASE_EVENT,
+ EXTERNAL_PARTITION_RELEASE_EVENT);
}
private MiniClusterConfiguration createClusterConfiguration(boolean
stopTrackingPartition) {
@@ -169,8 +159,8 @@ public class ShuffleMasterTest extends TestLogger {
@Override
public void start() throws Exception {
- assertFalse(started.get());
- assertFalse(closed.get());
+ assertThat(started).isFalse();
+ assertThat(closed).isFalse();
started.set(true);
super.start();
}
@@ -185,7 +175,7 @@ public class ShuffleMasterTest extends TestLogger {
@Override
public void registerJob(JobShuffleContext context) {
assertShuffleMasterAlive();
- assertTrue(jobContext.compareAndSet(null, context));
+ assertThat(jobContext.compareAndSet(null, context)).isTrue();
super.registerJob(context);
}
@@ -238,13 +228,13 @@ public class ShuffleMasterTest extends TestLogger {
}
private void assertShuffleMasterAlive() {
- assertFalse(closed.get());
- assertTrue(started.get());
+ assertThat(closed).isFalse();
+ assertThat(started).isTrue();
}
private void assertJobRegistered() {
assertShuffleMasterAlive();
- assertNotNull(jobContext.get());
+ assertThat(jobContext).isNotNull();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
index ecbc4793176..cae0737a88a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
@@ -23,49 +23,49 @@ import
org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static
org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test suite for {@link ShuffleServiceLoader} utility. */
-public class ShuffleServiceLoaderTest extends TestLogger {
+class ShuffleServiceLoaderTest {
@Test
- public void testLoadDefaultNettyShuffleServiceFactory() throws
FlinkException {
+ void testLoadDefaultNettyShuffleServiceFactory() throws FlinkException {
Configuration configuration = new Configuration();
ShuffleServiceFactory<?, ?, ?> shuffleServiceFactory =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
- assertThat(
- "Loaded shuffle service factory is not the default netty
implementation",
- shuffleServiceFactory,
- instanceOf(NettyShuffleServiceFactory.class));
+ assertThat(shuffleServiceFactory)
+ .withFailMessage(
+ "Loaded shuffle service factory is not the default
netty implementation")
+ .isInstanceOf(NettyShuffleServiceFactory.class);
}
@Test
- public void testLoadCustomShuffleServiceFactory() throws FlinkException {
+ void testLoadCustomShuffleServiceFactory() throws FlinkException {
Configuration configuration = new Configuration();
configuration.setString(
SHUFFLE_SERVICE_FACTORY_CLASS,
"org.apache.flink.runtime.shuffle.ShuffleServiceLoaderTest$CustomShuffleServiceFactory");
ShuffleServiceFactory<?, ?, ?> shuffleServiceFactory =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
- assertThat(
- "Loaded shuffle service factory is not the custom test
implementation",
- shuffleServiceFactory,
- instanceOf(CustomShuffleServiceFactory.class));
+ assertThat(shuffleServiceFactory)
+ .withFailMessage(
+ "Loaded shuffle service factory is not the custom test
implementation")
+ .isInstanceOf(CustomShuffleServiceFactory.class);
}
- @Test(expected = FlinkException.class)
- public void testLoadShuffleServiceFactoryFailure() throws FlinkException {
+ @Test
+ public void testLoadShuffleServiceFactoryFailure() {
Configuration configuration = new Configuration();
configuration.setString(
SHUFFLE_SERVICE_FACTORY_CLASS,
"org.apache.flink.runtime.shuffle.UnavailableShuffleServiceFactory");
- ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
+ assertThatThrownBy(() ->
ShuffleServiceLoader.loadShuffleServiceFactory(configuration))
+ .isInstanceOf(FlinkException.class);
}
/**