snuyanzin commented on code in PR #21410:
URL: https://github.com/apache/flink/pull/21410#discussion_r1043409381
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -152,16 +137,15 @@
* <p>NOTE: Please ensure to close and dispose any created keyed state backend
in tests.
*/
@SuppressWarnings("serial")
+@ExtendWith(ParameterizedTestExtension.class)
public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
- @Rule public final ExpectedException expectedException =
ExpectedException.none();
-
- @Before
+ @BeforeEach
public void before() throws Exception {
Review Comment:
```suggestion
void before() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -276,7 +260,7 @@ protected <K> CheckpointableKeyedStateBackend<K>
restoreKeyedBackend(
new CloseableRegistry());
}
- @Test
+ @TestTemplate
public void testEnableStateLatencyTracking() throws Exception {
Review Comment:
```suggestion
void testEnableStateLatencyTracking() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -152,16 +137,15 @@
* <p>NOTE: Please ensure to close and dispose any created keyed state backend
in tests.
*/
@SuppressWarnings("serial")
+@ExtendWith(ParameterizedTestExtension.class)
public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
- @Rule public final ExpectedException expectedException =
ExpectedException.none();
-
- @Before
+ @BeforeEach
public void before() throws Exception {
env = buildMockEnv();
}
- @After
+ @AfterEach
public void after() {
Review Comment:
```suggestion
void after() {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -303,34 +287,35 @@ public void testEnableStateLatencyTracking() throws
Exception {
? ((TestableKeyedStateBackend<Integer>)
keyedStateBackend)
.getDelegatedKeyedStateBackend(true)
: keyedStateBackend;
- Assert.assertTrue(
- ((AbstractKeyedStateBackend<Integer>) nested)
- .getLatencyTrackingStateConfig()
- .isEnabled());
+ assertThat(
+ ((AbstractKeyedStateBackend<Integer>) nested)
+ .getLatencyTrackingStateConfig()
+ .isEnabled())
+ .isTrue();
} finally {
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
}
}
- @Test
+ @TestTemplate
public void testIsSafeToReuseState() throws Exception {
CheckpointableKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- Assert.assertEquals(isSafeToReuseKVState(),
backend.isSafeToReuseKVState());
+
assertThat(backend.isSafeToReuseKVState()).isEqualTo(isSafeToReuseKVState());
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
public void testKeyGroupedInternalPriorityQueue() throws Exception {
Review Comment:
```suggestion
void testKeyGroupedInternalPriorityQueue() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -644,14 +629,14 @@ public void
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() thr
}
}
- assertTrue("Didn't see the expected Kryo exception.",
numExceptions > 0);
+ assertThat(numExceptions > 0).as("Didn't see the expected Kryo
exception.").isTrue();
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
public void testBackendUsesRegisteredKryoSerializer() throws Exception {
Review Comment:
```suggestion
void testBackendUsesRegisteredKryoSerializer() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -446,19 +429,19 @@ public void testGetKeys() throws Exception {
for (int expectedKey = namespace1ElementsNum;
expectedKey < namespace1ElementsNum +
namespace2ElementsNum;
expectedKey++) {
- assertTrue(actualIterator.hasNext());
- assertEquals(expectedKey, actualIterator.nextInt());
+ assertThat(actualIterator.hasNext()).isTrue();
+
assertThat(actualIterator.nextInt()).isEqualTo(expectedKey);
}
- assertFalse(actualIterator.hasNext());
+ assertThat(actualIterator.hasNext()).isFalse();
}
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
public void testGetKeysAndNamespaces() throws Exception {
Review Comment:
```suggestion
void testGetKeysAndNamespaces() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -1080,23 +1088,41 @@ public void
testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throw
// on the second restore, since the custom serializer will be used
for
// deserialization, we expect the deliberate failure to be thrown
- expectedException.expect(
- anyOf(
- isA(ExpectedKryoTestException.class),
- Matchers.<Throwable>hasProperty(
- "cause",
isA(ExpectedKryoTestException.class))));
-
- // state backends that eagerly deserializes (such as the memory
state backend) will fail
- // here
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2,
env);
-
- state =
- backend.getPartitionedState(
- VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
-
- backend.setCurrentKey(1);
- // state backends that lazily deserializes (such as RocksDB) will
fail here
- state.value();
+ assertThatThrownBy(
+ () -> {
+ // state backends that eagerly deserializes
(such as the memory
+ // state backend) will fail
+ // here
+ CheckpointableKeyedStateBackend<Integer>
restoreBackend = null;
+ try {
+ restoreBackend =
+ restoreKeyedBackend(
+ IntSerializer.INSTANCE,
snapshot2, env);
+
+ ValueState<TestPojo> restoreState =
+ restoreBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+
VoidNamespaceSerializer.INSTANCE,
+ new
ValueStateDescriptor<>("id", pojoType));
+
+ restoreBackend.setCurrentKey(1);
+ // state backends that lazily deserializes
(such as RocksDB)
+ // will fail here
+ restoreState.value();
+ } finally {
+ if (restoreBackend != null) {
+ restoreBackend.dispose();
+ }
+ }
+ })
+ .satisfiesAnyOf(
+ actual ->
+ assertThat(actual)
+
.isInstanceOf(ExpectedKryoTestException.class),
+ actual ->
+ assertThat(actual)
+ .hasFieldOrProperty("cause")
Review Comment:
Could you please clarify why we should check it?
In my opinion, `Throwable` has a `cause` field, so this check will always pass
for anything extending `Throwable`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -996,7 +1004,7 @@ public void
testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws E
*
* @throws Exception expects {@link ExpectedKryoTestException} to be
thrown.
*/
- @Test
+ @TestTemplate
public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer()
throws Exception {
Review Comment:
```suggestion
void testKryoRegisteringRestoreResilienceWithRegisteredSerializer()
throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -570,14 +555,14 @@ public void
testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
}
}
- assertTrue("Didn't see the expected Kryo exception.",
numExceptions > 0);
+ assertThat(numExceptions > 0).as("Didn't see the expected Kryo
exception.").isTrue();
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
@SuppressWarnings("unchecked")
public void
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws
Exception {
Review Comment:
```suggestion
void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate()
throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -355,43 +340,41 @@ public void testKeyGroupedInternalPriorityQueue(boolean
addAll) throws Exception
if (addAll) {
priorityQueue.addAll(asList(elements));
} else {
- assertTrue(priorityQueue.add(elements[0]));
- assertTrue(priorityQueue.add(elements[1]));
- assertFalse(priorityQueue.add(elements[2]));
- assertFalse(priorityQueue.add(elements[3]));
- assertFalse(priorityQueue.add(elements[4]));
+ assertThat(priorityQueue.add(elements[0])).isTrue();
+ assertThat(priorityQueue.add(elements[1])).isTrue();
+ assertThat(priorityQueue.add(elements[2])).isFalse();
+ assertThat(priorityQueue.add(elements[3])).isFalse();
+ assertThat(priorityQueue.add(elements[4])).isFalse();
}
- assertFalse(priorityQueue.isEmpty());
- assertThat(
- priorityQueue.getSubsetForKeyGroup(1),
- containsInAnyOrder(elementA42, elementA44));
- assertThat(
- priorityQueue.getSubsetForKeyGroup(8),
- containsInAnyOrder(elementB1, elementB3));
+ assertThat(priorityQueue.isEmpty()).isFalse();
+ assertThat(priorityQueue.getSubsetForKeyGroup(1))
+ .containsExactlyInAnyOrder(elementA42, elementA44);
+ assertThat(priorityQueue.getSubsetForKeyGroup(8))
+ .containsExactlyInAnyOrder(elementB1, elementB3);
- assertThat(priorityQueue.peek(), equalTo(elementB1));
- assertThat(priorityQueue.poll(), equalTo(elementB1));
- assertThat(priorityQueue.peek(), equalTo(elementB3));
+ assertThat(priorityQueue.peek()).isEqualTo(elementB1);
+ assertThat(priorityQueue.poll()).isEqualTo(elementB1);
+ assertThat(priorityQueue.peek()).isEqualTo(elementB3);
List<TestType> actualList = new ArrayList<>();
try (CloseableIterator<TestType> iterator =
priorityQueue.iterator()) {
iterator.forEachRemaining(actualList::add);
}
- assertThat(actualList, containsInAnyOrder(elementB3, elementA42,
elementA44));
+ assertThat(actualList).containsExactlyInAnyOrder(elementB3,
elementA42, elementA44);
- assertEquals(3, priorityQueue.size());
+ assertThat(priorityQueue.size()).isEqualTo(3);
- assertFalse(priorityQueue.remove(elementB1));
- assertTrue(priorityQueue.remove(elementB3));
- assertThat(priorityQueue.peek(), equalTo(elementA42));
+ assertThat(priorityQueue.remove(elementB1)).isFalse();
+ assertThat(priorityQueue.remove(elementB3)).isTrue();
+ assertThat(priorityQueue.peek()).isEqualTo(elementA42);
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
public void testGetKeys() throws Exception {
Review Comment:
```suggestion
void testGetKeys() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -303,34 +287,35 @@ public void testEnableStateLatencyTracking() throws
Exception {
? ((TestableKeyedStateBackend<Integer>)
keyedStateBackend)
.getDelegatedKeyedStateBackend(true)
: keyedStateBackend;
- Assert.assertTrue(
- ((AbstractKeyedStateBackend<Integer>) nested)
- .getLatencyTrackingStateConfig()
- .isEnabled());
+ assertThat(
+ ((AbstractKeyedStateBackend<Integer>) nested)
+ .getLatencyTrackingStateConfig()
+ .isEnabled())
+ .isTrue();
} finally {
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
}
}
- @Test
+ @TestTemplate
public void testIsSafeToReuseState() throws Exception {
Review Comment:
```suggestion
void testIsSafeToReuseState() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -506,7 +491,7 @@ public void testGetKeysAndNamespaces() throws Exception {
}
}
- @Test
+ @TestTemplate
@SuppressWarnings("unchecked")
public void testBackendUsesRegisteredKryoDefaultSerializer() throws
Exception {
Review Comment:
```suggestion
void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -303,34 +287,35 @@ public void testEnableStateLatencyTracking() throws
Exception {
? ((TestableKeyedStateBackend<Integer>)
keyedStateBackend)
.getDelegatedKeyedStateBackend(true)
: keyedStateBackend;
- Assert.assertTrue(
- ((AbstractKeyedStateBackend<Integer>) nested)
- .getLatencyTrackingStateConfig()
- .isEnabled());
+ assertThat(
+ ((AbstractKeyedStateBackend<Integer>) nested)
+ .getLatencyTrackingStateConfig()
+ .isEnabled())
+ .isTrue();
} finally {
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
}
}
- @Test
+ @TestTemplate
public void testIsSafeToReuseState() throws Exception {
CheckpointableKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- Assert.assertEquals(isSafeToReuseKVState(),
backend.isSafeToReuseKVState());
+
assertThat(backend.isSafeToReuseKVState()).isEqualTo(isSafeToReuseKVState());
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
public void testKeyGroupedInternalPriorityQueue() throws Exception {
testKeyGroupedInternalPriorityQueue(false);
}
- @Test
+ @TestTemplate
public void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
Review Comment:
```suggestion
void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -872,7 +860,7 @@ public void
testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exce
*
* @throws Exception expects {@link ExpectedKryoTestException} to be
thrown.
*/
- @Test
+ @TestTemplate
@SuppressWarnings("unchecked")
public void testKryoRegisteringRestoreResilienceWithDefaultSerializer()
throws Exception {
Review Comment:
```suggestion
void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws
Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -152,16 +137,15 @@
* <p>NOTE: Please ensure to close and dispose any created keyed state backend
in tests.
*/
@SuppressWarnings("serial")
+@ExtendWith(ParameterizedTestExtension.class)
public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
Review Comment:
`TestLogger` is a JUnit4 rule/extension.
Here, `META-INF/services/org.junit.jupiter.api.extension.Extension`
should be used instead.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -959,24 +947,44 @@ public void
testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws E
// on the second restore, since the custom serializer will be used
for
// deserialization, we expect the deliberate failure to be thrown
- expectedException.expect(
- anyOf(
- isA(ExpectedKryoTestException.class),
- Matchers.<Throwable>hasProperty(
- "cause",
isA(ExpectedKryoTestException.class))));
-
- // state backends that eagerly deserializes (such as the memory
state backend) will fail
- // here
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2,
env);
-
- state =
- backend.getPartitionedState(
- VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
-
- backend.setCurrentKey(1);
- // state backends that lazily deserializes (such as RocksDB) will
fail here
- state.value();
-
+ assertThatThrownBy(
+ () -> {
+ // state backends that eagerly deserializes
(such as the memory
+ // state backend) will fail
+ // here
+ CheckpointableKeyedStateBackend<Integer>
restoreBackend = null;
+ try {
+ restoreBackend =
+ restoreKeyedBackend(
+ IntSerializer.INSTANCE,
snapshot2, env);
+
+ ValueState<TestPojo> restoreState =
+ restoreBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+
VoidNamespaceSerializer.INSTANCE,
+ new
ValueStateDescriptor<>("id", pojoType));
+
+ restoreBackend.setCurrentKey(1);
+ // state backends that lazily deserializes
(such as RocksDB)
+ // will fail here
+ restoreState.value();
+
+ restoreBackend.dispose();
+ } finally {
+ if (restoreBackend != null) {
+ IOUtils.closeQuietly(restoreBackend);
+ restoreBackend.dispose();
+ }
+ }
+ })
+ .satisfiesAnyOf(
+ actual ->
+ assertThat(actual)
+
.isInstanceOf(ExpectedKryoTestException.class),
+ actual ->
+ assertThat(actual)
+ .hasFieldOrProperty("cause")
Review Comment:
Could you please clarify why we should check it?
In my opinion, `Throwable` has a `cause` field, so this check will always pass
for anything extending `Throwable`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -711,14 +697,14 @@ public void testBackendUsesRegisteredKryoSerializer()
throws Exception {
}
}
- assertTrue("Didn't see the expected Kryo exception.",
numExceptions > 0);
+ assertThat(numExceptions > 0).as("Didn't see the expected Kryo
exception.").isTrue();
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}
- @Test
+ @TestTemplate
@SuppressWarnings("unchecked")
public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate()
throws Exception {
Review Comment:
```suggestion
void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws
Exception {
```
could be package private
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java:
##########
@@ -796,15 +783,16 @@ public void
testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exc
* <p>This test should not fail, because de- / serialization of the state
should not be
* performed with Kryo's default {@link
com.esotericsoftware.kryo.serializers.FieldSerializer}.
*/
- @Test
+ @TestTemplate
public void testKryoRegisteringRestoreResilienceWithRegisteredType()
throws Exception {
Review Comment:
```suggestion
void testKryoRegisteringRestoreResilienceWithRegisteredType() throws
Exception {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]