This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f4e834babe [FLINK-31876][QS] Migrate flink-queryable-state-runtime tests to JUnit5
7f4e834babe is described below

commit 7f4e834babe7e8865123148e57ecdd6a138b609d
Author: fredia <[email protected]>
AuthorDate: Sun Apr 23 12:21:38 2023 +0800

    [FLINK-31876][QS] Migrate flink-queryable-state-runtime tests to JUnit5
---
 .../flink-queryable-state-runtime/pom.xml          |   6 +
 .../client/proxy/KvStateClientProxyImplTest.java   |  43 ++--
 .../itcases/AbstractQueryableStateTestBase.java    | 203 ++++++++--------
 .../itcases/HAQueryableStateFsBackendITCase.java   |  87 +++----
 .../HAQueryableStateRocksDBBackendITCase.java      |  89 +++----
 .../NonHAQueryableStateFsBackendITCase.java        |  47 ++--
 .../NonHAQueryableStateRocksDBBackendITCase.java   |  47 ++--
 .../queryablestate/network/AbstractServerTest.java |  72 +++---
 .../KVStateRequestSerializerRocksDBTest.java       |  23 +-
 .../network/KvStateClientHandlerTest.java          |  92 ++++++--
 .../network/KvStateRequestSerializerTest.java      | 260 +++++++++++++--------
 .../network/KvStateServerHandlerTest.java          | 162 +++++++------
 .../queryablestate/network/KvStateServerTest.java  |  24 +-
 .../network/MessageSerializerTest.java             | 102 ++++----
 .../org.junit.jupiter.api.extension.Extension      |  16 ++
 15 files changed, 690 insertions(+), 583 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
index 2a12468a89c..9775c67485a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/pom.xml
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -124,6 +124,12 @@ under the License.
                                                <goals>
                                                        <goal>test-jar</goal>
                                                </goals>
+                                               <configuration>
+                                                       <excludes>
+                                                               <!-- test-jar 
is still used by JUnit4 modules -->
+                                                               
<exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+                                                       </excludes>
+                                               </configuration>
                                        </execution>
                                </executions>
                        </plugin>
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
index 16ec6e9cbf2..a7c3265b302 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
@@ -23,27 +23,24 @@ import 
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link KvStateClientProxyImpl}. */
-public class KvStateClientProxyImplTest extends TestLogger {
+class KvStateClientProxyImplTest {
 
     private KvStateClientProxyImpl kvStateClientProxy;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         kvStateClientProxy =
                 new KvStateClientProxyImpl(
                         InetAddress.getLoopbackAddress().getHostName(),
@@ -53,14 +50,14 @@ public class KvStateClientProxyImplTest extends TestLogger {
                         new DisabledKvStateRequestStats());
     }
 
-    @After
-    public void shutdown() {
+    @AfterEach
+    void shutdown() {
         kvStateClientProxy.shutdown();
     }
 
     /** Tests that we can set and retrieve the {@link KvStateLocationOracle}. 
*/
     @Test
-    public void testKvStateLocationOracle() {
+    void testKvStateLocationOracle() {
         final JobID jobId1 = new JobID();
         final TestingKvStateLocationOracle kvStateLocationOracle1 =
                 new TestingKvStateLocationOracle();
@@ -70,17 +67,15 @@ public class KvStateClientProxyImplTest extends TestLogger {
                 new TestingKvStateLocationOracle();
         kvStateClientProxy.updateKvStateLocationOracle(jobId2, 
kvStateLocationOracle2);
 
-        assertThat(kvStateClientProxy.getKvStateLocationOracle(new JobID()), 
nullValue());
+        assertThat(kvStateClientProxy.getKvStateLocationOracle(new 
JobID())).isNull();
 
-        assertThat(
-                kvStateClientProxy.getKvStateLocationOracle(jobId1),
-                equalTo(kvStateLocationOracle1));
-        assertThat(
-                kvStateClientProxy.getKvStateLocationOracle(jobId2),
-                equalTo(kvStateLocationOracle2));
+        assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1))
+                .isEqualTo(kvStateLocationOracle1);
+        assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId2))
+                .isEqualTo(kvStateLocationOracle2);
 
         kvStateClientProxy.updateKvStateLocationOracle(jobId1, null);
-        assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1), 
nullValue());
+        
assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1)).isNull();
     }
 
     /**
@@ -88,7 +83,7 @@ public class KvStateClientProxyImplTest extends TestLogger {
      * HighAvailabilityServices#DEFAULT_JOB_ID} will be used for all requests.
      */
     @Test
