This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aae023f48c8 [FLINK-32856][JUnit5 Migration] Migrate the testutils, throughput and throwable packages of flink-runtime module to junit5 (#23198)
aae023f48c8 is described below

commit aae023f48c8b92cde37e0e7765d821844ace036b
Author: Rui Fan <[email protected]>
AuthorDate: Wed Aug 23 21:49:10 2023 +0800

    [FLINK-32856][JUnit5 Migration] Migrate the testutils, throughput and throwable packages of flink-runtime module to junit5 (#23198)
---
 .../testutils/PseudoRandomValueSelectorTest.java   |  35 +++----
 .../flink/runtime/testutils/TestJvmProcess.java    |  15 +--
 .../runtime/testutils/statemigration/TestType.java |   8 +-
 .../runtime/throughput/BufferDebloaterTest.java    |  96 +++++++++---------
 .../runtime/throughput/BufferSizeEMATest.java      |  78 +++++++--------
 .../throughput/ThroughputCalculatorTest.java       |  36 ++++---
 .../runtime/throwable/ThrowableClassifierTest.java | 110 ++++++++++-----------
 7 files changed, 181 insertions(+), 197 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java
index d89cb6b0d41..b77f578861e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java
@@ -21,9 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -34,25 +33,20 @@ import java.util.stream.IntStream;
 import static 
org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
 import static 
org.apache.flink.configuration.JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB;
 import static org.apache.flink.configuration.TaskManagerOptions.CPU_CORES;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assume.assumeNoException;
 import static org.junit.Assume.assumeNotNull;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
 /** Tests {@link PseudoRandomValueSelector}. */
-public class PseudoRandomValueSelectorTest extends TestLogger {
+class PseudoRandomValueSelectorTest {
 
     /**
      * Tests that the selector will return different values if invoked several 
times even for the
      * same option.
      */
     @Test
-    public void testRandomizationOfValues() {
+    void testRandomizationOfValues() {
         final Double[] alternatives =
                 IntStream.range(0, 
1000).boxed().map(Double::valueOf).toArray(Double[]::new);
 
@@ -63,22 +57,22 @@ public class PseudoRandomValueSelectorTest extends 
TestLogger {
             final Double selectedValue = selectValue(valueSelector, CPU_CORES, 
alternatives);
             uniqueValues.add(selectedValue);
         }
-        assertThat(uniqueValues.size(), greaterThan(1));
+        assertThat(uniqueValues).hasSizeGreaterThan(1);
     }
 
     private <T> T selectValue(
             PseudoRandomValueSelector valueSelector, ConfigOption<T> option, 
T... alternatives) {
         final Configuration configuration = new Configuration();
-        assertNull(configuration.get(option));
+        assertThat(configuration.get(option)).isNull();
         valueSelector.select(configuration, option, alternatives);
         final T selected = configuration.get(option);
-        assertNotNull(selected);
+        assertThat(selected).isNotNull();
         return selected;
     }
 
     /** Tests that the selector will return different values for different 
seeds. */
     @Test
-    public void testRandomizationWithSeed() {
+    void testRandomizationWithSeed() {
         final Double[] alternatives =
                 IntStream.range(0, 
1000).boxed().map(Double::valueOf).toArray(Double[]::new);
 
@@ -87,12 +81,12 @@ public class PseudoRandomValueSelectorTest extends 
TestLogger {
             final PseudoRandomValueSelector selector = 
PseudoRandomValueSelector.create("test" + i);
             uniqueValues.add(selectValue(selector, CPU_CORES, alternatives));
         }
-        assertThat(uniqueValues.size(), greaterThan(1));
+        assertThat(uniqueValues).hasSizeGreaterThan(1);
     }
 
     /** Tests that the selector produces the same value for the same seed. */
     @Test
-    public void testStableRandomization() {
+    void testStableRandomization() {
         final Double[] doubles =
                 IntStream.range(0, 
1000).boxed().map(Double::valueOf).toArray(Double[]::new);
         final Integer[] numbers = IntStream.range(0, 
1000).boxed().toArray(Integer[]::new);
@@ -108,7 +102,7 @@ public class PseudoRandomValueSelectorTest extends 
TestLogger {
                             selectValue(selector, JOB_MANAGER_HEAP_MEMORY_MB, 
numbers),
                             selectValue(selector, SAVEPOINT_DIRECTORY, 
strings)));
         }
-        assertEquals(1, uniqueValues.size());
+        assertThat(uniqueValues).hasSize(1);
     }
 
     /**
@@ -117,7 +111,7 @@ public class PseudoRandomValueSelectorTest extends 
TestLogger {
      * <p>This test assumes that both sources of information are available 
(CI).
      */
     @Test
-    public void readCommitId() {
+    void readCommitId() {
         assumeNotNull(ZooKeeperTestUtils.runsOnCIInfrastructure());
         // this information is only valid after executing process-resources on 
flink-runtime
         final String envCommitId = EnvironmentInformation.getGitCommitId();
@@ -130,7 +124,6 @@ public class PseudoRandomValueSelectorTest extends 
TestLogger {
         }
 
         final Optional<String> gitCommitId = 
PseudoRandomValueSelector.getGitCommitId();
-        assertTrue(gitCommitId.isPresent());
-        assertEquals(envCommitId, gitCommitId.get());
+        assertThat(gitCommitId).isPresent().contains(envCommitId);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index dcaf7210afa..78cebd083f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -39,7 +39,8 @@ import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandP
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** A {@link Process} running a separate JVM. */
 public abstract class TestJvmProcess {
@@ -316,18 +317,18 @@ public abstract class TestJvmProcess {
             Thread.sleep(10);
         }
 
-        if (!exists) {
-            fail("The marker file was not found within " + timeoutMillis + " 
msecs");
-        }
+        assertThat(exists)
+                .withFailMessage("The marker file was not found within %s 
msecs", timeoutMillis)
+                .isTrue();
     }
 
     public static void killProcessWithSigTerm(long pid) throws Exception {
         // send it a regular kill command (SIG_TERM)
         final Process kill = Runtime.getRuntime().exec("kill " + pid);
         kill.waitFor();
-        if (kill.exitValue() != 0) {
-            fail("failed to send SIG_TERM to process " + pid);
-        }
+        assertThat(kill.exitValue())
+                .withFailMessage("failed to send SIG_TERM to process %s", pid)
+                .isZero();
     }
 
     public static void waitForMarkerFiles(File basedir, String prefix, int 
num, long timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index 8db517f68b3..da70fb42b52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -26,13 +26,13 @@ import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
 
-import org.junit.Assert;
-
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Objects;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * A data type used as state in state migration tests.
  *
@@ -129,9 +129,9 @@ public class TestType extends 
AbstractHeapPriorityQueueElement
         @Override
         public TestType deserialize(DataInputView source) throws IOException {
             String key = source.readUTF();
-            Assert.assertEquals(RANDOM_PAYLOAD, source.readUTF());
+            assertThat(source.readUTF()).isEqualTo(RANDOM_PAYLOAD);
             int value = source.readInt();
-            Assert.assertTrue(source.readBoolean());
+            assertThat(source.readBoolean()).isTrue();
 
             return new TestType(key, value);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
index 55d6d735993..0f49af792dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.runtime.throughput;
 
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.OptionalInt;
 
 import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link BufferDebloater}. */
-public class BufferDebloaterTest extends TestLogger {
+class BufferDebloaterTest {
 
     @Test
-    public void testZeroBuffersInUse() {
+    void testZeroBuffersInUse() {
         // if the gate returns the zero buffers in use it should be 
transformed to 1.
         testBufferDebloater()
                 .withDebloatTarget(1000)
@@ -46,7 +40,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testCorrectBufferSizeCalculation() {
+    void testCorrectBufferSizeCalculation() {
         testBufferDebloater()
                 .withDebloatTarget(1200)
                 .withBufferSize(50, 1100)
@@ -56,7 +50,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testCalculatedBufferSizeLessThanMin() {
+    void testCalculatedBufferSizeLessThanMin() {
         testBufferDebloater()
                 .withDebloatTarget(1200)
                 .withBufferSize(250, 1100)
@@ -66,7 +60,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testCalculatedBufferSizeForThroughputZero() {
+    void testCalculatedBufferSizeForThroughputZero() {
         // When the throughput is zero then min buffer size will be taken.
         testBufferDebloater()
                 .withDebloatTarget(1200)
@@ -77,7 +71,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testConfiguredConsumptionTimeIsTooLow() {
+    void testConfiguredConsumptionTimeIsTooLow() {
         // When the consumption time is low then min buffer size will be taken.
         testBufferDebloater()
                 .withDebloatTarget(7)
@@ -88,7 +82,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testCalculatedBufferSizeGreaterThanMax() {
+    void testCalculatedBufferSizeGreaterThanMax() {
         // New calculated buffer size should be more than max value it means 
that we should take max
         // value which means that no updates should happen because the old 
value equal to new value.
         testBufferDebloater()
@@ -100,7 +94,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testCalculatedBufferSlightlyDifferentFromCurrentOne() {
+    void testCalculatedBufferSlightlyDifferentFromCurrentOne() {
         // New calculated buffer size should be a little less than current 
value(or max value which
         // is the same) it means that no updates should happen because the new 
value is not so
         // different from the old one.
@@ -113,7 +107,7 @@ public class BufferDebloaterTest extends TestLogger {
     }
 
     @Test
-    public void testAnnouncedMaxBufferSizeDespiteLastDiffLessThanThreshold() {
+    void testAnnouncedMaxBufferSizeDespiteLastDiffLessThanThreshold() {
         final int numberOfBuffersInUse = 1;
         BufferDebloater bufferDebloater =
                 testBufferDebloater()
@@ -125,21 +119,21 @@ public class BufferDebloaterTest extends TestLogger {
 
         // Calculate the buffer size a little lower than the max buffer size.
         bufferDebloater.recalculateBufferSize(1000, numberOfBuffersInUse);
-        assertThat(bufferDebloater.getLastBufferSize(), is(1000));
+        assertThat(bufferDebloater.getLastBufferSize()).isEqualTo(1000);
 
         // Recalculate the buffer size to max value.
         bufferDebloater.recalculateBufferSize(2000, numberOfBuffersInUse);
 
         // The max value should be announced despite it differ from the 
previous one by less than
         // threshold value.
-        assertThat(bufferDebloater.getLastBufferSize(), is(1100));
+        assertThat(bufferDebloater.getLastBufferSize()).isEqualTo(1100);
 
         // Make sure that there is no repeated announcement of max buffer size.
         bufferDebloater.recalculateBufferSize(2000, numberOfBuffersInUse);
     }
 
     @Test
-    public void 
testAnnouncedMinBufferSizeEvenDespiteLastDiffLessThanThreshold() {
+    void testAnnouncedMinBufferSizeEvenDespiteLastDiffLessThanThreshold() {
         final int numberOfBuffersInUse = 1;
         BufferDebloater bufferDebloater =
                 testBufferDebloater()
@@ -151,21 +145,21 @@ public class BufferDebloaterTest extends TestLogger {
 
         // Calculate the buffer size a little greater than the min buffer size.
         bufferDebloater.recalculateBufferSize(60, numberOfBuffersInUse);
-        assertThat(bufferDebloater.getLastBufferSize(), is(60));
+        assertThat(bufferDebloater.getLastBufferSize()).isEqualTo(60);
 
         // Recalculate the buffer size to min value.
         bufferDebloater.recalculateBufferSize(40, numberOfBuffersInUse);
 
         // The min value should be announced despite it differ from the 
previous one by less than
         // threshold value.
-        assertThat(bufferDebloater.getLastBufferSize(), is(50));
+        assertThat(bufferDebloater.getLastBufferSize()).isEqualTo(50);
 
         // Make sure that there is no repeated announcement of min buffer size.
         bufferDebloater.recalculateBufferSize(40, numberOfBuffersInUse);
     }
 
     @Test
-    public void testSkipUpdate() {
+    void testSkipUpdate() {
         int maxBufferSize = 32768;
         int minBufferSize = 256;
         double threshold = 0.3;
@@ -180,36 +174,39 @@ public class BufferDebloaterTest extends TestLogger {
         int currentBufferSize = maxBufferSize / 2;
 
         OptionalInt optionalInt = 
bufferDebloater.recalculateBufferSize(currentBufferSize, 1);
-        assertTrue(optionalInt.isPresent());
-        assertEquals(currentBufferSize, optionalInt.getAsInt());
+        assertThat(optionalInt).isPresent().hasValue(currentBufferSize);
 
         // It is true because less than threshold.
-        assertTrue(bufferDebloater.skipUpdate(currentBufferSize));
-        assertTrue(bufferDebloater.skipUpdate(currentBufferSize - 1));
-        assertTrue(bufferDebloater.skipUpdate(currentBufferSize + 1));
-
-        assertTrue(
-                bufferDebloater.skipUpdate(
-                        currentBufferSize - (int) (currentBufferSize * 
threshold) + 1));
-        assertTrue(
-                bufferDebloater.skipUpdate(
-                        currentBufferSize + (int) (currentBufferSize * 
threshold) - 1));
+        assertThat(bufferDebloater.skipUpdate(currentBufferSize)).isTrue();
+        assertThat(bufferDebloater.skipUpdate(currentBufferSize - 1)).isTrue();
+        assertThat(bufferDebloater.skipUpdate(currentBufferSize + 1)).isTrue();
+
+        assertThat(
+                        bufferDebloater.skipUpdate(
+                                currentBufferSize - (int) (currentBufferSize * 
threshold) + 1))
+                .isTrue();
+        assertThat(
+                        bufferDebloater.skipUpdate(
+                                currentBufferSize + (int) (currentBufferSize * 
threshold) - 1))
+                .isTrue();
 
         // It is false because it reaches threshold.
-        assertFalse(
-                bufferDebloater.skipUpdate(
-                        currentBufferSize - (int) (currentBufferSize * 
threshold)));
-        assertFalse(
-                bufferDebloater.skipUpdate(
-                        currentBufferSize + (int) (currentBufferSize * 
threshold)));
-        assertFalse(bufferDebloater.skipUpdate(minBufferSize + 1));
-        assertFalse(bufferDebloater.skipUpdate(minBufferSize));
-        assertFalse(bufferDebloater.skipUpdate(maxBufferSize - 1));
-        assertFalse(bufferDebloater.skipUpdate(maxBufferSize));
+        assertThat(
+                        bufferDebloater.skipUpdate(
+                                currentBufferSize - (int) (currentBufferSize * 
threshold)))
+                .isFalse();
+        assertThat(
+                        bufferDebloater.skipUpdate(
+                                currentBufferSize + (int) (currentBufferSize * 
threshold)))
+                .isFalse();
+        assertThat(bufferDebloater.skipUpdate(minBufferSize + 1)).isFalse();
+        assertThat(bufferDebloater.skipUpdate(minBufferSize)).isFalse();
+        assertThat(bufferDebloater.skipUpdate(maxBufferSize - 1)).isFalse();
+        assertThat(bufferDebloater.skipUpdate(maxBufferSize)).isFalse();
 
         // Beyond the min and max size is always false.
-        assertFalse(bufferDebloater.skipUpdate(maxBufferSize + 1));
-        assertFalse(bufferDebloater.skipUpdate(minBufferSize - 1));
+        assertThat(bufferDebloater.skipUpdate(maxBufferSize + 1)).isFalse();
+        assertThat(bufferDebloater.skipUpdate(minBufferSize - 1)).isFalse();
     }
 
     public static BufferDebloaterTestBuilder testBufferDebloater() {
@@ -257,7 +254,7 @@ public class BufferDebloaterTest extends TestLogger {
             final OptionalInt newBufferSize =
                     bufferDebloater.recalculateBufferSize(throughput, 
numberOfBuffersInUse);
 
-            assertFalse(newBufferSize.isPresent());
+            assertThat(newBufferSize).isNotPresent();
         }
 
         public BufferDebloater expectBufferSize(int expectedBufferSize) {
@@ -267,8 +264,7 @@ public class BufferDebloaterTest extends TestLogger {
             final OptionalInt newBufferSize =
                     bufferDebloater.recalculateBufferSize(throughput, 
numberOfBuffersInUse);
 
-            assertTrue(newBufferSize.isPresent());
-            assertThat(newBufferSize.getAsInt(), is(expectedBufferSize));
+            assertThat(newBufferSize).isPresent().hasValue(expectedBufferSize);
             return bufferDebloater;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferSizeEMATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferSizeEMATest.java
index b58a1932ed3..365448e1155 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferSizeEMATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferSizeEMATest.java
@@ -18,74 +18,74 @@
 
 package org.apache.flink.runtime.throughput;
 
-import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
 
-import org.junit.Test;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Test for {@link BufferSizeEMA}. */
-public class BufferSizeEMATest extends TestLogger {
+class BufferSizeEMATest {
 
     @Test
-    public void testCalculationBufferSize() {
+    void testCalculationBufferSize() {
         BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);
 
         // The result value seeks to the bottom limit but it will take a while 
until it reaches it.
-        assertThat(calculator.calculateBufferSize(111, 13), is(104));
-        assertThat(calculator.calculateBufferSize(107, 7), is(59));
-        assertThat(calculator.calculateBufferSize(107, 7), is(37));
-        assertThat(calculator.calculateBufferSize(107, 7), is(26));
-        assertThat(calculator.calculateBufferSize(107, 7), is(20));
-        assertThat(calculator.calculateBufferSize(107, 7), is(17));
-        assertThat(calculator.calculateBufferSize(107, 13), is(12));
-        assertThat(calculator.calculateBufferSize(107, 13), is(10));
+        assertThat(calculator.calculateBufferSize(111, 13)).isEqualTo(104);
+        assertThat(calculator.calculateBufferSize(107, 7)).isEqualTo(59);
+        assertThat(calculator.calculateBufferSize(107, 7)).isEqualTo(37);
+        assertThat(calculator.calculateBufferSize(107, 7)).isEqualTo(26);
+        assertThat(calculator.calculateBufferSize(107, 7)).isEqualTo(20);
+        assertThat(calculator.calculateBufferSize(107, 7)).isEqualTo(17);
+        assertThat(calculator.calculateBufferSize(107, 13)).isEqualTo(12);
+        assertThat(calculator.calculateBufferSize(107, 13)).isEqualTo(10);
 
         // Upgrade
-        assertThat(calculator.calculateBufferSize(333, 1), is(15));
-        assertThat(calculator.calculateBufferSize(333, 1), is(22));
-        assertThat(calculator.calculateBufferSize(333, 1), is(33));
-        assertThat(calculator.calculateBufferSize(333, 1), is(49));
-        assertThat(calculator.calculateBufferSize(333, 1), is(73));
-        assertThat(calculator.calculateBufferSize(333, 1), is(109));
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(15);
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(22);
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(33);
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(49);
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(73);
+        assertThat(calculator.calculateBufferSize(333, 1)).isEqualTo(109);
     }
 
     @Test
-    public void testSizeGreaterThanMaxSize() {
+    void testSizeGreaterThanMaxSize() {
         BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);
 
         // Decrease value to less than max.
-        assertThat(calculator.calculateBufferSize(0, 1), is(100));
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(100);
 
         // Impossible to exceed maximum.
-        assertThat(calculator.calculateBufferSize(1000, 1), is(150));
-        assertThat(calculator.calculateBufferSize(1000, 1), is(200));
-        assertThat(calculator.calculateBufferSize(1000, 1), is(200));
+        assertThat(calculator.calculateBufferSize(1000, 1)).isEqualTo(150);
+        assertThat(calculator.calculateBufferSize(1000, 1)).isEqualTo(200);
+        assertThat(calculator.calculateBufferSize(1000, 1)).isEqualTo(200);
     }
 
     @Test
-    public void testSizeLessThanMinSize() {
+    void testSizeLessThanMinSize() {
         BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);
 
         // Impossible to less than min.
-        assertThat(calculator.calculateBufferSize(0, 1), is(100));
-        assertThat(calculator.calculateBufferSize(0, 1), is(50));
-        assertThat(calculator.calculateBufferSize(0, 1), is(25));
-        assertThat(calculator.calculateBufferSize(0, 1), is(12));
-        assertThat(calculator.calculateBufferSize(0, 1), is(10));
-        assertThat(calculator.calculateBufferSize(0, 1), is(10));
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(100);
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(50);
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(25);
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(12);
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(10);
+        assertThat(calculator.calculateBufferSize(0, 1)).isEqualTo(10);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testNegativeTotalSize() {
+    @Test
+    void testNegativeTotalSize() {
         BufferSizeEMA calculator = new BufferSizeEMA(100, 200, 2);
-        calculator.calculateBufferSize(-1, 1);
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> calculator.calculateBufferSize(-1, 1));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testZeroBuffers() {
+    @Test
+    void testZeroBuffers() {
         BufferSizeEMA calculator = new BufferSizeEMA(100, 200, 2);
-        calculator.calculateBufferSize(1, 0);
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> calculator.calculateBufferSize(1, 0));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
index 15c973c8cb1..8e98d29c143 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
@@ -20,19 +20,17 @@ package org.apache.flink.runtime.throughput;
 
 import org.apache.flink.util.clock.ManualClock;
 
-import junit.framework.TestCase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ThroughputCalculator}. */
-public class ThroughputCalculatorTest extends TestCase {
+class ThroughputCalculatorTest {
 
     @Test
-    public void testCorrectThroughputCalculation() {
+    void testCorrectThroughputCalculation() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
@@ -43,27 +41,27 @@ public class ThroughputCalculatorTest extends TestCase {
         throughputCalculator.incomingDataSize(1);
         clock.advanceTime(Duration.ofMillis(66));
 
-        assertThat(throughputCalculator.calculateThroughput(), is(10_000L * 
1_000 / 100));
+        
assertThat(throughputCalculator.calculateThroughput()).isEqualTo(10_000L * 
1_000 / 100);
     }
 
     @Test
-    public void testResetValueAfterCalculation() {
+    void testResetValueAfterCalculation() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
         throughputCalculator.incomingDataSize(666);
         clock.advanceTime(Duration.ofMillis(100));
 
-        assertThat(throughputCalculator.calculateThroughput(), is(6660L));
+        
assertThat(throughputCalculator.calculateThroughput()).isEqualTo(6660L);
         // It should be the same as previous time.
-        assertThat(throughputCalculator.calculateThroughput(), is(6660L));
+        
assertThat(throughputCalculator.calculateThroughput()).isEqualTo(6660L);
 
         clock.advanceTime(Duration.ofMillis(1));
-        assertThat(throughputCalculator.calculateThroughput(), is(0L));
+        assertThat(throughputCalculator.calculateThroughput()).isZero();
     }
 
     @Test
-    public void testIgnoringIdleTime() {
+    void testIgnoringIdleTime() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
@@ -76,11 +74,11 @@ public class ThroughputCalculatorTest extends TestCase {
         throughputCalculator.incomingDataSize(3);
         clock.advanceTime(Duration.ofMillis(1));
 
-        assertThat(throughputCalculator.calculateThroughput(), is(5L * 1_000));
+        assertThat(throughputCalculator.calculateThroughput()).isEqualTo(5L * 
1_000);
     }
 
     @Test
-    public void testCalculationDuringIdleTime() {
+    void testCalculationDuringIdleTime() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
@@ -90,11 +88,11 @@ public class ThroughputCalculatorTest extends TestCase {
         // This will be ignored because it is in idle now.
         clock.advanceTime(Duration.ofMillis(9));
 
-        assertThat(throughputCalculator.calculateThroughput(), is(10L * 
1_000));
+        assertThat(throughputCalculator.calculateThroughput()).isEqualTo(10L * 
1_000);
     }
 
     @Test
-    public void testMultiplyIdleEnd() {
+    void testMultiplyIdleEnd() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
@@ -112,11 +110,11 @@ public class ThroughputCalculatorTest extends TestCase {
         clock.advanceTime(Duration.ofMillis(1));
 
         // resumeMeasurement should not reset the time because 
pauseMeasurement was not called.
-        assertThat(throughputCalculator.calculateThroughput(), is(1_000L));
+        
assertThat(throughputCalculator.calculateThroughput()).isEqualTo(1_000L);
     }
 
     @Test
-    public void testNotRestartTimerOnCalculationDuringIdleTime() {
+    void testNotRestartTimerOnCalculationDuringIdleTime() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock);
 
@@ -132,6 +130,6 @@ public class ThroughputCalculatorTest extends TestCase {
         throughputCalculator.incomingDataSize(10);
         clock.advanceTime(Duration.ofMillis(1));
 
-        assertThat(throughputCalculator.calculateThroughput(), is(10L * 
1_000));
+        assertThat(throughputCalculator.calculateThroughput()).isEqualTo(10L * 
1_000);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throwable/ThrowableClassifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throwable/ThrowableClassifierTest.java
index e68d753145b..4ec955ff631 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throwable/ThrowableClassifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throwable/ThrowableClassifierTest.java
@@ -19,104 +19,100 @@
 package org.apache.flink.runtime.throwable;
 
 import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test {@link ThrowableClassifier}. */
-public class ThrowableClassifierTest extends TestLogger {
+class ThrowableClassifierTest {
 
     @Test
-    public void testThrowableType_NonRecoverable() {
-        assertEquals(
-                ThrowableType.NonRecoverableError,
-                ThrowableClassifier.getThrowableType(
-                        new SuppressRestartsException(new Exception(""))));
+    void testThrowableType_NonRecoverable() {
+        assertThat(
+                        ThrowableClassifier.getThrowableType(
+                                new SuppressRestartsException(new Exception(""))))
+                .isEqualTo(ThrowableType.NonRecoverableError);
     }
 
     @Test
-    public void testThrowableType_Recoverable() {
-        assertEquals(
-                ThrowableType.RecoverableError,
-                ThrowableClassifier.getThrowableType(new Exception("")));
-        assertEquals(
-                ThrowableType.RecoverableError,
-                ThrowableClassifier.getThrowableType(new TestRecoverableErrorException()));
+    void testThrowableType_Recoverable() {
+        assertThat(ThrowableClassifier.getThrowableType(new Exception("")))
+                .isEqualTo(ThrowableType.RecoverableError);
+        assertThat(ThrowableClassifier.getThrowableType(new TestRecoverableErrorException()))
+                .isEqualTo(ThrowableType.RecoverableError);
     }
 
     @Test
-    public void testThrowableType_EnvironmentError() {
-        assertEquals(
-                ThrowableType.EnvironmentError,
-                ThrowableClassifier.getThrowableType(new TestEnvironmentErrorException()));
+    void testThrowableType_EnvironmentError() {
+        assertThat(ThrowableClassifier.getThrowableType(new TestEnvironmentErrorException()))
+                .isEqualTo(ThrowableType.EnvironmentError);
     }
 
     @Test
-    public void testThrowableType_PartitionDataMissingError() {
-        assertEquals(
-                ThrowableType.PartitionDataMissingError,
-                ThrowableClassifier.getThrowableType(new TestPartitionDataMissingErrorException()));
+    void testThrowableType_PartitionDataMissingError() {
+        assertThat(
+                        ThrowableClassifier.getThrowableType(
+                                new TestPartitionDataMissingErrorException()))
+                .isEqualTo(ThrowableType.PartitionDataMissingError);
     }
 
     @Test
-    public void testThrowableType_InheritError() {
-        assertEquals(
-                ThrowableType.PartitionDataMissingError,
-                ThrowableClassifier.getThrowableType(
-                        new TestPartitionDataMissingErrorSubException()));
+    void testThrowableType_InheritError() {
+        assertThat(
+                        ThrowableClassifier.getThrowableType(
+                                new TestPartitionDataMissingErrorSubException()))
+                .isEqualTo(ThrowableType.PartitionDataMissingError);
     }
 
     @Test
-    public void testFindThrowableOfThrowableType() {
+    void testFindThrowableOfThrowableType() {
         // no throwable type
-        assertFalse(
-                ThrowableClassifier.findThrowableOfThrowableType(
-                                new Exception(), ThrowableType.RecoverableError)
-                        .isPresent());
+        assertThat(
+                        ThrowableClassifier.findThrowableOfThrowableType(
+                                new Exception(), ThrowableType.RecoverableError))
+                .isNotPresent();
 
         // no recoverable throwable type
-        assertFalse(
-                ThrowableClassifier.findThrowableOfThrowableType(
+        assertThat(
+                        ThrowableClassifier.findThrowableOfThrowableType(
                                 new TestPartitionDataMissingErrorException(),
-                                ThrowableType.RecoverableError)
-                        .isPresent());
+                                ThrowableType.RecoverableError))
+                .isNotPresent();
 
         // direct recoverable throwable
-        assertTrue(
-                ThrowableClassifier.findThrowableOfThrowableType(
-                                new TestRecoverableErrorException(), ThrowableType.RecoverableError)
-                        .isPresent());
+        assertThat(
+                        ThrowableClassifier.findThrowableOfThrowableType(
+                                new TestRecoverableErrorException(),
+                                ThrowableType.RecoverableError))
+                .isPresent();
 
         // nested recoverable throwable
-        assertTrue(
-                ThrowableClassifier.findThrowableOfThrowableType(
+        assertThat(
+                        ThrowableClassifier.findThrowableOfThrowableType(
                                 new Exception(new TestRecoverableErrorException()),
-                                ThrowableType.RecoverableError)
-                        .isPresent());
+                                ThrowableType.RecoverableError))
+                .isPresent();
 
         // inherit recoverable throwable
-        assertTrue(
-                ThrowableClassifier.findThrowableOfThrowableType(
+        assertThat(
+                        ThrowableClassifier.findThrowableOfThrowableType(
                                 new TestRecoverableFailureSubException(),
-                                ThrowableType.RecoverableError)
-                        .isPresent());
+                                ThrowableType.RecoverableError))
+                .isPresent();
     }
 
     @ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
-    private class TestPartitionDataMissingErrorException extends Exception {}
+    private static class TestPartitionDataMissingErrorException extends Exception {}
 
     @ThrowableAnnotation(ThrowableType.EnvironmentError)
-    private class TestEnvironmentErrorException extends Exception {}
+    private static class TestEnvironmentErrorException extends Exception {}
 
     @ThrowableAnnotation(ThrowableType.RecoverableError)
-    private class TestRecoverableErrorException extends Exception {}
+    private static class TestRecoverableErrorException extends Exception {}
 
-    private class TestPartitionDataMissingErrorSubException
+    private static class TestPartitionDataMissingErrorSubException
             extends TestPartitionDataMissingErrorException {}
 
-    private class TestRecoverableFailureSubException extends TestRecoverableErrorException {}
+    private static class TestRecoverableFailureSubException extends TestRecoverableErrorException {}
 }


Reply via email to