This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7f4e834babe [FLINK-31876][QS] Migrate flink-queryable-state-runtime
tests to JUnit5
7f4e834babe is described below
commit 7f4e834babe7e8865123148e57ecdd6a138b609d
Author: fredia <[email protected]>
AuthorDate: Sun Apr 23 12:21:38 2023 +0800
[FLINK-31876][QS] Migrate flink-queryable-state-runtime tests to JUnit5
---
.../flink-queryable-state-runtime/pom.xml | 6 +
.../client/proxy/KvStateClientProxyImplTest.java | 43 ++--
.../itcases/AbstractQueryableStateTestBase.java | 203 ++++++++--------
.../itcases/HAQueryableStateFsBackendITCase.java | 87 +++----
.../HAQueryableStateRocksDBBackendITCase.java | 89 +++----
.../NonHAQueryableStateFsBackendITCase.java | 47 ++--
.../NonHAQueryableStateRocksDBBackendITCase.java | 47 ++--
.../queryablestate/network/AbstractServerTest.java | 72 +++---
.../KVStateRequestSerializerRocksDBTest.java | 23 +-
.../network/KvStateClientHandlerTest.java | 92 ++++++--
.../network/KvStateRequestSerializerTest.java | 260 +++++++++++++--------
.../network/KvStateServerHandlerTest.java | 162 +++++++------
.../queryablestate/network/KvStateServerTest.java | 24 +-
.../network/MessageSerializerTest.java | 102 ++++----
.../org.junit.jupiter.api.extension.Extension | 16 ++
15 files changed, 690 insertions(+), 583 deletions(-)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml
b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
index 2a12468a89c..9775c67485a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/pom.xml
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -124,6 +124,12 @@ under the License.
<goals>
<goal>test-jar</goal>
</goals>
+ <configuration>
+ <excludes>
+ <!-- test-jar
is still used by JUnit4 modules -->
+
<exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+ </excludes>
+ </configuration>
</execution>
</executions>
</plugin>
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
index 16ec6e9cbf2..a7c3265b302 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
@@ -23,27 +23,24 @@ import
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link KvStateClientProxyImpl}. */
-public class KvStateClientProxyImplTest extends TestLogger {
+class KvStateClientProxyImplTest {
private KvStateClientProxyImpl kvStateClientProxy;
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
kvStateClientProxy =
new KvStateClientProxyImpl(
InetAddress.getLoopbackAddress().getHostName(),
@@ -53,14 +50,14 @@ public class KvStateClientProxyImplTest extends TestLogger {
new DisabledKvStateRequestStats());
}
- @After
- public void shutdown() {
+ @AfterEach
+ void shutdown() {
kvStateClientProxy.shutdown();
}
/** Tests that we can set and retrieve the {@link KvStateLocationOracle}.
*/
@Test
- public void testKvStateLocationOracle() {
+ void testKvStateLocationOracle() {
final JobID jobId1 = new JobID();
final TestingKvStateLocationOracle kvStateLocationOracle1 =
new TestingKvStateLocationOracle();
@@ -70,17 +67,15 @@ public class KvStateClientProxyImplTest extends TestLogger {
new TestingKvStateLocationOracle();
kvStateClientProxy.updateKvStateLocationOracle(jobId2,
kvStateLocationOracle2);
- assertThat(kvStateClientProxy.getKvStateLocationOracle(new JobID()),
nullValue());
+ assertThat(kvStateClientProxy.getKvStateLocationOracle(new
JobID())).isNull();
- assertThat(
- kvStateClientProxy.getKvStateLocationOracle(jobId1),
- equalTo(kvStateLocationOracle1));
- assertThat(
- kvStateClientProxy.getKvStateLocationOracle(jobId2),
- equalTo(kvStateLocationOracle2));
+ assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1))
+ .isEqualTo(kvStateLocationOracle1);
+ assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId2))
+ .isEqualTo(kvStateLocationOracle2);
kvStateClientProxy.updateKvStateLocationOracle(jobId1, null);
- assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1),
nullValue());
+
assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1)).isNull();
}
/**
@@ -88,7 +83,7 @@ public class KvStateClientProxyImplTest extends TestLogger {
* HighAvailabilityServices#DEFAULT_JOB_ID} will be used for all requests.
*/
@Test
- public void testLegacyCodePathPreference() {
+ void testLegacyCodePathPreference() {
final TestingKvStateLocationOracle kvStateLocationOracle =
new TestingKvStateLocationOracle();
kvStateClientProxy.updateKvStateLocationOracle(
@@ -96,8 +91,8 @@ public class KvStateClientProxyImplTest extends TestLogger {
final JobID jobId = new JobID();
kvStateClientProxy.updateKvStateLocationOracle(jobId, new
TestingKvStateLocationOracle());
- assertThat(
- kvStateClientProxy.getKvStateLocationOracle(jobId),
equalTo(kvStateLocationOracle));
+ assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId))
+ .isEqualTo(kvStateLocationOracle);
}
/** Testing implementation of {@link KvStateLocationOracle}. */
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 158e64904e4..7d6742c2e99 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -60,23 +60,22 @@ import
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.ClassLoaderUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import com.esotericsoftware.kryo.Serializer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLClassLoader;
@@ -90,7 +89,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -99,24 +97,22 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base class for queryable state integration tests with a configurable state
backend. */
-public abstract class AbstractQueryableStateTestBase extends TestLogger {
+public abstract class AbstractQueryableStateTestBase {
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(200L);
private static final long RETRY_TIMEOUT = 50L;
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService>
EXECUTOR_RESOURCE =
- new TestExecutorResource<>(() ->
Executors.newScheduledThreadPool(4));
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
+ new TestExecutorExtension<>(() ->
Executors.newScheduledThreadPool(4));
private final ScheduledExecutor executor =
- new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+ new
ScheduledExecutorServiceAdapter(EXECUTOR_EXTENSION.getExecutor());
/** State backend to use. */
private StateBackend stateBackend;
@@ -128,14 +124,14 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
protected static int maxParallelism;
- @ClassRule public static TemporaryFolder classloaderFolder = new
TemporaryFolder();
+ @TempDir static File classloaderFolder;
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() throws Exception {
// NOTE: do not use a shared instance for all tests as the tests may
break
this.stateBackend = createStateBackend();
- Assert.assertNotNull(clusterClient);
+ assertThat(clusterClient).isNotNull();
maxParallelism = 4;
}
@@ -157,7 +153,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* counts of each key in rounds until all keys have non-zero counts.
*/
@Test
- public void testQueryableState() throws Exception {
+ void testQueryableState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final int numKeys = 256;
@@ -229,13 +225,15 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
result.thenAccept(
response -> {
- try {
- Tuple2<Integer, Long> res = response.get();
- counts.set(key, res.f1);
- assertEquals("Key mismatch", key,
res.f0.intValue());
- } catch (Exception e) {
- Assert.fail(e.getMessage());
- }
+ assertThatCode(
+ () -> {
+ Tuple2<Integer, Long> res
= response.get();
+ counts.set(key, res.f1);
+ assertThat(key)
+
.isEqualTo(res.f0.intValue())
+
.withFailMessage("Key mismatch");
+ })
+ .doesNotThrowAnyException();
});
futures.add(result);
@@ -246,19 +244,21 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
}
- assertTrue("Not all keys are non-zero", allNonZero);
+ assertThat(allNonZero).isTrue().withFailMessage("Not all keys are
non-zero");
// All should be non-zero
for (int i = 0; i < numKeys; i++) {
long count = counts.get(i);
- assertTrue("Count at position " + i + " is " + count, count >
0);
+ assertThat(count)
+ .isGreaterThan(0)
+ .withFailMessage("Count at position " + i + " is " +
count);
}
}
}
/** Tests that duplicate query registrations fail the job at the
JobManager. */
- @Test(timeout = 60_000)
- public void testDuplicateRegistrationFailsJob() throws Exception {
+ @Test
+ void testDuplicateRegistrationFailsJob() throws Exception {
final int numKeys = 256;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -312,18 +312,17 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
.thenApply(JobResult::getSerializedThrowable)
.thenAccept(
serializedThrowable -> {
- assertTrue(serializedThrowable.isPresent());
+ assertThat(serializedThrowable).isPresent();
final Throwable t =
serializedThrowable
.get()
.deserializeError(getClass().getClassLoader());
final String failureCause =
ExceptionUtils.stringifyException(t);
- assertThat(
- failureCause,
- containsString(
+ assertThat(failureCause)
+ .contains(
"KvState with name '"
+ queryName
- + "' has already been
registered by another operator"));
+ + "' has already been
registered by another operator");
})
.get();
}
@@ -334,7 +333,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* subtask index is queried with value numElements (the latest element
updated the state).
*/
@Test
- public void testValueState() throws Exception {
+ void testValueState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -377,7 +376,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
/** This test checks if custom Kryo serializers are loaded with correct
classloader. */
@Test
- public void testCustomKryoSerializerHandling() throws Exception {
+ void testCustomKryoSerializerHandling() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1;
final String stateName = "teriberka";
@@ -450,8 +449,8 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* queryable state name.
*/
@Test
- @Ignore
- public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
+ @Disabled
+ void testWrongJobIdAndWrongQueryableStateName() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -492,9 +491,8 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
jobStatusFuture =
clusterClient.getJobStatus(closableJobGraph.getJobId());
}
- assertEquals(
- JobStatus.RUNNING,
- jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
+ assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(JobStatus.RUNNING);
final JobID wrongJobId = new JobID();
@@ -506,24 +504,17 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO,
valueState);
- try {
- unknownJobFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
- fail(); // by now the request must have failed.
- } catch (ExecutionException e) {
- Assert.assertTrue(
- "GOT: " + e.getCause().getMessage(),
- e.getCause() instanceof RuntimeException);
- Assert.assertTrue(
- "GOT: " + e.getCause().getMessage(),
- e.getCause()
- .getMessage()
- .contains(
- "FlinkJobNotFoundException: Could not
find Flink job ("
- + wrongJobId
- + ")"));
- } catch (Exception f) {
- fail("Unexpected type of exception: " + f.getMessage());
- }
+ assertThatThrownBy(
+ () ->
+ unknownJobFuture.get(
+ deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .cause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "FlinkJobNotFoundException: Could not find Flink
job ("
+ + wrongJobId
+ + ")");
CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName
=
client.getKvState(
@@ -533,22 +524,15 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO,
valueState);
- try {
- unknownQSName.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
- fail(); // by now the request must have failed.
- } catch (ExecutionException e) {
- Assert.assertTrue(
- "GOT: " + e.getCause().getMessage(),
- e.getCause() instanceof RuntimeException);
- Assert.assertTrue(
- "GOT: " + e.getCause().getMessage(),
- e.getCause()
- .getMessage()
- .contains(
- "UnknownKvStateLocation: No
KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
- } catch (Exception f) {
- fail("Unexpected type of exception: " + f.getMessage());
- }
+ assertThatThrownBy(
+ () ->
+ unknownQSName.get(
+ deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .cause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "UnknownKvStateLocation: No KvStateLocation found
for KvState instance with name 'wrong-hakuna'.");
}
}
@@ -557,7 +541,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* one request which fails.
*/
@Test
- public void testQueryNonStartedJobState() throws Exception {
+ void testQueryNonStartedJobState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -617,8 +601,8 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
*
* @throws UnknownKeyOrNamespaceException thrown due querying a
non-existent key
*/
- @Test(expected = UnknownKeyOrNamespaceException.class)
- public void testValueStateDefault() throws Throwable {
+ @Test
+ void testValueStateDefault() throws Throwable {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -672,13 +656,10 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
true,
executor);
- try {
- future.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
- } catch (ExecutionException | CompletionException e) {
- // get() on a completedExceptionally future wraps the
- // exception in an ExecutionException.
- throw e.getCause();
- }
+ assertThatThrownBy(
+ () -> future.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .cause()
+ .isInstanceOf(UnknownKeyOrNamespaceException.class);
}
}
@@ -690,7 +671,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* <p>This is the same as the simple value state test, but uses the API
shortcut.
*/
@Test
- public void testValueStateShortcut() throws Exception {
+ void testValueStateShortcut() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -740,7 +721,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* sums these up. The test succeeds after each subtask index is queried
with result n*(n+1)/2.
*/
@Test
- public void testReducingState() throws Exception {
+ void testReducingState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -797,7 +778,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
Tuple2<Integer, Long> value =
future.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS).get();
- assertEquals("Key mismatch", key, value.f0.intValue());
+
assertThat(key).isEqualTo(value.f0.intValue()).withFailMessage("Key mismatch");
if (expected == value.f1) {
success = true;
} else {
@@ -806,7 +787,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
}
}
- assertTrue("Did not succeed query", success);
+ assertThat(success).isTrue().withFailMessage("Did not succeed
query");
}
}
}
@@ -817,7 +798,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* the values up. The test succeeds after each subtask index is queried
with result n*(n+1)/2.
*/
@Test
- public void testMapState() throws Exception {
+ void testMapState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -899,15 +880,16 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
.get(key);
if (value != null && value.f0 != null && expected ==
value.f1) {
- assertEquals("Key mismatch", key, value.f0.intValue());
+ assertThat(key)
+ .isEqualTo(value.f0.intValue())
+ .withFailMessage("Key mismatch");
success = true;
} else {
// Retry
Thread.sleep(RETRY_TIMEOUT);
}
}
-
- assertTrue("Did not succeed query", success);
+ assertThat(success).isTrue().withFailMessage("Did not succeed
query");
}
}
}
@@ -919,7 +901,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
* contains the correct number of distinct elements.
*/
@Test
- public void testListState() throws Exception {
+ void testListState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -1010,20 +992,20 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
}
}
- assertTrue("Did not succeed query", success);
+ assertThat(success).isTrue().withFailMessage("Did not succeed
query");
}
for (int key = 0; key < maxParallelism; key++) {
Set<Long> values = results.get(key);
for (long i = 0L; i <= numElements; i++) {
- assertTrue(values.contains(i));
+ assertThat(values).contains(i);
}
}
}
}
@Test
- public void testAggregatingState() throws Exception {
+ void testAggregatingState() throws Exception {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
final long numElements = 1024L;
@@ -1091,7 +1073,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
}
}
- assertTrue("Did not succeed query", success);
+ assertThat(success).isTrue().withFailMessage("Did not succeed
query");
}
}
}
@@ -1345,10 +1327,9 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
Duration.ofMillis(50),
deadline,
(jobStatus) ->
jobStatus.equals(JobStatus.CANCELED),
- new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
- assertEquals(
- JobStatus.CANCELED,
- jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
+ new
ScheduledExecutorServiceAdapter(EXECUTOR_EXTENSION.getExecutor()));
+ assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(JobStatus.CANCELED);
}
}
@@ -1456,7 +1437,7 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
Tuple2<Integer, Long> value =
future.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS).value();
- assertEquals("Key mismatch", key, value.f0.intValue());
+
assertThat(key).isEqualTo(value.f0.intValue()).withFailMessage("Key mismatch");
if (expected == value.f1) {
success = true;
} else {
@@ -1465,14 +1446,14 @@ public abstract class AbstractQueryableStateTestBase
extends TestLogger {
}
}
- assertTrue("Did not succeed query", success);
+ assertThat(success).isTrue().withFailMessage("Did not succeed
query");
}
}
private static URLClassLoader createLoaderWithCustomKryoSerializer(String
className)
throws IOException {
return ClassLoaderUtils.compileAndLoadJava(
- classloaderFolder.newFolder(),
+ classloaderFolder,
className + ".java",
"import com.esotericsoftware.kryo.Kryo;\n"
+ "import com.esotericsoftware.kryo.Serializer;\n"
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 3739ed3256e..2465c6052cf 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -25,21 +26,25 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
/** Several integration tests for queryable state using the {@link
FsStateBackend}. */
-public class HAQueryableStateFsBackendITCase extends
AbstractQueryableStateTestBase {
+class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {
private static final int NUM_JMS = 2;
// NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines
so that
@@ -50,48 +55,51 @@ public class HAQueryableStateFsBackendITCase extends
AbstractQueryableStateTestB
private static final int QS_PROXY_PORT_RANGE_START = 9064;
private static final int QS_SERVER_PORT_RANGE_START = 9069;
- @ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
-
- private static TestingServer zkServer;
-
- private static MiniClusterWithClientResource miniClusterResource;
+ @TempDir
+ @Order(1)
+ static Path tmpStateBackendDir;
+
+ @TempDir
+ @Order(2)
+ static Path tmpHaStoragePath;
+
+ @RegisterExtension
+ @Order(3)
+ static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE =
+ new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+ @RegisterExtension
+ @Order(4)
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ () ->
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build());
@Override
protected StateBackend createStateBackend() throws Exception {
- return new
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+ return new FsStateBackend(tmpStateBackendDir.toUri().toString());
}
- @BeforeClass
- public static void setup() throws Exception {
- zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
-
- // we have to manage this manually because we have to create the
ZooKeeper server
- // ahead of this
- miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig())
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
- .build());
-
- miniClusterResource.before();
+ @BeforeAll
+ static void setup(@InjectClusterClient RestClusterClient<?>
injectedClusterClient)
+ throws Exception {
client = new QueryableStateClient("localhost",
QS_PROXY_PORT_RANGE_START);
- clusterClient = miniClusterResource.getClusterClient();
+ clusterClient = injectedClusterClient;
}
- @AfterClass
- public static void tearDown() throws Exception {
- miniClusterResource.after();
+ @AfterAll
+ static void tearDown() throws Exception {
client.shutdownAndWait();
-
- zkServer.close();
}
- private static Configuration getConfig() throws Exception {
+ private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER,
true);
@@ -110,10 +118,11 @@ public class HAQueryableStateFsBackendITCase extends
AbstractQueryableStateTestB
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START
+ NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
- config.setString(
- HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().toString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
tmpHaStoragePath.toString());
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zkServer.getConnectString());
+ config.setString(
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ ZK_RESOURCE.getCustomExtension().getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
return config;
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 508f151c0f0..3e7cc014997 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -26,20 +27,24 @@ import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
/** Several integration tests for queryable state using the {@link
RocksDBStateBackend}. */
-public class HAQueryableStateRocksDBBackendITCase extends
AbstractQueryableStateTestBase {
+class HAQueryableStateRocksDBBackendITCase extends
AbstractQueryableStateTestBase {
private static final int NUM_JMS = 2;
// NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines
so that
@@ -50,48 +55,49 @@ public class HAQueryableStateRocksDBBackendITCase extends
AbstractQueryableState
private static final int QS_PROXY_PORT_RANGE_START = 9074;
private static final int QS_SERVER_PORT_RANGE_START = 9079;
- @ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
-
- private static TestingServer zkServer;
-
- private static MiniClusterWithClientResource miniClusterResource;
+ @TempDir
+ @Order(1)
+ static Path tmpStateBackendDir;
+
+ @TempDir
+ @Order(2)
+ static Path tmpHaStoragePath;
+
+ @RegisterExtension
+ @Order(3)
+ static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE =
+ new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+ @RegisterExtension
+ @Order(4)
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ () ->
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build());
@Override
protected StateBackend createStateBackend() throws Exception {
- return new
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+ return new RocksDBStateBackend(tmpStateBackendDir.toUri().toString());
}
- @BeforeClass
- public static void setup() throws Exception {
- zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
-
- // we have to manage this manually because we have to create the
ZooKeeper server
- // ahead of this
- miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig())
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
- .build());
-
- miniClusterResource.before();
-
+ @BeforeAll
+ static void setup(@InjectClusterClient RestClusterClient<?>
injectedClusterClient)
+ throws Exception {
client = new QueryableStateClient("localhost",
QS_PROXY_PORT_RANGE_START);
- clusterClient = miniClusterResource.getClusterClient();
+ clusterClient = injectedClusterClient;
}
- @AfterClass
- public static void tearDown() throws Exception {
- miniClusterResource.after();
-
+ @AfterAll
+ static void tearDown() throws Exception {
client.shutdownAndWait();
-
- zkServer.close();
}
- private static Configuration getConfig() throws Exception {
+ private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER,
true);
@@ -110,10 +116,11 @@ public class HAQueryableStateRocksDBBackendITCase extends
AbstractQueryableState
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START
+ NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
- config.setString(
- HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().toString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
tmpHaStoragePath.toString());
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zkServer.getConnectString());
+ config.setString(
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ ZK_RESOURCE.getCustomExtension().getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
return config;
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index cb53fe4b02a..26c7d366bf3 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
@@ -28,13 +29,15 @@ import
org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
/** Several integration tests for queryable state using the {@link
FsStateBackend}. */
public class NonHAQueryableStateFsBackendITCase extends
AbstractQueryableStateTestBase {
@@ -48,31 +51,33 @@ public class NonHAQueryableStateFsBackendITCase extends
AbstractQueryableStateTe
private static final int QS_PROXY_PORT_RANGE_START = 9084;
private static final int QS_SERVER_PORT_RANGE_START = 9089;
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir static Path tmpStateBackendDir;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig())
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
- .build());
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ () ->
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build());
@Override
protected StateBackend createStateBackend() throws Exception {
- return new
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+ return new FsStateBackend(tmpStateBackendDir.toUri().toString());
}
- @BeforeClass
- public static void setup() throws Exception {
+ @BeforeAll
+ static void setup(@InjectClusterClient RestClusterClient<?>
injectedClusterClient)
+ throws Exception {
client = new QueryableStateClient("localhost",
QS_PROXY_PORT_RANGE_START);
- clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+ clusterClient = injectedClusterClient;
}
- @AfterClass
- public static void tearDown() {
+ @AfterAll
+ static void tearDown() {
client.shutdownAndWait();
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 13e191d8ec2..6e8e0e8be6f 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
@@ -28,13 +29,15 @@ import
org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
/** Several integration tests for queryable state using the {@link
RocksDBStateBackend}. */
public class NonHAQueryableStateRocksDBBackendITCase extends
AbstractQueryableStateTestBase {
@@ -47,31 +50,33 @@ public class NonHAQueryableStateRocksDBBackendITCase
extends AbstractQueryableSt
private static final int QS_PROXY_PORT_RANGE_START = 9094;
private static final int QS_SERVER_PORT_RANGE_START = 9099;
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir static Path tmpStateBackendDir;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig())
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
- .build());
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ () ->
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build());
@Override
protected StateBackend createStateBackend() throws Exception {
- return new
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+ return new RocksDBStateBackend(tmpStateBackendDir.toUri().toString());
}
- @BeforeClass
- public static void setup() throws Exception {
+ @BeforeAll
+ static void setup(@InjectClusterClient RestClusterClient<?>
injectedClusterClient)
+ throws Exception {
client = new QueryableStateClient("localhost",
QS_PROXY_PORT_RANGE_START);
- clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+ clusterClient = injectedClusterClient;
}
- @AfterClass
- public static void tearDown() {
+ @AfterAll
+ static void tearDown() {
client.shutdownAndWait();
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index ed29cfdab2c..edc60d185ac 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -25,16 +25,11 @@ import
org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -46,22 +41,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-/** Tests general behavior of the {@link AbstractServerBase}. */
-public class AbstractServerTest extends TestLogger {
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
- @Rule public ExpectedException expectedEx = ExpectedException.none();
+/** Tests general behavior of the {@link AbstractServerBase}. */
+class AbstractServerTest {
/**
* Tests that in case of port collision, a FlinkRuntimeException is thrown
with a specific
* message.
*/
@Test
- public void testServerInitializationFailure() throws Throwable {
-
- // the expected exception along with the adequate message
- expectedEx.expect(FlinkRuntimeException.class);
- expectedEx.expectMessage(
- "Unable to start Test Server 2. All ports in provided range
are occupied.");
+ void testServerInitializationFailure() throws Throwable {
List<Integer> portList = Collections.singletonList(0);
@@ -76,7 +67,10 @@ public class AbstractServerTest extends TestLogger {
new DisabledKvStateRequestStats(),
Collections.singletonList(server1.getServerAddress().getPort())
.iterator())) {
- server2.start();
+ // the expected exception along with the adequate message
+ assertThatThrownBy(() -> server2.start())
+ .hasMessage(
+ "Unable to start Test Server 2. All ports in
provided range are occupied.");
}
}
}
@@ -86,7 +80,7 @@ public class AbstractServerTest extends TestLogger {
* to the next port in the range.
*/
@Test
- public void testPortRangeSuccess() throws Throwable {
+ void testPortRangeSuccess() throws Throwable {
AtomicKvStateRequestStats serverStats1 = new
AtomicKvStateRequestStats();
AtomicKvStateRequestStats serverStats2 = new
AtomicKvStateRequestStats();
@@ -112,39 +106,37 @@ public class AbstractServerTest extends TestLogger {
new
TestMessage.TestMessageDeserializer()),
clientStats)) {
server1.start();
- Assert.assertTrue(
- server1.getServerAddress().getPort() >= portRangeStart
- && server1.getServerAddress().getPort() <=
portRangeEnd);
+
assertThat(server1.getServerAddress().getPort()).isGreaterThanOrEqualTo(portRangeStart);
+
assertThat(server1.getServerAddress().getPort()).isLessThanOrEqualTo(portRangeEnd);
server2.start();
- Assert.assertTrue(
- server2.getServerAddress().getPort() >= portRangeStart
- && server2.getServerAddress().getPort() <=
portRangeEnd);
+
assertThat(server2.getServerAddress().getPort()).isGreaterThanOrEqualTo(portRangeStart);
+
assertThat(server2.getServerAddress().getPort()).isLessThanOrEqualTo(portRangeEnd);
TestMessage response1 =
client.sendRequest(server1.getServerAddress(), new
TestMessage("ping")).join();
- Assert.assertEquals(server1.getServerName() + "-ping",
response1.getMessage());
+
assertThat(response1.getMessage()).isEqualTo(server1.getServerName() + "-ping");
TestMessage response2 =
client.sendRequest(server2.getServerAddress(), new
TestMessage("pong")).join();
- Assert.assertEquals(server2.getServerName() + "-pong",
response2.getMessage());
+
assertThat(response2.getMessage()).isEqualTo(server2.getServerName() + "-pong");
- Assert.assertEquals(1L, serverStats1.getNumConnections());
- Assert.assertEquals(1L, serverStats2.getNumConnections());
+ assertThat(serverStats1.getNumConnections()).isEqualTo(1L);
+ assertThat(serverStats2.getNumConnections()).isEqualTo(1L);
- Assert.assertEquals(2L, clientStats.getNumConnections());
- Assert.assertEquals(0L, clientStats.getNumFailed());
- Assert.assertEquals(2L, clientStats.getNumSuccessful());
- Assert.assertEquals(2L, clientStats.getNumRequests());
+ assertThat(clientStats.getNumConnections()).isEqualTo(2L);
+ assertThat(clientStats.getNumFailed()).isEqualTo(0L);
+ assertThat(clientStats.getNumSuccessful()).isEqualTo(2L);
+ assertThat(clientStats.getNumRequests()).isEqualTo(2L);
}
- Assert.assertEquals(0L, serverStats1.getNumConnections());
- Assert.assertEquals(0L, serverStats2.getNumConnections());
+ assertThat(serverStats1.getNumConnections()).isEqualTo(0L);
+ assertThat(serverStats2.getNumConnections()).isEqualTo(0L);
- Assert.assertEquals(0L, clientStats.getNumConnections());
- Assert.assertEquals(0L, clientStats.getNumFailed());
- Assert.assertEquals(2L, clientStats.getNumSuccessful());
- Assert.assertEquals(2L, clientStats.getNumRequests());
+ assertThat(clientStats.getNumConnections()).isEqualTo(0L);
+ assertThat(clientStats.getNumFailed()).isEqualTo(0L);
+ assertThat(clientStats.getNumSuccessful()).isEqualTo(2L);
+ assertThat(clientStats.getNumRequests()).isEqualTo(2L);
}
private static class TestClient extends Client<TestMessage, TestMessage>
@@ -161,7 +153,7 @@ public class AbstractServerTest extends TestLogger {
@Override
public void close() throws Exception {
shutdown().join();
- Assert.assertTrue(isEventGroupShutdown());
+ assertThat(isEventGroupShutdown()).isTrue();
}
}
@@ -207,8 +199,8 @@ public class AbstractServerTest extends TestLogger {
@Override
public void close() throws Exception {
shutdownServer().get();
- Assert.assertTrue(getQueryExecutor().isTerminated());
- Assert.assertTrue(isEventGroupShutdown());
+ assertThat(getQueryExecutor().isTerminated()).isTrue();
+ assertThat(isEventGroupShutdown()).isTrue();
}
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index e1fe88280ea..97a941de460 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -29,17 +29,18 @@ import
org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
/**
* Additional tests for the serialization and deserialization using the
KvStateSerializer with a
* RocksDB state back-end.
*/
-public final class KVStateRequestSerializerRocksDBTest {
+final class KVStateRequestSerializerRocksDBTest {
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir static File tmpFile;
/**
* Tests list serialization and deserialization match.
@@ -48,13 +49,11 @@ public final class KVStateRequestSerializerRocksDBTest {
* KvStateRequestSerializerTest#testListSerialization() using the heap
state back-end test
*/
@Test
- public void testListSerialization() throws Exception {
+ void testListSerialization() throws Exception {
final long key = 0L;
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
- RocksDBTestUtils.builderForTestDefaults(
- temporaryFolder.getRoot(),
LongSerializer.INSTANCE)
- .build();
+ RocksDBTestUtils.builderForTestDefaults(tmpFile,
LongSerializer.INSTANCE).build();
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -74,14 +73,12 @@ public final class KVStateRequestSerializerRocksDBTest {
* KvStateRequestSerializerTest#testMapSerialization() using the heap
state back-end test
*/
@Test
- public void testMapSerialization() throws Exception {
+ void testMapSerialization() throws Exception {
final long key = 0L;
// objects for RocksDB state list serialisation
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
- RocksDBTestUtils.builderForTestDefaults(
- temporaryFolder.getRoot(),
LongSerializer.INSTANCE)
- .build();
+ RocksDBTestUtils.builderForTestDefaults(tmpFile,
LongSerializer.INSTANCE).build();
longHeapKeyedStateBackend.setCurrentKey(key);
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
index e0facd7d309..904d8294dbe 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -20,32 +20,27 @@ package org.apache.flink.queryablestate.network;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.nio.channels.ClosedChannelException;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ClientHandler}. */
-public class KvStateClientHandlerTest {
+class KvStateClientHandlerTest {
/**
* Tests that on reads the expected callback methods are called and read
buffers are recycled.
*/
@Test
- public void testReadCallbacksAndBufferRecycling() throws Exception {
- final ClientHandlerCallback<KvStateResponse> callback =
mock(ClientHandlerCallback.class);
+ void testReadCallbacksAndBufferRecycling() throws Exception {
+ final TestingClientHandlerCallback callback = new
TestingClientHandlerCallback();
final MessageSerializer<KvStateInternalRequest, KvStateResponse>
serializer =
new MessageSerializer<>(
@@ -64,10 +59,12 @@ public class KvStateClientHandlerTest {
buf.skipBytes(4); // skip frame length
// Verify callback
+ callback.reset();
channel.writeInbound(buf);
- verify(callback, times(1)).onRequestResult(eq(1222112277L),
any(KvStateResponse.class));
- assertEquals("Buffer not recycled", 0, buf.refCnt());
-
+ assertThat(callback.onRequestCnt).isEqualTo(1);
+ assertThat(callback.onRequestId).isEqualTo(1222112277L);
+ assertThat(callback.onRequestBody).isInstanceOf(KvStateResponse.class);
+ assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not
recycled");
//
// Request failure
//
@@ -79,9 +76,12 @@ public class KvStateClientHandlerTest {
buf.skipBytes(4); // skip frame length
// Verify callback
+ callback.reset();
channel.writeInbound(buf);
- verify(callback, times(1)).onRequestFailure(eq(1222112278L),
isA(RuntimeException.class));
- assertEquals("Buffer not recycled", 0, buf.refCnt());
+ assertThat(callback.onRequestFailureCnt).isEqualTo(1);
+ assertThat(callback.onRequestFailureId).isEqualTo(1222112278L);
+
assertThat(callback.onRequestFailureBody).isInstanceOf(RuntimeException.class);
+ assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not
recycled");
//
// Server failure
@@ -92,8 +92,10 @@ public class KvStateClientHandlerTest {
buf.skipBytes(4); // skip frame length
// Verify callback
+ callback.reset();
channel.writeInbound(buf);
- verify(callback, times(1)).onFailure(isA(RuntimeException.class));
+ assertThat(callback.onFailureCnt).isEqualTo(1);
+
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
//
// Unexpected messages
@@ -101,20 +103,68 @@ public class KvStateClientHandlerTest {
buf = channel.alloc().buffer(4).writeInt(1223823);
// Verify callback
+ callback.reset();
channel.writeInbound(buf);
- verify(callback, times(1)).onFailure(isA(IllegalStateException.class));
- assertEquals("Buffer not recycled", 0, buf.refCnt());
+ assertThat(callback.onFailureCnt).isEqualTo(1);
+
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
+ assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not
recycled");
//
// Exception caught
//
+ callback.reset();
channel.pipeline().fireExceptionCaught(new RuntimeException("Expected
test Exception"));
- verify(callback, times(3)).onFailure(isA(RuntimeException.class));
+ assertThat(callback.onFailureCnt).isEqualTo(1);
+
assertThat(callback.onFailureBody).isInstanceOf(RuntimeException.class);
//
// Channel inactive
//
+ callback.reset();
channel.pipeline().fireChannelInactive();
- verify(callback,
times(1)).onFailure(isA(ClosedChannelException.class));
+ assertThat(callback.onFailureCnt).isEqualTo(1);
+
assertThat(callback.onFailureBody).isInstanceOf(ClosedChannelException.class);
+ }
+
+ private static class TestingClientHandlerCallback implements
ClientHandlerCallback {
+ private int onRequestCnt;
+ private long onRequestId;
+ private MessageBody onRequestBody;
+ private int onRequestFailureCnt;
+ private long onRequestFailureId;
+ private Throwable onRequestFailureBody;
+ private int onFailureCnt;
+ private Throwable onFailureBody;
+
+ @Override
+ public void onRequestResult(long requestId, MessageBody response) {
+ onRequestCnt++;
+ onRequestId = requestId;
+ onRequestBody = response;
+ }
+
+ @Override
+ public void onRequestFailure(long requestId, Throwable cause) {
+ onRequestFailureCnt++;
+ onRequestFailureId = requestId;
+ onRequestFailureBody = cause;
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ onFailureCnt++;
+ onFailureBody = cause;
+ }
+
+ public void reset() {
+ onRequestCnt = 0;
+ onRequestId = -1;
+ onRequestBody = null;
+ onRequestFailureCnt = 0;
+ onRequestFailureId = -1;
+ onRequestFailureBody = null;
+ onFailureCnt = 0;
+ onFailureBody = null;
+ }
}
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index cf860d2661f..dfeda1f1590 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.network;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -29,6 +30,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
@@ -43,9 +46,9 @@ import
org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -58,23 +61,19 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link KvStateSerializer}. */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
+class KvStateRequestSerializerTest {
- @Parameterized.Parameters
- public static Collection<Boolean> parameters() {
+ public static Collection<Boolean> data() {
return Arrays.asList(false, true);
}
- @Parameterized.Parameter public boolean async;
-
/** Tests key and namespace serialization utils. */
@Test
- public void testKeyAndNamespaceSerialization() throws Exception {
+ void testKeyAndNamespaceSerialization() throws Exception {
TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> namespaceSerializer = StringSerializer.INSTANCE;
@@ -89,91 +88,124 @@ public class KvStateRequestSerializerTest {
KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, keySerializer,
namespaceSerializer);
- assertEquals(expectedKey, actual.f0.longValue());
- assertEquals(expectedNamespace, actual.f1);
+ assertThat(actual.f0.longValue()).isEqualTo(expectedKey);
+ assertThat(actual.f1).isEqualTo(expectedNamespace);
}
/** Tests key and namespace deserialization utils with too few bytes. */
- @Test(expected = IOException.class)
- public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
- KvStateSerializer.deserializeKeyAndNamespace(
- new byte[] {}, LongSerializer.INSTANCE,
StringSerializer.INSTANCE);
+ @Test
+ void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+ assertThatThrownBy(
+ () ->
+ KvStateSerializer.deserializeKeyAndNamespace(
+ new byte[] {},
+ LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests key and namespace deserialization utils with too few bytes. */
- @Test(expected = IOException.class)
- public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
- KvStateSerializer.deserializeKeyAndNamespace(
- new byte[] {1}, LongSerializer.INSTANCE,
StringSerializer.INSTANCE);
+ @Test
+ void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+ assertThatThrownBy(
+ () ->
+ KvStateSerializer.deserializeKeyAndNamespace(
+ new byte[] {1},
+ LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests key and namespace deserialization utils with too many bytes. */
- @Test(expected = IOException.class)
- public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
- // Long + null String + 1 byte
- KvStateSerializer.deserializeKeyAndNamespace(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2},
- LongSerializer.INSTANCE,
- StringSerializer.INSTANCE);
+ @Test
+ void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long + null String + 1 byte
+ KvStateSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1,
42, 0, 2},
+ LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests key and namespace deserialization utils with too many bytes. */
- @Test(expected = IOException.class)
- public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
- // Long + null String + 2 bytes
- KvStateSerializer.deserializeKeyAndNamespace(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2},
- LongSerializer.INSTANCE,
- StringSerializer.INSTANCE);
+ @Test
+ void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long + null String + 2 bytes
+ KvStateSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1,
42, 0, 2, 2},
+ LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests value serialization utils. */
@Test
- public void testValueSerialization() throws Exception {
+ void testValueSerialization() throws Exception {
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
long expectedValue = Long.MAX_VALUE - 1292929292L;
byte[] serializedValue =
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
long actualValue = KvStateSerializer.deserializeValue(serializedValue,
valueSerializer);
- assertEquals(expectedValue, actualValue);
+ assertThat(actualValue).isEqualTo(expectedValue);
}
/** Tests value deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeValueEmpty() throws Exception {
- KvStateSerializer.deserializeValue(new byte[] {},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeValueEmpty() throws Exception {
+ assertThatThrownBy(
+ () ->
+ KvStateSerializer.deserializeValue(
+ new byte[] {},
LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests value deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeValueTooShort() throws Exception {
- // 1 byte (incomplete Long)
- KvStateSerializer.deserializeValue(new byte[] {1},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeValueTooShort() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // 1 byte (incomplete Long)
+ KvStateSerializer.deserializeValue(
+ new byte[] {1},
LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests value deserialization with too many bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeValueTooMany1() throws Exception {
- // Long + 1 byte
- KvStateSerializer.deserializeValue(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeValueTooMany1() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long + 1 byte
+ KvStateSerializer.deserializeValue(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+ LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests value deserialization with too many bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeValueTooMany2() throws Exception {
- // Long + 2 bytes
- KvStateSerializer.deserializeValue(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeValueTooMany2() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long + 2 bytes
+ KvStateSerializer.deserializeValue(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2,
2},
+ LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests list serialization utils. */
- @Test
- public void testListSerialization() throws Exception {
+ @ParameterizedTest
+ @MethodSource("data")
+ void testListSerialization(boolean async) throws Exception {
final long key = 0L;
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
- getLongHeapKeyedStateBackend(key);
+ getLongHeapKeyedStateBackend(key, async);
final InternalListState<Long, VoidNamespace, Long> listState =
longHeapKeyedStateBackend.createOrUpdateInternalState(
@@ -225,46 +257,54 @@ public class KvStateRequestSerializerTest {
List<Long> actualValues =
KvStateSerializer.deserializeList(serializedValues,
valueSerializer);
- assertEquals(expectedValues, actualValues);
+ assertThat(actualValues).isEqualTo(expectedValues);
// Single value
long expectedValue = ThreadLocalRandom.current().nextLong();
byte[] serializedValue =
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
List<Long> actualValue =
KvStateSerializer.deserializeList(serializedValue,
valueSerializer);
- assertEquals(1, actualValue.size());
- assertEquals(expectedValue, actualValue.get(0).longValue());
+ assertThat(actualValue).containsExactly(expectedValue);
}
/** Tests list deserialization with too few bytes. */
@Test
- public void testDeserializeListEmpty() throws Exception {
+ void testDeserializeListEmpty() throws Exception {
List<Long> actualValue =
KvStateSerializer.deserializeList(new byte[] {},
LongSerializer.INSTANCE);
- assertEquals(0, actualValue.size());
+ assertThat(actualValue).isEmpty();
}
/** Tests list deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeListTooShort1() throws Exception {
- // 1 byte (incomplete Long)
- KvStateSerializer.deserializeList(new byte[] {1},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeListTooShort1() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // 1 byte (incomplete Long)
+ KvStateSerializer.deserializeList(
+ new byte[] {1},
LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests list deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeListTooShort2() throws Exception {
- // Long + 1 byte (separator) + 1 byte (incomplete Long)
- KvStateSerializer.deserializeList(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeListTooShort2() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long + 1 byte (separator) + 1 byte
(incomplete Long)
+ KvStateSerializer.deserializeList(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2,
3},
+ LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests map serialization utils. */
- @Test
- public void testMapSerialization() throws Exception {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testMapSerialization(boolean async) throws Exception {
final long key = 0L;
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
- getLongHeapKeyedStateBackend(key);
+ getLongHeapKeyedStateBackend(key, async);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
@@ -279,14 +319,16 @@ public class KvStateRequestSerializerTest {
testMapSerialization(key, mapState);
}
- private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final
long key)
+ private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final
long key, boolean async)
throws BackendBuildingException {
final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
ExecutionConfig executionConfig = new ExecutionConfig();
+ TaskKvStateRegistry taskKvStateRegistry =
+ new KvStateRegistry().createTaskRegistry(JobID.generate(), new
JobVertexID());
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackendBuilder<>(
- mock(TaskKvStateRegistry.class),
+ taskKvStateRegistry,
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
keyGroupRange.getNumberOfKeyGroups(),
@@ -353,9 +395,9 @@ public class KvStateRequestSerializerTest {
Map<Long, String> actualValues =
KvStateSerializer.deserializeMap(
serializedValues, userKeySerializer,
userValueSerializer);
- assertEquals(expectedValues.size(), actualValues.size());
+ assertThat(actualValues).hasSize(expectedValues.size());
for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) {
- assertEquals(expectedValues.get(actualEntry.getKey()),
actualEntry.getValue());
+
assertThat(actualEntry.getValue()).isEqualTo(expectedValues.get(actualEntry.getKey()));
}
// Single value
@@ -372,45 +414,59 @@ public class KvStateRequestSerializerTest {
Map<Long, String> actualValue =
KvStateSerializer.deserializeMap(
serializedValue, userKeySerializer,
userValueSerializer);
- assertEquals(1, actualValue.size());
- assertEquals(expectedValue, actualValue.get(expectedKey));
+ assertThat(actualValue).hasSize(1);
+ assertThat(actualValue.get(expectedKey)).isEqualTo(expectedValue);
}
/** Tests map deserialization with too few bytes. */
@Test
- public void testDeserializeMapEmpty() throws Exception {
+ void testDeserializeMapEmpty() throws Exception {
Map<Long, String> actualValue =
KvStateSerializer.deserializeMap(
new byte[] {}, LongSerializer.INSTANCE,
StringSerializer.INSTANCE);
- assertEquals(0, actualValue.size());
+ assertThat(actualValue).hasSize(0);
}
/** Tests map deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeMapTooShort1() throws Exception {
- // 1 byte (incomplete Key)
- KvStateSerializer.deserializeMap(
- new byte[] {1}, LongSerializer.INSTANCE,
StringSerializer.INSTANCE);
+ @Test
+ void testDeserializeMapTooShort1() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // 1 byte (incomplete Key)
+ KvStateSerializer.deserializeMap(
+ new byte[] {1},
+ LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests map deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeMapTooShort2() throws Exception {
- // Long (Key) + 1 byte (incomplete Value)
- KvStateSerializer.deserializeMap(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0},
- LongSerializer.INSTANCE,
- LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeMapTooShort2() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long (Key) + 1 byte (incomplete Value)
+ KvStateSerializer.deserializeMap(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0},
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
/** Tests map deserialization with too few bytes. */
- @Test(expected = IOException.class)
- public void testDeserializeMapTooShort3() throws Exception {
- // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte (incomplete
Key2)
- KvStateSerializer.deserializeMap(
- new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1,
3},
- LongSerializer.INSTANCE,
- LongSerializer.INSTANCE);
+ @Test
+ void testDeserializeMapTooShort3() throws Exception {
+ assertThatThrownBy(
+ () ->
+ // Long (Key1) + Boolean (false) + Long
(Value1) + 1 byte
+ // (incomplete Key2)
+ KvStateSerializer.deserializeMap(
+ new byte[] {
+ 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1,
1, 1, 1, 1, 1, 1, 3
+ },
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE))
+ .isInstanceOf(IOException.class);
}
private byte[] randomByteArray(int capacity) {
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 6df22a827b1..868b30e67c0 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -63,33 +62,31 @@ import
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KvStateServerHandler}. */
-@Ignore(
+@Disabled(
"KvStateServerHandlerTest is unstable. See FLINK-13553 for more
information. Since the community "
+ "does not have time to work on QS, we decided to temporarily
ignore this test case in order"
+ "to maintain build stability.")
-public class KvStateServerHandlerTest extends TestLogger {
+class KvStateServerHandlerTest {
private static KvStateServerImpl testServer;
private static final long READ_TIMEOUT_MILLIS = 10000L;
- @BeforeClass
- public static void setup() {
+ @BeforeAll
+ static void setup() {
try {
testServer =
new KvStateServerImpl(
@@ -105,14 +102,14 @@ public class KvStateServerHandlerTest extends TestLogger {
}
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @AfterAll
+ static void tearDown() throws Exception {
testServer.shutdown();
}
/** Tests a simple successful query via an EmbeddedChannel. */
@Test
- public void testSimpleQuery() throws Exception {
+ void testSimpleQuery() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -160,7 +157,7 @@ public class KvStateServerHandlerTest extends TestLogger {
long requestId = Integer.MAX_VALUE + 182828L;
- assertTrue(registryListener.registrationName.equals("vanilla"));
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
KvStateInternalRequest request =
new KvStateInternalRequest(registryListener.kvStateId,
serializedKeyAndNamespace);
@@ -175,18 +172,18 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_RESULT,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
long deserRequestId = MessageSerializer.getRequestId(buf);
KvStateResponse response = serializer.deserializeResponse(buf);
buf.release();
- assertEquals(requestId, deserRequestId);
+ assertThat(deserRequestId).isEqualTo(requestId);
int actualValue =
KvStateSerializer.deserializeValue(response.getContent(),
IntSerializer.INSTANCE);
- assertEquals(expectedValue, actualValue);
+ assertThat(actualValue).isEqualTo(expectedValue);
- assertEquals(stats.toString(), 1, stats.getNumRequests());
+
assertThat(stats.getNumRequests()).isEqualTo(1).withFailMessage(stats.toString());
// Wait for async successful request report
long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30,
TimeUnit.SECONDS);
@@ -194,7 +191,7 @@ public class KvStateServerHandlerTest extends TestLogger {
Thread.sleep(10L);
}
- assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
+
assertThat(stats.getNumSuccessful()).isEqualTo(1L).withFailMessage(stats.toString());
}
/**
@@ -202,7 +199,7 @@ public class KvStateServerHandlerTest extends TestLogger {
* unregistered KvStateIDs.
*/
@Test
- public void testQueryUnknownKvStateID() throws Exception {
+ void testQueryUnknownKvStateID() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -229,18 +226,18 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
buf.release();
- assertEquals(requestId, response.getRequestId());
+ assertThat(response.getRequestId()).isEqualTo(requestId);
- assertTrue(
- "Did not respond with expected failure cause",
- response.getCause() instanceof UnknownKvStateIdException);
+ assertThat(response.getCause())
+ .isInstanceOf(UnknownKvStateIdException.class)
+ .withFailMessage("Did not respond with expected failure
cause");
- assertEquals(1L, stats.getNumRequests());
- assertEquals(1L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(1L);
+ assertThat(stats.getNumFailed()).isEqualTo(1L);
}
/**
@@ -248,7 +245,7 @@ public class KvStateServerHandlerTest extends TestLogger {
* for non-existing keys.
*/
@Test
- public void testQueryUnknownKey() throws Exception {
+ void testQueryUnknownKey() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -287,7 +284,7 @@ public class KvStateServerHandlerTest extends TestLogger {
long requestId = Integer.MAX_VALUE + 22982L;
- assertTrue(registryListener.registrationName.equals("vanilla"));
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
KvStateInternalRequest request =
new KvStateInternalRequest(registryListener.kvStateId,
serializedKeyAndNamespace);
@@ -301,18 +298,18 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
buf.release();
- assertEquals(requestId, response.getRequestId());
+ assertThat(response.getRequestId()).isEqualTo(requestId);
- assertTrue(
- "Did not respond with expected failure cause",
- response.getCause() instanceof UnknownKeyOrNamespaceException);
+ assertThat(response.getCause())
+ .isInstanceOf(UnknownKeyOrNamespaceException.class)
+ .withFailMessage("Did not respond with expected failure
cause");
- assertEquals(1L, stats.getNumRequests());
- assertEquals(1L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(1L);
+ assertThat(stats.getNumFailed()).isEqualTo(1L);
}
/**
@@ -321,7 +318,7 @@ public class KvStateServerHandlerTest extends TestLogger {
* call.
*/
@Test
- public void testFailureOnGetSerializedValue() throws Exception {
+ void testFailureOnGetSerializedValue() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -396,19 +393,19 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
buf.release();
- assertTrue(response.getCause().getMessage().contains("Expected test
Exception"));
+ assertThat(response.getCause().getMessage()).contains("Expected test
Exception");
- assertEquals(1L, stats.getNumRequests());
- assertEquals(1L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(1L);
+ assertThat(stats.getNumFailed()).isEqualTo(1L);
}
/** Tests that the channel is closed if an Exception reaches the channel
handler. */
@Test
- public void testCloseChannelOnExceptionCaught() throws Exception {
+ void testCloseChannelOnExceptionCaught() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -427,14 +424,14 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.SERVER_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
Throwable response = MessageSerializer.deserializeServerFailure(buf);
buf.release();
- assertTrue(response.getMessage().contains("Expected test Exception"));
+ assertThat(response.getMessage()).contains("Expected test Exception");
channel.closeFuture().await(READ_TIMEOUT_MILLIS);
- assertFalse(channel.isActive());
+ assertThat(channel.isActive()).isFalse();
}
/**
@@ -442,7 +439,7 @@ public class KvStateServerHandlerTest extends TestLogger {
* closed.
*/
@Test
- public void testQueryExecutorShutDown() throws Throwable {
+ void testQueryExecutorShutDown() throws Throwable {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -457,7 +454,7 @@ public class KvStateServerHandlerTest extends TestLogger {
localTestServer.start();
localTestServer.shutdown();
- assertTrue(localTestServer.getQueryExecutor().isTerminated());
+ assertThat(localTestServer.getQueryExecutor().isTerminated()).isTrue();
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(
@@ -485,7 +482,7 @@ public class KvStateServerHandlerTest extends TestLogger {
backend.getPartitionedState(VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, desc);
- assertTrue(registryListener.registrationName.equals("vanilla"));
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
KvStateInternalRequest request =
new KvStateInternalRequest(registryListener.kvStateId, new
byte[0]);
@@ -498,21 +495,21 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
buf.release();
-
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
+
assertThat(response.getCause().getMessage()).contains("RejectedExecutionException");
- assertEquals(1L, stats.getNumRequests());
- assertEquals(1L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(1L);
+ assertThat(stats.getNumFailed()).isEqualTo(1L);
localTestServer.shutdown();
}
/** Tests response on unexpected messages. */
@Test
- public void testUnexpectedMessage() throws Exception {
+ void testUnexpectedMessage() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -536,12 +533,12 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.SERVER_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
Throwable response = MessageSerializer.deserializeServerFailure(buf);
buf.release();
- assertEquals(0L, stats.getNumRequests());
- assertEquals(0L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(0L);
+ assertThat(stats.getNumFailed()).isEqualTo(0L);
KvStateResponse stateResponse = new KvStateResponse(new byte[0]);
unexpectedMessage =
@@ -553,21 +550,21 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.SERVER_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
response = MessageSerializer.deserializeServerFailure(buf);
buf.release();
- assertTrue(
- "Unexpected failure cause " + response.getClass().getName(),
- response instanceof IllegalArgumentException);
+ assertThat(response)
+ .isInstanceOf(IllegalArgumentException.class)
+ .withFailMessage("Unexpected failure cause " +
response.getClass().getName());
- assertEquals(0L, stats.getNumRequests());
- assertEquals(0L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(0L);
+ assertThat(stats.getNumFailed()).isEqualTo(0L);
}
/** Tests that incoming buffer instances are recycled. */
@Test
- public void testIncomingBufferIsRecycled() throws Exception {
+ void testIncomingBufferIsRecycled() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -583,27 +580,27 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateInternalRequest request = new KvStateInternalRequest(new
KvStateID(), new byte[0]);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
- assertEquals(1L, serRequest.refCnt());
+ assertThat(serRequest.refCnt()).isEqualTo(1L);
// Write regular request
channel.writeInbound(serRequest);
- assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
+ assertThat(serRequest.refCnt()).isEqualTo(0L).withFailMessage("Buffer
not recycled");
// Write unexpected msg
ByteBuf unexpected = channel.alloc().buffer(8);
unexpected.writeInt(4);
unexpected.writeInt(4);
- assertEquals(1L, unexpected.refCnt());
+ assertThat(unexpected.refCnt()).isEqualTo(1L);
channel.writeInbound(unexpected);
- assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
+ assertThat(unexpected.refCnt()).isEqualTo(0L).withFailMessage("Buffer
not recycled");
channel.finishAndReleaseAll();
}
/** Tests the failure response if the serializers don't match. */
@Test
- public void testSerializerMismatch() throws Exception {
+ void testSerializerMismatch() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -655,7 +652,7 @@ public class KvStateServerHandlerTest extends TestLogger {
"wrong-namespace-type",
StringSerializer.INSTANCE);
- assertTrue(registryListener.registrationName.equals("vanilla"));
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
KvStateInternalRequest request =
new KvStateInternalRequest(registryListener.kvStateId,
wrongKeyAndNamespace);
@@ -668,11 +665,11 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
buf.release();
- assertEquals(182828L, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IOException"));
+ assertThat(response.getRequestId()).isEqualTo(182828L);
+ assertThat(response.getCause().getMessage()).contains("IOException");
// Repeat with wrong namespace only
request = new KvStateInternalRequest(registryListener.kvStateId,
wrongNamespace);
@@ -685,19 +682,19 @@ public class KvStateServerHandlerTest extends TestLogger {
buf.skipBytes(4); // skip frame length
// Verify the response
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();
- assertEquals(182829L, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IOException"));
+ assertThat(response.getRequestId()).isEqualTo(182829L);
+ assertThat(response.getCause().getMessage()).contains("IOException");
- assertEquals(2L, stats.getNumRequests());
- assertEquals(2L, stats.getNumFailed());
+ assertThat(stats.getNumRequests()).isEqualTo(2L);
+ assertThat(stats.getNumFailed()).isEqualTo(2L);
}
/** Tests that large responses are chunked. */
@Test
- public void testChunkedResponse() throws Exception {
+ void testChunkedResponse() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
KvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -751,8 +748,7 @@ public class KvStateServerHandlerTest extends TestLogger {
long requestId = Integer.MAX_VALUE + 182828L;
- assertTrue(registryListener.registrationName.equals("vanilla"));
-
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
KvStateInternalRequest request =
new KvStateInternalRequest(registryListener.kvStateId,
serializedKeyAndNamespace);
ByteBuf serRequest =
@@ -762,7 +758,7 @@ public class KvStateServerHandlerTest extends TestLogger {
channel.writeInbound(serRequest);
Object msg = readInboundBlocking(channel);
- assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+
assertThat(msg).isInstanceOf(ChunkedByteBuf.class).withFailMessage("Not
ChunkedByteBuf");
((ChunkedByteBuf) msg).close();
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index d26d82ab4b1..a04d8749149 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -56,8 +56,8 @@ import
org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.junit.AfterClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -66,19 +66,18 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KvStateServerImpl}. */
-public class KvStateServerTest {
+class KvStateServerTest {
// Thread pool for client bootstrap (shared between tests)
private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
private static final int TIMEOUT_MILLIS = 10000;
- @AfterClass
- public static void tearDown() throws Exception {
+ @AfterAll
+ static void tearDown() throws Exception {
if (NIO_GROUP != null) {
// note: no "quiet period" to not trigger Netty#4357
NIO_GROUP.shutdownGracefully(0, 10, TimeUnit.SECONDS);
@@ -87,7 +86,7 @@ public class KvStateServerTest {
/** Tests a simple successful query via a SocketChannel. */
@Test
- public void testSimpleRequest() throws Throwable {
+ void testSimpleRequest() throws Throwable {
KvStateServerImpl server = null;
Bootstrap bootstrap = null;
try {
@@ -173,7 +172,7 @@ public class KvStateServerTest {
long requestId = Integer.MAX_VALUE + 182828L;
- assertTrue(registryListener.registrationName.equals("vanilla"));
+ assertThat(registryListener.registrationName).isEqualTo("vanilla");
final KvStateInternalRequest request =
new KvStateInternalRequest(
@@ -186,14 +185,15 @@ public class KvStateServerTest {
ByteBuf buf = responses.poll(TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
- assertEquals(MessageType.REQUEST_RESULT,
MessageSerializer.deserializeHeader(buf));
- assertEquals(requestId, MessageSerializer.getRequestId(buf));
+ assertThat(MessageSerializer.deserializeHeader(buf))
+ .isEqualTo(MessageType.REQUEST_RESULT);
+
assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
KvStateResponse response =
server.getSerializer().deserializeResponse(buf);
int actualValue =
KvStateSerializer.deserializeValue(
response.getContent(), IntSerializer.INSTANCE);
- assertEquals(expectedValue, actualValue);
+ assertThat(actualValue).isEqualTo(expectedValue);
} finally {
if (server != null) {
server.shutdown();
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
index 8e71692a701..af009110e43 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -29,33 +29,21 @@ import
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link MessageSerializer}. */
-@RunWith(Parameterized.class)
-public class MessageSerializerTest {
+class MessageSerializerTest {
private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
- @Parameterized.Parameters
- public static Collection<Boolean> parameters() {
- return Arrays.asList(false, true);
- }
-
- @Parameterized.Parameter public boolean async;
-
/** Tests request serialization. */
@Test
- public void testRequestSerialization() throws Exception {
+ void testRequestSerialization() throws Exception {
long requestId = Integer.MAX_VALUE + 1337L;
KvStateID kvStateId = new KvStateID();
byte[] serializedKeyAndNamespace = randomByteArray(1024);
@@ -70,19 +58,20 @@ public class MessageSerializerTest {
ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId,
request);
int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST,
MessageSerializer.deserializeHeader(buf));
- assertEquals(requestId, MessageSerializer.getRequestId(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST);
+ assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
KvStateInternalRequest requestDeser =
serializer.deserializeRequest(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertEquals(kvStateId, requestDeser.getKvStateId());
- assertArrayEquals(serializedKeyAndNamespace,
requestDeser.getSerializedKeyAndNamespace());
+ assertThat(requestDeser.getKvStateId()).isEqualTo(kvStateId);
+ assertThat(requestDeser.getSerializedKeyAndNamespace())
+ .isEqualTo(serializedKeyAndNamespace);
}
/** Tests request serialization with zero-length serialized key and
namespace. */
@Test
- public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws
Exception {
+ void testRequestSerializationWithZeroLengthKeyAndNamespace() throws
Exception {
long requestId = Integer.MAX_VALUE + 1337L;
KvStateID kvStateId = new KvStateID();
@@ -98,28 +87,30 @@ public class MessageSerializerTest {
ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId,
request);
int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST,
MessageSerializer.deserializeHeader(buf));
- assertEquals(requestId, MessageSerializer.getRequestId(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST);
+ assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
KvStateInternalRequest requestDeser =
serializer.deserializeRequest(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertEquals(kvStateId, requestDeser.getKvStateId());
- assertArrayEquals(serializedKeyAndNamespace,
requestDeser.getSerializedKeyAndNamespace());
+ assertThat(requestDeser.getKvStateId()).isEqualTo(kvStateId);
+ assertThat(requestDeser.getSerializedKeyAndNamespace())
+ .isEqualTo(serializedKeyAndNamespace);
}
/**
* Tests that we don't try to be smart about <code>null</code> key and
namespace. They should be
* treated explicitly.
*/
- @Test(expected = NullPointerException.class)
- public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace()
throws Exception {
- new KvStateInternalRequest(new KvStateID(), null);
+ @Test
+ void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws
Exception {
+ assertThatThrownBy(() -> new KvStateInternalRequest(new KvStateID(),
null))
+ .isInstanceOf(NullPointerException.class);
}
/** Tests response serialization. */
@Test
- public void testResponseSerialization() throws Exception {
+ void testResponseSerialization() throws Exception {
long requestId = Integer.MAX_VALUE + 72727278L;
byte[] serializedResult = randomByteArray(1024);
@@ -132,18 +123,18 @@ public class MessageSerializerTest {
ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId,
response);
int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST_RESULT,
MessageSerializer.deserializeHeader(buf));
- assertEquals(requestId, MessageSerializer.getRequestId(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
+ assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(requestId);
KvStateResponse responseDeser = serializer.deserializeResponse(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertArrayEquals(serializedResult, responseDeser.getContent());
+ assertThat(responseDeser.getContent()).isEqualTo(serializedResult);
}
/** Tests response serialization with zero-length serialized result. */
@Test
- public void testResponseSerializationWithZeroLengthSerializedResult()
throws Exception {
+ void testResponseSerializationWithZeroLengthSerializedResult() throws
Exception {
byte[] serializedResult = new byte[0];
final KvStateResponse response = new KvStateResponse(serializedResult);
@@ -156,55 +147,56 @@ public class MessageSerializerTest {
int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST_RESULT,
MessageSerializer.deserializeHeader(buf));
- assertEquals(72727278L, MessageSerializer.getRequestId(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_RESULT);
+ assertThat(MessageSerializer.getRequestId(buf)).isEqualTo(72727278L);
KvStateResponse responseDeser = serializer.deserializeResponse(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertArrayEquals(serializedResult, responseDeser.getContent());
+ assertThat(responseDeser.getContent()).isEqualTo(serializedResult);
}
/**
* Tests that we don't try to be smart about <code>null</code> results.
They should be treated
* explicitly.
*/
- @Test(expected = NullPointerException.class)
- public void testNullPointerExceptionOnNullSerializedResult() throws
Exception {
- new KvStateResponse((byte[]) null);
+ @Test
+ void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+ assertThatThrownBy(() -> new KvStateResponse((byte[]) null))
+ .isInstanceOf(NullPointerException.class);
}
/** Tests request failure serialization. */
@Test
- public void testKvStateRequestFailureSerialization() throws Exception {
+ void testKvStateRequestFailureSerialization() throws Exception {
long requestId = Integer.MAX_VALUE + 1111222L;
IllegalStateException cause = new IllegalStateException("Expected
test");
ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc,
requestId, cause);
int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.REQUEST_FAILURE);
RequestFailure requestFailure =
MessageSerializer.deserializeRequestFailure(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertEquals(requestId, requestFailure.getRequestId());
- assertEquals(cause.getClass(), requestFailure.getCause().getClass());
- assertEquals(cause.getMessage(),
requestFailure.getCause().getMessage());
+ assertThat(requestFailure.getRequestId()).isEqualTo(requestId);
+ assertThat(requestFailure.getCause()).isInstanceOf(cause.getClass());
+
assertThat(requestFailure.getCause().getMessage()).isEqualTo(cause.getMessage());
}
/** Tests server failure serialization. */
@Test
- public void testServerFailureSerialization() throws Exception {
+ void testServerFailureSerialization() throws Exception {
IllegalStateException cause = new IllegalStateException("Expected
test");
ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
int frameLength = buf.readInt();
- assertEquals(MessageType.SERVER_FAILURE,
MessageSerializer.deserializeHeader(buf));
+
assertThat(MessageSerializer.deserializeHeader(buf)).isEqualTo(MessageType.SERVER_FAILURE);
Throwable request = MessageSerializer.deserializeServerFailure(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
+ assertThat(buf.readerIndex()).isEqualTo(frameLength + 4);
- assertEquals(cause.getClass(), request.getClass());
- assertEquals(cause.getMessage(), request.getMessage());
+ assertThat(request).isInstanceOf(cause.getClass());
+ assertThat(request.getMessage()).isEqualTo(cause.getMessage());
}
private byte[] randomByteArray(int capacity) {
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..28999133c2b
--- /dev/null
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file