-    public void testLegacyCodePathPreference() {
+    void testLegacyCodePathPreference() {
         final TestingKvStateLocationOracle kvStateLocationOracle =
                 new TestingKvStateLocationOracle();
         kvStateClientProxy.updateKvStateLocationOracle(
@@ -96,8 +91,8 @@ public class KvStateClientProxyImplTest extends TestLogger {
         final JobID jobId = new JobID();
         kvStateClientProxy.updateKvStateLocationOracle(jobId, new 
TestingKvStateLocationOracle());
 
-        assertThat(
-                kvStateClientProxy.getKvStateLocationOracle(jobId), 
equalTo(kvStateLocationOracle));
+        assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId))
+                .isEqualTo(kvStateLocationOracle);
     }
 
     /** Testing implementation of {@link KvStateLocationOracle}. */
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 158e64904e4..7d6742c2e99 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -60,23 +60,22 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.testutils.ClassLoaderUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URLClassLoader;
@@ -90,7 +89,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -99,24 +97,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for queryable state integration tests with a configurable state 
backend. */
-public abstract class AbstractQueryableStateTestBase extends TestLogger {
+public abstract class AbstractQueryableStateTestBase {
 
     private static final Duration TEST_TIMEOUT = Duration.ofSeconds(200L);
     private static final long RETRY_TIMEOUT = 50L;
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            new TestExecutorResource<>(() -> 
Executors.newScheduledThreadPool(4));
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(() -> 
Executors.newScheduledThreadPool(4));
 
     private final ScheduledExecutor executor =
-            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+            new 
ScheduledExecutorServiceAdapter(EXECUTOR_EXTENSION.getExecutor());
 
     /** State backend to use. */
     private StateBackend stateBackend;
@@ -128,14 +124,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
     protected static int maxParallelism;
 
-    @ClassRule public static TemporaryFolder classloaderFolder = new 
TemporaryFolder();
+    @TempDir static File classloaderFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         // NOTE: do not use a shared instance for all tests as the tests may 
break
         this.stateBackend = createStateBackend();
 
-        Assert.assertNotNull(clusterClient);
+        assertThat(clusterClient).isNotNull();
 
         maxParallelism = 4;
     }
@@ -157,7 +153,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * counts of each key in rounds until all keys have non-zero counts.
      */
     @Test
-    public void testQueryableState() throws Exception {
+    void testQueryableState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final int numKeys = 256;
 
@@ -229,13 +225,15 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
                     result.thenAccept(
                             response -> {
-                                try {
-                                    Tuple2<Integer, Long> res = response.get();
-                                    counts.set(key, res.f1);
-                                    assertEquals("Key mismatch", key, 
res.f0.intValue());
-                                } catch (Exception e) {
-                                    Assert.fail(e.getMessage());
-                                }
+                                assertThatCode(
+                                                () -> {
+                                                    Tuple2<Integer, Long> res 
= response.get();
+                                                    counts.set(key, res.f1);
+                                                    assertThat(key)
+                                                            
.isEqualTo(res.f0.intValue())
+                                                            
.withFailMessage("Key mismatch");
+                                                })
+                                        .doesNotThrowAnyException();
                             });
 
                     futures.add(result);
@@ -246,19 +244,21 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                         .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
             }
 
-            assertTrue("Not all keys are non-zero", allNonZero);
+            assertThat(allNonZero).isTrue().withFailMessage("Not all keys are 
non-zero");
 
             // All should be non-zero
             for (int i = 0; i < numKeys; i++) {
                 long count = counts.get(i);
-                assertTrue("Count at position " + i + " is " + count, count > 
0);
+                assertThat(count)
+                        .isGreaterThan(0)
+                        .withFailMessage("Count at position " + i + " is " + 
count);
             }
         }
     }
 
     /** Tests that duplicate query registrations fail the job at the 
JobManager. */
-    @Test(timeout = 60_000)
-    public void testDuplicateRegistrationFailsJob() throws Exception {
+    @Test
+    void testDuplicateRegistrationFailsJob() throws Exception {
         final int numKeys = 256;
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -312,18 +312,17 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                 .thenApply(JobResult::getSerializedThrowable)
                 .thenAccept(
                         serializedThrowable -> {
-                            assertTrue(serializedThrowable.isPresent());
+                            assertThat(serializedThrowable).isPresent();
                             final Throwable t =
                                     serializedThrowable
                                             .get()
                                             
.deserializeError(getClass().getClassLoader());
                             final String failureCause = 
ExceptionUtils.stringifyException(t);
-                            assertThat(
-                                    failureCause,
-                                    containsString(
+                            assertThat(failureCause)
+                                    .contains(
                                             "KvState with name '"
                                                     + queryName
-                                                    + "' has already been 
registered by another operator"));
+                                                    + "' has already been 
registered by another operator");
                         })
                 .get();
     }
@@ -334,7 +333,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * subtask index is queried with value numElements (the latest element 
updated the state).
      */
     @Test
-    public void testValueState() throws Exception {
+    void testValueState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -377,7 +376,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
     /** This test checks if custom Kryo serializers are loaded with correct 
classloader. */
     @Test
-    public void testCustomKryoSerializerHandling() throws Exception {
+    void testCustomKryoSerializerHandling() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1;
         final String stateName = "teriberka";
@@ -450,8 +449,8 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * queryable state name.
      */
     @Test
-    @Ignore
-    public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
+    @Disabled
+    void testWrongJobIdAndWrongQueryableStateName() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -492,9 +491,8 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                 jobStatusFuture = 
clusterClient.getJobStatus(closableJobGraph.getJobId());
             }
 
-            assertEquals(
-                    JobStatus.RUNNING,
-                    jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+            assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                    .isEqualTo(JobStatus.RUNNING);
 
             final JobID wrongJobId = new JobID();
 
@@ -506,24 +504,17 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                             BasicTypeInfo.INT_TYPE_INFO,
                             valueState);
 
-            try {
-                unknownJobFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                fail(); // by now the request must have failed.
-            } catch (ExecutionException e) {
-                Assert.assertTrue(
-                        "GOT: " + e.getCause().getMessage(),
-                        e.getCause() instanceof RuntimeException);
-                Assert.assertTrue(
-                        "GOT: " + e.getCause().getMessage(),
-                        e.getCause()
-                                .getMessage()
-                                .contains(
-                                        "FlinkJobNotFoundException: Could not 
find Flink job ("
-                                                + wrongJobId
-                                                + ")"));
-            } catch (Exception f) {
-                fail("Unexpected type of exception: " + f.getMessage());
-            }
+            assertThatThrownBy(
+                            () ->
+                                    unknownJobFuture.get(
+                                            deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                    .isInstanceOf(ExecutionException.class)
+                    .cause()
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessage(
+                            "FlinkJobNotFoundException: Could not find Flink 
job ("
+                                    + wrongJobId
+                                    + ")");
 
             CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName 
=
                     client.getKvState(
@@ -533,22 +524,15 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                             BasicTypeInfo.INT_TYPE_INFO,
                             valueState);
 
-            try {
-                unknownQSName.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                fail(); // by now the request must have failed.
-            } catch (ExecutionException e) {
-                Assert.assertTrue(
-                        "GOT: " + e.getCause().getMessage(),
-                        e.getCause() instanceof RuntimeException);
-                Assert.assertTrue(
-                        "GOT: " + e.getCause().getMessage(),
-                        e.getCause()
-                                .getMessage()
-                                .contains(
-                                        "UnknownKvStateLocation: No 
KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
-            } catch (Exception f) {
-                fail("Unexpected type of exception: " + f.getMessage());
-            }
+            assertThatThrownBy(
+                            () ->
+                                    unknownQSName.get(
+                                            deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                    .isInstanceOf(ExecutionException.class)
+                    .cause()
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessage(
+                            "UnknownKvStateLocation: No KvStateLocation found 
for KvState instance with name 'wrong-hakuna'.");
         }
     }
 
@@ -557,7 +541,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * one request which fails.
      */
     @Test
-    public void testQueryNonStartedJobState() throws Exception {
+    void testQueryNonStartedJobState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -617,8 +601,8 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      *
      * @throws UnknownKeyOrNamespaceException thrown due querying a 
non-existent key
      */
-    @Test(expected = UnknownKeyOrNamespaceException.class)
-    public void testValueStateDefault() throws Throwable {
+    @Test
+    void testValueStateDefault() throws Throwable {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -672,13 +656,10 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                             true,
                             executor);
 
-            try {
-                future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-            } catch (ExecutionException | CompletionException e) {
-                // get() on a completedExceptionally future wraps the
-                // exception in an ExecutionException.
-                throw e.getCause();
-            }
+            assertThatThrownBy(
+                            () -> future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                    .cause()
+                    .isInstanceOf(UnknownKeyOrNamespaceException.class);
         }
     }
 
@@ -690,7 +671,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * <p>This is the same as the simple value state test, but uses the API 
shortcut.
      */
     @Test
-    public void testValueStateShortcut() throws Exception {
+    void testValueStateShortcut() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -740,7 +721,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * sums these up. The test succeeds after each subtask index is queried 
with result n*(n+1)/2.
      */
     @Test
-    public void testReducingState() throws Exception {
+    void testReducingState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -797,7 +778,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                     Tuple2<Integer, Long> value =
                             future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS).get();
 
-                    assertEquals("Key mismatch", key, value.f0.intValue());
+                    
assertThat(key).isEqualTo(value.f0.intValue()).withFailMessage("Key mismatch");
                     if (expected == value.f1) {
                         success = true;
                     } else {
@@ -806,7 +787,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                     }
                 }
 
-                assertTrue("Did not succeed query", success);
+                assertThat(success).isTrue().withFailMessage("Did not succeed 
query");
             }
         }
     }
@@ -817,7 +798,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * the values up. The test succeeds after each subtask index is queried 
with result n*(n+1)/2.
      */
     @Test
-    public void testMapState() throws Exception {
+    void testMapState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -899,15 +880,16 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                     .get(key);
 
                     if (value != null && value.f0 != null && expected == 
value.f1) {
-                        assertEquals("Key mismatch", key, value.f0.intValue());
+                        assertThat(key)
+                                .isEqualTo(value.f0.intValue())
+                                .withFailMessage("Key mismatch");
                         success = true;
                     } else {
                         // Retry
                         Thread.sleep(RETRY_TIMEOUT);
                     }
                 }
-
-                assertTrue("Did not succeed query", success);
+                assertThat(success).isTrue().withFailMessage("Did not succeed 
query");
             }
         }
     }
@@ -919,7 +901,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
      * contains the correct number of distinct elements.
      */
     @Test
-    public void testListState() throws Exception {
+    void testListState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -1010,20 +992,20 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                     }
                 }
 
-                assertTrue("Did not succeed query", success);
+                assertThat(success).isTrue().withFailMessage("Did not succeed 
query");
             }
 
             for (int key = 0; key < maxParallelism; key++) {
                 Set<Long> values = results.get(key);
                 for (long i = 0L; i <= numElements; i++) {
-                    assertTrue(values.contains(i));
+                    assertThat(values).contains(i);
                 }
             }
         }
     }
 
     @Test
-    public void testAggregatingState() throws Exception {
+    void testAggregatingState() throws Exception {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
         final long numElements = 1024L;
 
@@ -1091,7 +1073,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                     }
                 }
 
-                assertTrue("Did not succeed query", success);
+                assertThat(success).isTrue().withFailMessage("Did not succeed 
query");
             }
         }
     }
@@ -1345,10 +1327,9 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                             Duration.ofMillis(50),
                             deadline,
                             (jobStatus) -> 
jobStatus.equals(JobStatus.CANCELED),
-                            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
-            assertEquals(
-                    JobStatus.CANCELED,
-                    jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+                            new 
ScheduledExecutorServiceAdapter(EXECUTOR_EXTENSION.getExecutor()));
+            assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                    .isEqualTo(JobStatus.CANCELED);
         }
     }
 
@@ -1456,7 +1437,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                 Tuple2<Integer, Long> value =
                         future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS).value();
 
-                assertEquals("Key mismatch", key, value.f0.intValue());
+                
assertThat(key).isEqualTo(value.f0.intValue()).withFailMessage("Key mismatch");
                 if (expected == value.f1) {
                     success = true;
                 } else {
@@ -1465,14 +1446,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                 }
             }
 
-            assertTrue("Did not succeed query", success);
+            assertThat(success).isTrue().withFailMessage("Did not succeed 
query");
         }
     }
 
     private static URLClassLoader createLoaderWithCustomKryoSerializer(String 
className)
             throws IOException {
         return ClassLoaderUtils.compileAndLoadJava(
-                classloaderFolder.newFolder(),
+                classloaderFolder,
                 className + ".java",
                 "import com.esotericsoftware.kryo.Kryo;\n"
                         + "import com.esotericsoftware.kryo.Serializer;\n"
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 3739ed3256e..2465c6052cf 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -25,21 +26,25 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
 
 /** Several integration tests for queryable state using the {@link 
FsStateBackend}. */
-public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
+class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {
 
     private static final int NUM_JMS = 2;
     // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines 
so that
@@ -50,48 +55,51 @@ public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestB
     private static final int QS_PROXY_PORT_RANGE_START = 9064;
     private static final int QS_SERVER_PORT_RANGE_START = 9069;
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
-
-    private static TestingServer zkServer;
-
-    private static MiniClusterWithClientResource miniClusterResource;
+    @TempDir
+    @Order(1)
+    static Path tmpStateBackendDir;
+
+    @TempDir
+    @Order(2)
+    static Path tmpHaStoragePath;
+
+    @RegisterExtension
+    @Order(3)
+    static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    @RegisterExtension
+    @Order(4)
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    () ->
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(getConfig())
+                                    .setNumberTaskManagers(NUM_TMS)
+                                    
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+                                    .build());
 
     @Override
     protected StateBackend createStateBackend() throws Exception {
-        return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+        return new FsStateBackend(tmpStateBackendDir.toUri().toString());
     }
 
-    @BeforeClass
-    public static void setup() throws Exception {
-        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
-
-        // we have to manage this manually because we have to create the 
ZooKeeper server
-        // ahead of this
-        miniClusterResource =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setConfiguration(getConfig())
-                                .setNumberTaskManagers(NUM_TMS)
-                                .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-                                .build());
-
-        miniClusterResource.before();
+    @BeforeAll
+    static void setup(@InjectClusterClient RestClusterClient<?> 
injectedClusterClient)
+            throws Exception {
 
         client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
 
-        clusterClient = miniClusterResource.getClusterClient();
+        clusterClient = injectedClusterClient;
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
-        miniClusterResource.after();
+    @AfterAll
+    static void tearDown() throws Exception {
 
         client.shutdownAndWait();
-
-        zkServer.close();
     }
 
-    private static Configuration getConfig() throws Exception {
+    private static Configuration getConfig() {
 
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
@@ -110,10 +118,11 @@ public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestB
                 QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START 
+ NUM_TMS));
         config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
 
-        config.setString(
-                HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
tmpHaStoragePath.toString());
 
-        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+        config.setString(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                ZK_RESOURCE.getCustomExtension().getConnectString());
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
         return config;
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 508f151c0f0..3e7cc014997 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -26,20 +27,24 @@ import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
 
 /** Several integration tests for queryable state using the {@link 
RocksDBStateBackend}. */
-public class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
+class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
 
     private static final int NUM_JMS = 2;
     // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines 
so that
@@ -50,48 +55,49 @@ public class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableState
     private static final int QS_PROXY_PORT_RANGE_START = 9074;
     private static final int QS_SERVER_PORT_RANGE_START = 9079;
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
-
-    private static TestingServer zkServer;
-
-    private static MiniClusterWithClientResource miniClusterResource;
+    @TempDir
+    @Order(1)
+    static Path tmpStateBackendDir;
+
+    @TempDir
+    @Order(2)
+    static Path tmpHaStoragePath;
+
+    @RegisterExtension
+    @Order(3)
+    static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    @RegisterExtension
+    @Order(4)
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    () ->
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(getConfig())
+                                    .setNumberTaskManagers(NUM_TMS)
+                                    
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+                                    .build());
 
     @Override
     protected StateBackend createStateBackend() throws Exception {
-        return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+        return new RocksDBStateBackend(tmpStateBackendDir.toUri().toString());
     }
 
-    @BeforeClass
-    public static void setup() throws Exception {
-        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
-
-        // we have to manage this manually because we have to create the 
ZooKeeper server
-        // ahead of this
-        miniClusterResource =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setConfiguration(getConfig())
-                                .setNumberTaskManagers(NUM_TMS)
-                                .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-                                .build());
-
-        miniClusterResource.before();
-
+    @BeforeAll
+    static void setup(@InjectClusterClient RestClusterClient<?> 
injectedClusterClient)
+            throws Exception {
         client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
 
-        clusterClient = miniClusterResource.getClusterClient();
+        clusterClient = injectedClusterClient;
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
-        miniClusterResource.after();
-
+    @AfterAll
+    static void tearDown() throws Exception {
         client.shutdownAndWait();
-
-        zkServer.close();
     }
 
-    private static Configuration getConfig() throws Exception {
+    private static Configuration getConfig() {
 
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
@@ -110,10 +116,11 @@ public class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableState
                 QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START 
+ NUM_TMS));
         config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
 
-        config.setString(
-                HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
tmpHaStoragePath.toString());
 
-        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+        config.setString(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                ZK_RESOURCE.getCustomExtension().getConnectString());
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
         return config;
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index cb53fe4b02a..26c7d366bf3 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
@@ -28,13 +29,15 @@ import 
org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
 
 /** Several integration tests for queryable state using the {@link 
FsStateBackend}. */
 public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
@@ -48,31 +51,33 @@ public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTe
     private static final int QS_PROXY_PORT_RANGE_START = 9084;
     private static final int QS_SERVER_PORT_RANGE_START = 9089;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir static Path tmpStateBackendDir;
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setConfiguration(getConfig())
-                            .setNumberTaskManagers(NUM_TMS)
-                            .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-                            .build());
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    () ->
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(getConfig())
+                                    .setNumberTaskManagers(NUM_TMS)
+                                    
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+                                    .build());
 
     @Override
     protected StateBackend createStateBackend() throws Exception {
-        return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+        return new FsStateBackend(tmpStateBackendDir.toUri().toString());
     }
 
-    @BeforeClass
-    public static void setup() throws Exception {
+    @BeforeAll
+    static void setup(@InjectClusterClient RestClusterClient<?> 
injectedClusterClient)
+            throws Exception {
         client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
 
-        clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+        clusterClient = injectedClusterClient;
     }
 
-    @AfterClass
-    public static void tearDown() {
+    @AfterAll
+    static void tearDown() {
         client.shutdownAndWait();
     }
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 13e191d8ec2..6e8e0e8be6f 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
@@ -28,13 +29,15 @@ import 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
 
 /** Several integration tests for queryable state using the {@link 
RocksDBStateBackend}. */
 public class NonHAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
@@ -47,31 +50,33 @@ public class NonHAQueryableStateRocksDBBackendITCase 
extends AbstractQueryableSt
     private static final int QS_PROXY_PORT_RANGE_START = 9094;
     private static final int QS_SERVER_PORT_RANGE_START = 9099;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir static Path tmpStateBackendDir;
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setConfiguration(getConfig())
-                            .setNumberTaskManagers(NUM_TMS)
-                            .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-                            .build());
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    () ->
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(getConfig())
+                                    .setNumberTaskManagers(NUM_TMS)
+                                    
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+                                    .build());
 
     @Override
     protected StateBackend createStateBackend() throws Exception {
-        return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+        return new RocksDBStateBackend(tmpStateBackendDir.toUri().toString());
     }
 
-    @BeforeClass
-    public static void setup() throws Exception {
+    @BeforeAll
+    static void setup(@InjectClusterClient RestClusterClient<?> 
injectedClusterClient)
+            throws Exception {
         client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
 
-        clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+        clusterClient = injectedClusterClient;
     }
 
-    @AfterClass
-    public static void tearDown() {
+    @AfterAll
+    static void tearDown() {
         client.shutdownAndWait();
     }
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index ed29cfdab2c..edc60d185ac 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -25,16 +25,11 @@ import 
org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
 import 
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -46,22 +41,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-/** Tests general behavior of the {@link AbstractServerBase}. */
-public class AbstractServerTest extends TestLogger {
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-    @Rule public ExpectedException expectedEx = ExpectedException.none();
+/** Tests general behavior of the {@link AbstractServerBase}. */
+class AbstractServerTest {
 
     /**
      * Tests that in case of port collision, a FlinkRuntimeException is thrown 
with a specific
      * message.
      */
     @Test
-    public void testServerInitializationFailure() throws Throwable {
-
-        // the expected exception along with the adequate message
-        expectedEx.expect(FlinkRuntimeException.class);
-        expectedEx.expectMessage(
-                "Unable to start Test Server 2. All ports in provided range 
are occupied.");
+    void testServerInitializationFailure() throws Throwable {
 
         List<Integer> portList = Collections.singletonList(0);
 
@@ -76,7 +67,10 @@ public class AbstractServerTest extends TestLogger {
                             new DisabledKvStateRequestStats(),
                             
Collections.singletonList(server1.getServerAddress().getPort())
                                     .iterator())) {
-                server2.start();
+                // the expected exception type along with the adequate message
+                assertThatThrownBy(() -> server2.start())
+                        .isInstanceOf(org.apache.flink.util.FlinkRuntimeException.class)
+                        .hasMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
             }
         }
     }
@@ -86,7 +80,7 @@ public class AbstractServerTest extends TestLogger {
      * to the next port in the range.
      */
     @Test
-    public void testPortRangeSuccess() throws Throwable {
+    void testPortRangeSuccess() throws Throwable {
 
         AtomicKvStateRequestStats serverStats1 = new 
AtomicKvStateRequestStats();
         AtomicKvStateRequestStats serverStats2 = new 
AtomicKvStateRequestStats();
@@ -112,39 +106,37 @@ public class AbstractServerTest extends TestLogger {
                                         new 
TestMessage.TestMessageDeserializer()),
                                 clientStats)) {
             server1.start();
-            Assert.assertTrue(
-                    server1.getServerAddress().getPort() >= portRangeStart
-                            && server1.getServerAddress().getPort() <= 
portRangeEnd);
+            
assertThat(server1.getServerAddress().getPort()).isGreaterThanOrEqualTo(portRangeStart);
+            
assertThat(server1.getServerAddress().getPort()).isLessThanOrEqualTo(portRangeEnd);
 
             server2.start();
-            Assert.assertTrue(
-                    server2.getServerAddress().getPort() >= portRangeStart
-                            && server2.getServerAddress().getPort() <= 
portRangeEnd);
+            
assertThat(server2.getServerAddress().getPort()).isGreaterThanOrEqualTo(portRangeStart);
+            
assertThat(server2.getServerAddress().getPort()).isLessThanOrEqualTo(portRangeEnd);
 
             TestMessage response1 =
                     client.sendRequest(server1.getServerAddress(), new 
TestMessage("ping")).join();
-            Assert.assertEquals(server1.getServerName() + "-ping", 
response1.getMessage());
+            
assertThat(response1.getMessage()).isEqualTo(server1.getServerName() + "-ping");
 
             TestMessage response2 =
                     client.sendRequest(server2.getServerAddress(), new 
TestMessage("pong")).join();
-            Assert.assertEquals(server2.getServerName() + "-pong", 
response2.getMessage());
+            
assertThat(response2.getMessage()).isEqualTo(server2.getServerName() + "-pong");
 
-            Assert.assertEquals(1L, serverStats1.getNumConnections());
-            Assert.assertEquals(1L, serverStats2.getNumConnections());
+            assertThat(serverStats1.getNumConnections()).isEqualTo(1L);
+            assertThat(serverStats2.getNumConnections()).isEqualTo(1L);
 
-            Assert.assertEquals(2L, clientStats.getNumConnections());
-            Assert.assertEquals(0L, clientStats.getNumFailed());
-            Assert.assertEquals(2L, clientStats.getNumSuccessful());
-            Assert.assertEquals(2L, clientStats.getNumRequests());
+            assertThat(clientStats.getNumConnections()).isEqualTo(2L);
+            assertThat(clientStats.getNumFailed()).isEqualTo(0L);
+            assertThat(clientStats.getNumSuccessful()).isEqualTo(2L);
+            assertThat(clientStats.getNumRequests()).isEqualTo(2L);
         }
 
-        Assert.assertEquals(0L, serverStats1.getNumConnections());
-        Assert.assertEquals(0L, serverStats2.getNumConnections());
+        assertThat(serverStats1.getNumConnections()).isEqualTo(0L);
+        assertThat(serverStats2.getNumConnections()).isEqualTo(0L);
 
-        Assert.assertEquals(0L, clientStats.getNumConnections());
-        Assert.assertEquals(0L, clientStats.getNumFailed());
-        Assert.assertEquals(2L, clientStats.getNumSuccessful());
-        Assert.assertEquals(2L, clientStats.getNumRequests());
+        assertThat(clientStats.getNumConnections()).isEqualTo(0L);
+        assertThat(clientStats.getNumFailed()).isEqualTo(0L);
+        assertThat(clientStats.getNumSuccessful()).isEqualTo(2L);
+        assertThat(clientStats.getNumRequests()).isEqualTo(2L);
     }
 
     private static class TestClient extends Client<TestMessage, TestMessage>
@@ -161,7 +153,7 @@ public class AbstractServerTest extends TestLogger {
         @Override
         public void close() throws Exception {
             shutdown().join();
-            Assert.assertTrue(isEventGroupShutdown());
+            assertThat(isEventGroupShutdown()).isTrue();
         }
     }
 
@@ -207,8 +199,8 @@ public class AbstractServerTest extends TestLogger {
         @Override
         public void close() throws Exception {
             shutdownServer().get();
-            Assert.assertTrue(getQueryExecutor().isTerminated());
-            Assert.assertTrue(isEventGroupShutdown());
+            assertThat(getQueryExecutor().isTerminated()).isTrue();
+            assertThat(isEventGroupShutdown()).isTrue();
         }
     }
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index e1fe88280ea..97a941de460 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -29,17 +29,18 @@ import 
org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
 
 /**
  * Additional tests for the serialization and deserialization using the 
KvStateSerializer with a
  * RocksDB state back-end.
  */
-public final class KVStateRequestSerializerRocksDBTest {
+final class KVStateRequestSerializerRocksDBTest {
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir File tmpFile;
 
     /**
      * Tests list serialization and deserialization match.
@@ -48,13 +49,11 @@ public final class KVStateRequestSerializerRocksDBTest {
      *     KvStateRequestSerializerTest#testListSerialization() using the heap 
state back-end test
      */
     @Test
-    public void testListSerialization() throws Exception {
+    void testListSerialization() throws Exception {
         final long key = 0L;
 
         final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                RocksDBTestUtils.builderForTestDefaults(
-                                temporaryFolder.getRoot(), 
LongSerializer.INSTANCE)
-                        .build();
+                RocksDBTestUtils.builderForTestDefaults(tmpFile, 
LongSerializer.INSTANCE).build();
 
         longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -74,14 +73,12 @@ public final class KVStateRequestSerializerRocksDBTest {
      *     KvStateRequestSerializerTest#testMapSerialization() using the heap 
state back-end test
      */
     @Test
-    public void testMapSerialization() throws Exception {
+    void testMapSerialization() throws Exception {
         final long key = 0L;
 
         // objects for RocksDB state list serialisation
         final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                RocksDBTestUtils.builderForTestDefaults(
-                                temporaryFolder.getRoot(), 
LongSerializer.INSTANCE)
-                        .build();
+                RocksDBTestUtils.builderForTestDefaults(tmpFile, 
LongSerializer.INSTANCE).build();
 
         longHeapKeyedStateBackend.setCurrentKey(key);
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
index e0facd7d309..904d8294dbe 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -20,32 +20,27 @@ package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.nio.channels.ClosedChannelException;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ClientHandler}. */
-public class KvStateClientHandlerTest {
+class KvStateClientHandlerTest {
 
     /**
      * Tests that on reads the expected callback methods are called and read buffers are recycled.
      */
     @Test
-    public void testReadCallbacksAndBufferRecycling() throws Exception {
-        final ClientHandlerCallback<KvStateResponse> callback = 
mock(ClientHandlerCallback.class);
+    void testReadCallbacksAndBufferRecycling() throws Exception {
+        final TestingClientHandlerCallback callback = new 
TestingClientHandlerCallback();
 
         final MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
                 new MessageSerializer<>(
@@ -64,10 +59,12 @@ public class KvStateClientHandlerTest {
         buf.skipBytes(4); // skip frame length
 
         // Verify callback
+        callback.reset();
         channel.writeInbound(buf);
-        verify(callback, times(1)).onRequestResult(eq(1222112277L), 
any(KvStateResponse.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
-
+        assertThat(callback.onRequestCnt).isEqualTo(1);
+        assertThat(callback.onRequestId).isEqualTo(1222112277L);
+        assertThat(callback.onRequestBody).isInstanceOf(KvStateResponse.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled");
         //
         // Request failure
         //
@@ -79,9 +76,12 @@ public class KvStateClientHandlerTest {
         buf.skipBytes(4); // skip frame length
 
         // Verify callback
+        callback.reset();
         channel.writeInbound(buf);
-        verify(callback, times(1)).onRequestFailure(eq(1222112278L), 
isA(RuntimeException.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
+        assertThat(callback.onRequestFailureCnt).isEqualTo(1);
+        assertThat(callback.onRequestFailureId).isEqualTo(1222112278L);
+        
assertThat(callback.onRequestFailureBody).isInstanceOf(RuntimeException.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled");
 
         //
         // Server failure
@@ -92,8 +92,10 @@ public class KvStateClientHandlerTest {
         buf.skipBytes(4); // skip frame length
 
         // Verify callback
+        callback.reset();
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(RuntimeException.class));
+        assertThat(callback.onFailureCnt).isEqualTo(1);
+        
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Unexpected messages
@@ -101,20 +103,68 @@ public class KvStateClientHandlerTest {
         buf = channel.alloc().buffer(4).writeInt(1223823);
 
         // Verify callback
+        callback.reset();
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(IllegalStateException.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
+        assertThat(callback.onFailureCnt).isEqualTo(1);
+        
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled");
 
         //
         // Exception caught
         //
+        callback.reset();
         channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
-        verify(callback, times(3)).onFailure(isA(RuntimeException.class));
+        assertThat(callback.onFailureCnt).isEqualTo(1);
+        
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Channel inactive
         //
+        callback.reset();
         channel.pipeline().fireChannelInactive();
-        verify(callback, 
times(1)).onFailure(isA(ClosedChannelException.class));
+        assertThat(callback.onFailureCnt).isEqualTo(1);
+        
assertThat(callback.onFailureBody).isInstanceOf(ClosedChannelException.class);
+    }
+
+    private static class TestingClientHandlerCallback implements 
ClientHandlerCallback {
+        private int onRequestCnt;
+        private long onRequestId;
+        private MessageBody onRequestBody;
+        private int onRequestFailureCnt;
+        private long onRequestFailureId;
+        private Throwable onRequestFailureBody;
+        private int onFailureCnt;
+        private Throwable onFailureBody;
+
+        @Override
+        public void onRequestResult(long requestId, MessageBody response) {
+            onRequestCnt++;
+            onRequestId = requestId;
+            onRequestBody = response;
+        }
+
+        @Override
+        public void onRequestFailure(long requestId, Throwable cause) {
+            onRequestFailureCnt++;
+            onRequestFailureId = requestId;
+            onRequestFailureBody = cause;
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            onFailureCnt++;
+            onFailureBody = cause;
+        }
+
+        public void reset() {
+            onRequestCnt = 0;
+            onRequestId = -1;
+            onRequestBody = null;
+            onRequestFailureCnt = 0;
+            onRequestFailureId = -1;
+            onRequestFailureBody = null;
+            onFailureCnt = 0;
+            onFailureBody = null;
+        }
     }
 }
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index cf860d2661f..dfeda1f1590 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -29,6 +30,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.BackendBuildingException;
@@ -43,9 +46,9 @@ import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -58,23 +61,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link KvStateSerializer}. */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
+class KvStateRequestSerializerTest {
 
-    @Parameterized.Parameters
-    public static Collection<Boolean> parameters() {
+    public static Collection<Boolean> data() {
         return Arrays.asList(false, true);
     }
 
-    @Parameterized.Parameter public boolean async;
-
     /** Tests key and namespace serialization utils. */
     @Test
-    public void testKeyAndNamespaceSerialization() throws Exception {
+    void testKeyAndNamespaceSerialization() throws Exception {
         TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE;
         TypeSerializer<String> namespaceSerializer = StringSerializer.INSTANCE;
 
@@ -89,91 +88,124 @@ public class KvStateRequestSerializerTest {
                 KvStateSerializer.deserializeKeyAndNamespace(
                         serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
 
-        assertEquals(expectedKey, actual.f0.longValue());
-        assertEquals(expectedNamespace, actual.f1);
+        assertThat(actual.f0.longValue()).isEqualTo(expectedKey);
+        assertThat(actual.f1).isEqualTo(expectedNamespace);
     }
 
     /** Tests key and namespace deserialization utils with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
-        KvStateSerializer.deserializeKeyAndNamespace(
-                new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+    @Test
+    void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                KvStateSerializer.deserializeKeyAndNamespace(
+                                        new byte[] {},
+                                        LongSerializer.INSTANCE,
+                                        StringSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests key and namespace deserialization utils with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
-        KvStateSerializer.deserializeKeyAndNamespace(
-                new byte[] {1}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+    @Test
+    void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                KvStateSerializer.deserializeKeyAndNamespace(
+                                        new byte[] {1},
+                                        LongSerializer.INSTANCE,
+                                        StringSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests key and namespace deserialization utils with too many bytes. */
-    @Test(expected = IOException.class)
-    public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
-        // Long + null String + 1 byte
-        KvStateSerializer.deserializeKeyAndNamespace(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2},
-                LongSerializer.INSTANCE,
-                StringSerializer.INSTANCE);
+    @Test
+    void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long + null String + 1 byte
+                                KvStateSerializer.deserializeKeyAndNamespace(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 
42, 0, 2},
+                                        LongSerializer.INSTANCE,
+                                        StringSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests key and namespace deserialization utils with too many bytes. */
-    @Test(expected = IOException.class)
-    public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
-        // Long + null String + 2 bytes
-        KvStateSerializer.deserializeKeyAndNamespace(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2},
-                LongSerializer.INSTANCE,
-                StringSerializer.INSTANCE);
+    @Test
+    void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long + null String + 2 bytes
+                                KvStateSerializer.deserializeKeyAndNamespace(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 
42, 0, 2, 2},
+                                        LongSerializer.INSTANCE,
+                                        StringSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests value serialization utils. */
     @Test
-    public void testValueSerialization() throws Exception {
+    void testValueSerialization() throws Exception {
         TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
         long expectedValue = Long.MAX_VALUE - 1292929292L;
 
         byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
         long actualValue = KvStateSerializer.deserializeValue(serializedValue, 
valueSerializer);
 
-        assertEquals(expectedValue, actualValue);
+        assertThat(actualValue).isEqualTo(expectedValue);
     }
 
     /** Tests value deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeValueEmpty() throws Exception {
-        KvStateSerializer.deserializeValue(new byte[] {}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeValueEmpty() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                KvStateSerializer.deserializeValue(
+                                        new byte[] {}, 
LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests value deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeValueTooShort() throws Exception {
-        // 1 byte (incomplete Long)
-        KvStateSerializer.deserializeValue(new byte[] {1}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeValueTooShort() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // 1 byte (incomplete Long)
+                                KvStateSerializer.deserializeValue(
+                                        new byte[] {1}, 
LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests value deserialization with too many bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeValueTooMany1() throws Exception {
-        // Long + 1 byte
-        KvStateSerializer.deserializeValue(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeValueTooMany1() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long + 1 byte
+                                KvStateSerializer.deserializeValue(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+                                        LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests value deserialization with too many bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeValueTooMany2() throws Exception {
-        // Long + 2 bytes
-        KvStateSerializer.deserializeValue(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeValueTooMany2() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long + 2 bytes
+                                KvStateSerializer.deserializeValue(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 
2},
+                                        LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests list serialization utils. */
-    @Test
-    public void testListSerialization() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")
+    void testListSerialization(boolean async) throws Exception {
         final long key = 0L;
         final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                getLongHeapKeyedStateBackend(key);
+                getLongHeapKeyedStateBackend(key, async);
 
         final InternalListState<Long, VoidNamespace, Long> listState =
                 longHeapKeyedStateBackend.createOrUpdateInternalState(
@@ -225,46 +257,54 @@ public class KvStateRequestSerializerTest {
 
         List<Long> actualValues =
                 KvStateSerializer.deserializeList(serializedValues, 
valueSerializer);
-        assertEquals(expectedValues, actualValues);
+        assertThat(actualValues).isEqualTo(expectedValues);
 
         // Single value
         long expectedValue = ThreadLocalRandom.current().nextLong();
         byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
         List<Long> actualValue =
                 KvStateSerializer.deserializeList(serializedValue, 
valueSerializer);
-        assertEquals(1, actualValue.size());
-        assertEquals(expectedValue, actualValue.get(0).longValue());
+        assertThat(actualValue).containsExactly(expectedValue);
     }
 
     /** Tests list deserialization with too few bytes. */
     @Test
-    public void testDeserializeListEmpty() throws Exception {
+    void testDeserializeListEmpty() throws Exception {
         List<Long> actualValue =
                 KvStateSerializer.deserializeList(new byte[] {}, 
LongSerializer.INSTANCE);
-        assertEquals(0, actualValue.size());
+        assertThat(actualValue).isEmpty();
     }
 
     /** Tests list deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeListTooShort1() throws Exception {
-        // 1 byte (incomplete Long)
-        KvStateSerializer.deserializeList(new byte[] {1}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeListTooShort1() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // 1 byte (incomplete Long)
+                                KvStateSerializer.deserializeList(
+                                        new byte[] {1}, 
LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests list deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeListTooShort2() throws Exception {
-        // Long + 1 byte (separator) + 1 byte (incomplete Long)
-        KvStateSerializer.deserializeList(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, 
LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeListTooShort2() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long + 1 byte (separator) + 1 byte 
(incomplete Long)
+                                KvStateSerializer.deserializeList(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 
3},
+                                        LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests map serialization utils. */
-    @Test
-    public void testMapSerialization() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testMapSerialization(boolean async) throws Exception {
         final long key = 0L;
         final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                getLongHeapKeyedStateBackend(key);
+                getLongHeapKeyedStateBackend(key, async);
 
         final InternalMapState<Long, VoidNamespace, Long, String> mapState =
                 (InternalMapState<Long, VoidNamespace, Long, String>)
@@ -279,14 +319,16 @@ public class KvStateRequestSerializerTest {
         testMapSerialization(key, mapState);
     }
 
-    private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final 
long key)
+    private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final 
long key, boolean async)
             throws BackendBuildingException {
         final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
         ExecutionConfig executionConfig = new ExecutionConfig();
+        TaskKvStateRegistry taskKvStateRegistry =
+                new KvStateRegistry().createTaskRegistry(JobID.generate(), new 
JobVertexID());
         // objects for heap state list serialisation
         final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
                 new HeapKeyedStateBackendBuilder<>(
-                                mock(TaskKvStateRegistry.class),
+                                taskKvStateRegistry,
                                 LongSerializer.INSTANCE,
                                 ClassLoader.getSystemClassLoader(),
                                 keyGroupRange.getNumberOfKeyGroups(),
@@ -353,9 +395,9 @@ public class KvStateRequestSerializerTest {
         Map<Long, String> actualValues =
                 KvStateSerializer.deserializeMap(
                         serializedValues, userKeySerializer, 
userValueSerializer);
-        assertEquals(expectedValues.size(), actualValues.size());
+        assertThat(actualValues).hasSize(expectedValues.size());
         for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) {
-            assertEquals(expectedValues.get(actualEntry.getKey()), 
actualEntry.getValue());
+            
assertThat(actualEntry.getValue()).isEqualTo(expectedValues.get(actualEntry.getKey()));
         }
 
         // Single value
@@ -372,45 +414,59 @@ public class KvStateRequestSerializerTest {
         Map<Long, String> actualValue =
                 KvStateSerializer.deserializeMap(
                         serializedValue, userKeySerializer, 
userValueSerializer);
-        assertEquals(1, actualValue.size());
-        assertEquals(expectedValue, actualValue.get(expectedKey));
+        assertThat(actualValue).hasSize(1);
+        assertThat(actualValue.get(expectedKey)).isEqualTo(expectedValue);
     }
 
     /** Tests map deserialization with too few bytes. */
     @Test
-    public void testDeserializeMapEmpty() throws Exception {
+    void testDeserializeMapEmpty() throws Exception {
         Map<Long, String> actualValue =
                 KvStateSerializer.deserializeMap(
                         new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
-        assertEquals(0, actualValue.size());
+        assertThat(actualValue).hasSize(0);
     }
 
     /** Tests map deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeMapTooShort1() throws Exception {
-        // 1 byte (incomplete Key)
-        KvStateSerializer.deserializeMap(
-                new byte[] {1}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+    @Test
+    void testDeserializeMapTooShort1() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // 1 byte (incomplete Key)
+                                KvStateSerializer.deserializeMap(
+                                        new byte[] {1},
+                                        LongSerializer.INSTANCE,
+                                        StringSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests map deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeMapTooShort2() throws Exception {
-        // Long (Key) + 1 byte (incomplete Value)
-        KvStateSerializer.deserializeMap(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0},
-                LongSerializer.INSTANCE,
-                LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeMapTooShort2() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long (Key) + 1 byte (incomplete Value)
+                                KvStateSerializer.deserializeMap(
+                                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0},
+                                        LongSerializer.INSTANCE,
+                                        LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     /** Tests map deserialization with too few bytes. */
-    @Test(expected = IOException.class)
-    public void testDeserializeMapTooShort3() throws Exception {
-        // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte (incomplete 
Key2)
-        KvStateSerializer.deserializeMap(
-                new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 
3},
-                LongSerializer.INSTANCE,
-                LongSerializer.INSTANCE);
+    @Test
+    void testDeserializeMapTooShort3() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                // Long (Key1) + Boolean (false) + Long 
(Value1) + 1 byte
+                                // (incomplete Key2)
+                                KvStateSerializer.deserializeMap(
+                                        new byte[] {
+                                            1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 
1, 1, 1, 1, 1, 1, 3
+                                        },
+                                        LongSerializer.INSTANCE,
+                                        LongSerializer.INSTANCE))
+                .isInstanceOf(IOException.class);
     }
 
     private byte[] randomByteArray(int capacity) {
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 6df22a827b1..868b30e67c0 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -63,33 +62,31 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KvStateServerHandler}. */
-@Ignore(
+@Disabled(
         "KvStateServerHandlerTest is unstable. See FLINK-13553 for more information. Since the community "
                 + "does not have time to work on QS, we decided to temporarily ignore this test case in order"
                 + "to maintain build stability.")
-public class KvStateServerHandlerTest extends TestLogger {
+class KvStateServerHandlerTest {
 
     private static KvStateServerImpl testServer;
 
     private static final long READ_TIMEOUT_MILLIS = 10000L;
 
-    @BeforeClass
-    public static void setup() {
+    @BeforeAll
+    static void setup() {
         try {
             testServer =
                     new KvStateServerImpl(
@@ -105,14 +102,14 @@ public class KvStateServerHandlerTest extends TestLogger {
         }
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
+    @AfterAll
+    static void tearDown() throws Exception {
         testServer.shutdown();
     }
 
     /** Tests a simple successful query via an EmbeddedChannel. */
     @Test
-    public void testSimpleQuery() throws Exception {
+    void testSimpleQuery() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -160,7 +157,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
         long requestId = Integer.MAX_VALUE + 182828L;
 
-        assertTrue(registryListener.registrationName.equals("vanilla"));
+        assertThat(registryListener.registrationName).isEqualTo("vanilla");
 
         KvStateInternalRequest request =
                 new KvStateInternalRequest(registryListener.kvStateId, 
serializedKeyAndNamespace);
@@ -175,18 +172,18 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
         long deserRequestId = MessageSerializer.getRequestId(buf);
         KvStateResponse response = serializer.deserializeResponse(buf);
         buf.release();
 
-        assertEquals(requestId, deserRequestId);
+        assertThat(deserRequestId).isEqualTo(requestId);
 
         int actualValue =
                 KvStateSerializer.deserializeValue(response.getContent(), 
IntSerializer.INSTANCE);
-        assertEquals(expectedValue, actualValue);
+        assertThat(actualValue).isEqualTo(expectedValue);
 
-        assertEquals(stats.toString(), 1, stats.getNumRequests());
+        
assertThat(stats.getNumRequests()).isEqualTo(1).withFailMessage(stats.toString());
 
         // Wait for async successful request report
         long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, 
TimeUnit.SECONDS);
@@ -194,7 +191,7 @@ public class KvStateServerHandlerTest extends TestLogger {
             Thread.sleep(10L);
         }
 
-        assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
+        
assertThat(stats.getNumSuccessful()).isEqualTo(1L).withFailMessage(stats.toString());
     }
 
     /**
@@ -202,7 +199,7 @@ public class KvStateServerHandlerTest extends TestLogger {
      * unregistered KvStateIDs.
      */
     @Test
-    public void testQueryUnknownKvStateID() throws Exception {
+    void testQueryUnknownKvStateID() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -229,18 +226,18 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
 
-        assertEquals(requestId, response.getRequestId());
+        assertThat(response.getRequestId()).isEqualTo(requestId);
 
-        assertTrue(
-                "Did not respond with expected failure cause",
-                response.getCause() instanceof UnknownKvStateIdException);
+        assertThat(response.getCause())
+                .isInstanceOf(UnknownKvStateIdException.class)
+                .withFailMessage("Did not respond with expected failure 
cause");
 
-        assertEquals(1L, stats.getNumRequests());
-        assertEquals(1L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(1L);
+        assertThat(stats.getNumFailed()).isEqualTo(1L);
     }
 
     /**
@@ -248,7 +245,7 @@ public class KvStateServerHandlerTest extends TestLogger {
      * for non-existing keys.
      */
     @Test
-    public void testQueryUnknownKey() throws Exception {
+    void testQueryUnknownKey() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -287,7 +284,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
         long requestId = Integer.MAX_VALUE + 22982L;
 
-        assertTrue(registryListener.registrationName.equals("vanilla"));
+        assertThat(registryListener.registrationName).isEqualTo("vanilla");
 
         KvStateInternalRequest request =
                 new KvStateInternalRequest(registryListener.kvStateId, 
serializedKeyAndNamespace);
@@ -301,18 +298,18 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
 
-        assertEquals(requestId, response.getRequestId());
+        assertThat(response.getRequestId()).isEqualTo(requestId);
 
-        assertTrue(
-                "Did not respond with expected failure cause",
-                response.getCause() instanceof UnknownKeyOrNamespaceException);
+        assertThat(response.getCause())
+                .isInstanceOf(UnknownKeyOrNamespaceException.class)
+                .withFailMessage("Did not respond with expected failure 
cause");
 
-        assertEquals(1L, stats.getNumRequests());
-        assertEquals(1L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(1L);
+        assertThat(stats.getNumFailed()).isEqualTo(1L);
     }
 
     /**
@@ -321,7 +318,7 @@ public class KvStateServerHandlerTest extends TestLogger {
      * call.
      */
     @Test
-    public void testFailureOnGetSerializedValue() throws Exception {
+    void testFailureOnGetSerializedValue() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -396,19 +393,19 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
 
-        assertTrue(response.getCause().getMessage().contains("Expected test 
Exception"));
+        assertThat(response.getCause().getMessage()).contains("Expected test 
Exception");
 
-        assertEquals(1L, stats.getNumRequests());
-        assertEquals(1L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(1L);
+        assertThat(stats.getNumFailed()).isEqualTo(1L);
     }
 
     /** Tests that the channel is closed if an Exception reaches the channel 
handler. */
     @Test
-    public void testCloseChannelOnExceptionCaught() throws Exception {
+    void testCloseChannelOnExceptionCaught() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -427,14 +424,14 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
         Throwable response = MessageSerializer.deserializeServerFailure(buf);
         buf.release();
 
-        assertTrue(response.getMessage().contains("Expected test Exception"));
+        assertThat(response.getMessage()).contains("Expected test Exception");
 
         channel.closeFuture().await(READ_TIMEOUT_MILLIS);
-        assertFalse(channel.isActive());
+        assertThat(channel.isActive()).isFalse();
     }
 
     /**
@@ -442,7 +439,7 @@ public class KvStateServerHandlerTest extends TestLogger {
      * closed.
      */
     @Test
-    public void testQueryExecutorShutDown() throws Throwable {
+    void testQueryExecutorShutDown() throws Throwable {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -457,7 +454,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
         localTestServer.start();
         localTestServer.shutdown();
-        assertTrue(localTestServer.getQueryExecutor().isTerminated());
+        assertThat(localTestServer.getQueryExecutor().isTerminated()).isTrue();
 
         MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                 new MessageSerializer<>(
@@ -485,7 +482,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
         backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
 
-        assertTrue(registryListener.registrationName.equals("vanilla"));
+        assertThat(registryListener.registrationName).isEqualTo("vanilla");
 
         KvStateInternalRequest request =
                 new KvStateInternalRequest(registryListener.kvStateId, new 
byte[0]);
@@ -498,21 +495,21 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
 
-        
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
+        
assertThat(response.getCause().getMessage()).contains("RejectedExecutionException");
 
-        assertEquals(1L, stats.getNumRequests());
-        assertEquals(1L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(1L);
+        assertThat(stats.getNumFailed()).isEqualTo(1L);
 
         localTestServer.shutdown();
     }
 
     /** Tests response on unexpected messages. */
     @Test
-    public void testUnexpectedMessage() throws Exception {
+    void testUnexpectedMessage() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -536,12 +533,12 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
         Throwable response = MessageSerializer.deserializeServerFailure(buf);
         buf.release();
 
-        assertEquals(0L, stats.getNumRequests());
-        assertEquals(0L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(0L);
+        assertThat(stats.getNumFailed()).isEqualTo(0L);
 
         KvStateResponse stateResponse = new KvStateResponse(new byte[0]);
         unexpectedMessage =
@@ -553,21 +550,21 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
         response = MessageSerializer.deserializeServerFailure(buf);
         buf.release();
 
-        assertTrue(
-                "Unexpected failure cause " + response.getClass().getName(),
-                response instanceof IllegalArgumentException);
+        assertThat(response)
+                .isInstanceOf(IllegalArgumentException.class)
+                .withFailMessage("Unexpected failure cause " + 
response.getClass().getName());
 
-        assertEquals(0L, stats.getNumRequests());
-        assertEquals(0L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(0L);
+        assertThat(stats.getNumFailed()).isEqualTo(0L);
     }
 
     /** Tests that incoming buffer instances are recycled. */
     @Test
-    public void testIncomingBufferIsRecycled() throws Exception {
+    void testIncomingBufferIsRecycled() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -583,27 +580,27 @@ public class KvStateServerHandlerTest extends TestLogger {
         KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
         ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
-        assertEquals(1L, serRequest.refCnt());
+        assertThat(serRequest.refCnt()).isEqualTo(1L);
 
         // Write regular request
         channel.writeInbound(serRequest);
-        assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
+        assertThat(serRequest.refCnt()).isEqualTo(0L).withFailMessage("Buffer 
not recycled");
 
         // Write unexpected msg
         ByteBuf unexpected = channel.alloc().buffer(8);
         unexpected.writeInt(4);
         unexpected.writeInt(4);
 
-        assertEquals(1L, unexpected.refCnt());
+        assertThat(unexpected.refCnt()).isEqualTo(1L);
 
         channel.writeInbound(unexpected);
-        assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
+        assertThat(unexpected.refCnt()).isEqualTo(0L).withFailMessage("Buffer 
not recycled");
         channel.finishAndReleaseAll();
     }
 
     /** Tests the failure response if the serializers don't match. */
     @Test
-    public void testSerializerMismatch() throws Exception {
+    void testSerializerMismatch() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -655,7 +652,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                         "wrong-namespace-type",
                         StringSerializer.INSTANCE);
 
-        assertTrue(registryListener.registrationName.equals("vanilla"));
+        assertThat(registryListener.registrationName).isEqualTo("vanilla");
 
         KvStateInternalRequest request =
                 new KvStateInternalRequest(registryListener.kvStateId, 
wrongKeyAndNamespace);
@@ -668,11 +665,11 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
-        assertEquals(182828L, response.getRequestId());
-        assertTrue(response.getCause().getMessage().contains("IOException"));
+        assertThat(response.getRequestId()).isEqualTo(182828L);
+        assertThat(response.getCause().getMessage()).contains("IOException");
 
         // Repeat with wrong namespace only
         request = new KvStateInternalRequest(registryListener.kvStateId, 
wrongNamespace);
@@ -685,19 +682,19 @@ public class KvStateServerHandlerTest extends TestLogger {
         buf.skipBytes(4); // skip frame length
 
         // Verify the response
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         response = MessageSerializer.deserializeRequestFailure(buf);
         buf.release();
-        assertEquals(182829L, response.getRequestId());
-        assertTrue(response.getCause().getMessage().contains("IOException"));
+        assertThat(response.getRequestId()).isEqualTo(182829L);
+        assertThat(response.getCause().getMessage()).contains("IOException");
 
-        assertEquals(2L, stats.getNumRequests());
-        assertEquals(2L, stats.getNumFailed());
+        assertThat(stats.getNumRequests()).isEqualTo(2L);
+        assertThat(stats.getNumFailed()).isEqualTo(2L);
     }
 
     /** Tests that large responses are chunked. */
     @Test
-    public void testChunkedResponse() throws Exception {
+    void testChunkedResponse() throws Exception {
         KvStateRegistry registry = new KvStateRegistry();
         KvStateRequestStats stats = new AtomicKvStateRequestStats();
 
@@ -751,8 +748,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
         long requestId = Integer.MAX_VALUE + 182828L;
 
-        assertTrue(registryListener.registrationName.equals("vanilla"));
-
+        assertThat(registryListener.registrationName).isEqualTo("vanilla");
         KvStateInternalRequest request =
                 new KvStateInternalRequest(registryListener.kvStateId, 
serializedKeyAndNamespace);
         ByteBuf serRequest =
@@ -762,7 +758,7 @@ public class KvStateServerHandlerTest extends TestLogger {
         channel.writeInbound(serRequest);
 
         Object msg = readInboundBlocking(channel);
-        assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+        
assertThat(msg).isInstanceOf(ChunkedByteBuf.class).withFailMessage("Not 
ChunkedByteBuf");
         ((ChunkedByteBuf) msg).close();
     }
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index d26d82ab4b1..a04d8749149 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -56,8 +56,8 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-import org.junit.AfterClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -66,19 +66,18 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KvStateServerImpl}. */
-public class KvStateServerTest {
+class KvStateServerTest {
 
     // Thread pool for client bootstrap (shared between tests)
     private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
 
     private static final int TIMEOUT_MILLIS = 10000;
 
-    @AfterClass
-    public static void tearDown() throws Exception {
+    @AfterAll
+    static void tearDown() throws Exception {
         if (NIO_GROUP != null) {
             // note: no "quiet period" to not trigger Netty#4357
             NIO_GROUP.shutdownGracefully(0, 10, TimeUnit.SECONDS);
@@ -87,7 +86,7 @@ public class KvStateServerTest {
 
     /** Tests a simple successful query via a SocketChannel. */
     @Test
-    public void testSimpleRequest() throws Throwable {
+    void testSimpleRequest() throws Throwable {
         KvStateServerImpl server = null;
         Bootstrap bootstrap = null;
         try {
@@ -173,7 +172,7 @@ public class KvStateServerTest {
 
             long requestId = Integer.MAX_VALUE + 182828L;
 
-            assertTrue(registryListener.registrationName.equals("vanilla"));
+            assertThat(registryListener.registrationName).isEqualTo("vanilla");
 
             final KvStateInternalRequest request =
                     new KvStateInternalRequest(
@@ -186,14 +185,15 @@ public class KvStateServerTest {
 
             ByteBuf buf = responses.poll(TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
 
-            assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-            assertEquals(requestId, MessageSerializer.getRequestId(buf));
+            assertThat(MessageSerializer.deserializeHeader(buf))
+                    .isEqualTo(MessageType.REQUEST_RESULT);
+            
assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
             KvStateResponse response = 
server.getSerializer().deserializeResponse(buf);
 
             int actualValue =
                     KvStateSerializer.deserializeValue(
                             response.getContent(), IntSerializer.INSTANCE);
-            assertEquals(expectedValue, actualValue);
+            assertThat(actualValue).isEqualTo(expectedValue);
         } finally {
             if (server != null) {
                 server.shutdown();
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
index 8e71692a701..af009110e43 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -29,33 +29,21 @@ import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
 import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link MessageSerializer}. */
-@RunWith(Parameterized.class)
-public class MessageSerializerTest {
+class MessageSerializerTest {
 
     private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
 
-    @Parameterized.Parameters
-    public static Collection<Boolean> parameters() {
-        return Arrays.asList(false, true);
-    }
-
-    @Parameterized.Parameter public boolean async;
-
     /** Tests request serialization. */
     @Test
-    public void testRequestSerialization() throws Exception {
+    void testRequestSerialization() throws Exception {
         long requestId = Integer.MAX_VALUE + 1337L;
         KvStateID kvStateId = new KvStateID();
         byte[] serializedKeyAndNamespace = randomByteArray(1024);
@@ -70,19 +58,20 @@ public class MessageSerializerTest {
         ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, 
request);
 
         int frameLength = buf.readInt();
-        assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-        assertEquals(requestId, MessageSerializer.getRequestId(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST);
+        assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
         KvStateInternalRequest requestDeser = 
serializer.deserializeRequest(buf);
 
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertEquals(kvStateId, requestDeser.getKvStateId());
-        assertArrayEquals(serializedKeyAndNamespace, 
requestDeser.getSerializedKeyAndNamespace());
+        assertThat(requestDeser.getKvStateId()).isEqualTo(kvStateId);
+        assertThat(requestDeser.getSerializedKeyAndNamespace())
+                .isEqualTo(serializedKeyAndNamespace);
     }
 
     /** Tests request serialization with zero-length serialized key and 
namespace. */
     @Test
-    public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws 
Exception {
+    void testRequestSerializationWithZeroLengthKeyAndNamespace() throws 
Exception {
 
         long requestId = Integer.MAX_VALUE + 1337L;
         KvStateID kvStateId = new KvStateID();
@@ -98,28 +87,30 @@ public class MessageSerializerTest {
         ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, 
request);
 
         int frameLength = buf.readInt();
-        assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-        assertEquals(requestId, MessageSerializer.getRequestId(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST);
+        assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
         KvStateInternalRequest requestDeser = 
serializer.deserializeRequest(buf);
 
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertEquals(kvStateId, requestDeser.getKvStateId());
-        assertArrayEquals(serializedKeyAndNamespace, 
requestDeser.getSerializedKeyAndNamespace());
+        assertThat(requestDeser.getKvStateId()).isEqualTo(kvStateId);
+        assertThat(requestDeser.getSerializedKeyAndNamespace())
+                .isEqualTo(serializedKeyAndNamespace);
     }
 
     /**
      * Tests that we don't try to be smart about <code>null</code> key and 
namespace. They should be
      * treated explicitly.
      */
-    @Test(expected = NullPointerException.class)
-    public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() 
throws Exception {
-        new KvStateInternalRequest(new KvStateID(), null);
+    @Test
+    void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws 
Exception {
+        assertThatThrownBy(() -> new KvStateInternalRequest(new KvStateID(), 
null))
+                .isInstanceOf(NullPointerException.class);
     }
 
     /** Tests response serialization. */
     @Test
-    public void testResponseSerialization() throws Exception {
+    void testResponseSerialization() throws Exception {
         long requestId = Integer.MAX_VALUE + 72727278L;
         byte[] serializedResult = randomByteArray(1024);
 
@@ -132,18 +123,18 @@ public class MessageSerializerTest {
         ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, 
response);
 
         int frameLength = buf.readInt();
-        assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-        assertEquals(requestId, MessageSerializer.getRequestId(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
+        assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
         KvStateResponse responseDeser = serializer.deserializeResponse(buf);
 
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertArrayEquals(serializedResult, responseDeser.getContent());
+        assertThat(responseDeser.getContent()).isEqualTo(serializedResult);
     }
 
     /** Tests response serialization with zero-length serialized result. */
     @Test
-    public void testResponseSerializationWithZeroLengthSerializedResult() 
throws Exception {
+    void testResponseSerializationWithZeroLengthSerializedResult() throws 
Exception {
         byte[] serializedResult = new byte[0];
 
         final KvStateResponse response = new KvStateResponse(serializedResult);
@@ -156,55 +147,56 @@ public class MessageSerializerTest {
 
         int frameLength = buf.readInt();
 
-        assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-        assertEquals(72727278L, MessageSerializer.getRequestId(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
+        assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(72727278L);
         KvStateResponse responseDeser = serializer.deserializeResponse(buf);
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertArrayEquals(serializedResult, responseDeser.getContent());
+        assertThat(responseDeser.getContent()).isEqualTo(serializedResult);
     }
 
     /**
      * Tests that we don't try to be smart about <code>null</code> results. 
They should be treated
      * explicitly.
      */
-    @Test(expected = NullPointerException.class)
-    public void testNullPointerExceptionOnNullSerializedResult() throws 
Exception {
-        new KvStateResponse((byte[]) null);
+    @Test
+    void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+        assertThatThrownBy(() -> new KvStateResponse((byte[]) null))
+                .isInstanceOf(NullPointerException.class);
     }
 
     /** Tests request failure serialization. */
     @Test
-    public void testKvStateRequestFailureSerialization() throws Exception {
+    void testKvStateRequestFailureSerialization() throws Exception {
         long requestId = Integer.MAX_VALUE + 1111222L;
         IllegalStateException cause = new IllegalStateException("Expected 
test");
 
         ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, 
requestId, cause);
 
         int frameLength = buf.readInt();
-        assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
         RequestFailure requestFailure = 
MessageSerializer.deserializeRequestFailure(buf);
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertEquals(requestId, requestFailure.getRequestId());
-        assertEquals(cause.getClass(), requestFailure.getCause().getClass());
-        assertEquals(cause.getMessage(), 
requestFailure.getCause().getMessage());
+        assertThat(requestFailure.getRequestId()).isEqualTo(requestId);
+        assertThat(requestFailure.getCause()).isInstanceOf(cause.getClass());
+        
assertThat(requestFailure.getCause().getMessage()).isEqualTo(cause.getMessage());
     }
 
     /** Tests server failure serialization. */
     @Test
-    public void testServerFailureSerialization() throws Exception {
+    void testServerFailureSerialization() throws Exception {
         IllegalStateException cause = new IllegalStateException("Expected 
test");
 
         ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
 
         int frameLength = buf.readInt();
-        assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+        
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
         Throwable request = MessageSerializer.deserializeServerFailure(buf);
-        assertEquals(buf.readerIndex(), frameLength + 4);
+        assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
 
-        assertEquals(cause.getClass(), request.getClass());
-        assertEquals(cause.getMessage(), request.getMessage());
+        assertThat(request).isInstanceOf(cause.getClass());
+        assertThat(request.getMessage()).isEqualTo(cause.getMessage());
     }
 
     private byte[] randomByteArray(int capacity) {
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..28999133c2b
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file

Reply via email to