RyanSkraba commented on a change in pull request #18848:
URL: https://github.com/apache/flink/pull/18848#discussion_r828298799
##########
File path:
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
##########
@@ -65,21 +60,20 @@ public static void setup() throws Exception {
.createAndStart();
}
- @AfterClass
- public static void teardown()
+ @AfterAll
+ private static void teardown()
throws InterruptedException, ExecutionException, TimeoutException {
final Collection<CompletableFuture<?>> terminationFutures = new
ArrayList<>(2);
terminationFutures.add(akkaRpcService1.stopService());
terminationFutures.add(akkaRpcService2.stopService());
- FutureUtils.waitForAll(terminationFutures)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ FutureUtils.waitForAll(terminationFutures).get();
Review comment:
Any reason to drop the timeouts?
##########
File path:
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java
##########
@@ -19,107 +19,84 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link AkkaRpcSerializedValue}. */
-public class AkkaRpcSerializedValueTest extends TestLogger {
+class AkkaRpcSerializedValueTest {
@Test
- public void testNullValue() throws Exception {
+ void testNullValue() throws Exception {
AkkaRpcSerializedValue serializedValue =
AkkaRpcSerializedValue.valueOf(null);
- assertThat(serializedValue.getSerializedData(), nullValue());
- assertThat(serializedValue.getSerializedDataLength(), equalTo(0));
-
assertThat(serializedValue.deserializeValue(getClass().getClassLoader()),
nullValue());
+ assertThat(serializedValue.getSerializedData()).isNull();
+ assertThat(serializedValue.getSerializedDataLength()).isEqualTo(0);
+ assertThat((Object)
serializedValue.deserializeValue(getClass().getClassLoader())).isNull();
AkkaRpcSerializedValue otherSerializedValue =
AkkaRpcSerializedValue.valueOf(null);
- assertThat(otherSerializedValue, equalTo(serializedValue));
- assertThat(otherSerializedValue.hashCode(),
equalTo(serializedValue.hashCode()));
+ assertThat(otherSerializedValue).isEqualTo(serializedValue);
+
assertThat(otherSerializedValue.hashCode()).isEqualTo(serializedValue.hashCode());
AkkaRpcSerializedValue clonedSerializedValue =
InstantiationUtil.clone(serializedValue);
- assertThat(clonedSerializedValue.getSerializedData(), nullValue());
- assertThat(clonedSerializedValue.getSerializedDataLength(),
equalTo(0));
- assertThat(
-
clonedSerializedValue.deserializeValue(getClass().getClassLoader()),
nullValue());
- assertThat(clonedSerializedValue, equalTo(serializedValue));
- assertThat(clonedSerializedValue.hashCode(),
equalTo(serializedValue.hashCode()));
+ assertThat(clonedSerializedValue.getSerializedData()).isNull();
+
assertThat(clonedSerializedValue.getSerializedDataLength()).isEqualTo(0);
+ assertThat((Object)
clonedSerializedValue.deserializeValue(getClass().getClassLoader()))
+ .isNull();
+ assertThat(clonedSerializedValue).isEqualTo(serializedValue);
+
assertThat(clonedSerializedValue.hashCode()).isEqualTo(serializedValue.hashCode());
}
- @Test
- public void testNotNullValues() throws Exception {
- Set<Object> values =
- Stream.of(
- true,
- (byte) 5,
- (short) 6,
- 5,
- 5L,
- 5.5F,
- 6.5,
- 'c',
- "string",
- Instant.now(),
-
BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.TEN),
- BigDecimal.valueOf(Math.PI))
- .collect(Collectors.toSet());
-
- Object previousValue = null;
- AkkaRpcSerializedValue previousSerializedValue = null;
- for (Object value : values) {
- AkkaRpcSerializedValue serializedValue =
AkkaRpcSerializedValue.valueOf(value);
- assertThat(value.toString(), serializedValue.getSerializedData(),
notNullValue());
- assertThat(value.toString(),
serializedValue.getSerializedDataLength(), greaterThan(0));
- assertThat(
- value.toString(),
-
serializedValue.deserializeValue(getClass().getClassLoader()),
- equalTo(value));
-
- AkkaRpcSerializedValue otherSerializedValue =
AkkaRpcSerializedValue.valueOf(value);
- assertThat(value.toString(), otherSerializedValue,
equalTo(serializedValue));
- assertThat(
- value.toString(),
- otherSerializedValue.hashCode(),
- equalTo(serializedValue.hashCode()));
+ private static class SerializationArgumentsProvider implements
ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext
context)
+ throws Exception {
+ return Stream.of(
+ true,
+ (byte) 5,
+ (short) 6,
+ 5,
+ 5L,
+ 5.5F,
+ 6.5,
+ 'c',
+ "string",
+ Instant.now(),
+
BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.TEN),
+ BigDecimal.valueOf(Math.PI))
+ .map(Arguments::of);
+ }
+ }
- AkkaRpcSerializedValue clonedSerializedValue =
InstantiationUtil.clone(serializedValue);
- assertThat(
- value.toString(),
- clonedSerializedValue.getSerializedData(),
- equalTo(serializedValue.getSerializedData()));
- assertThat(
- value.toString(),
-
clonedSerializedValue.deserializeValue(getClass().getClassLoader()),
- equalTo(value));
- assertThat(value.toString(), clonedSerializedValue,
equalTo(serializedValue));
- assertThat(
- value.toString(),
- clonedSerializedValue.hashCode(),
- equalTo(serializedValue.hashCode()));
+ @ParameterizedTest
+ @ArgumentsSource(SerializationArgumentsProvider.class)
Review comment:
Nice refactoring -- this could be a `@MethodSource` if you wanted to
simplify a bit. (This could be just a method returning Stream<Object> without
wrapping in Arguments::of)
##########
File path:
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
##########
@@ -134,7 +126,7 @@ public void testFailingAddressResolution() throws Exception
{
fail("The rpc connection resolution should have failed.");
} catch (ExecutionException exception) {
// we're expecting a RpcConnectionException
- assertTrue(exception.getCause() instanceof RpcConnectionException);
+ assertThat(exception.getCause() instanceof
RpcConnectionException).isTrue();
Review comment:
This try/catch could be rewritten with the CompletableFuture assertions:
```
assertThat(futureRpcGateway)
.failsWithin(timeout.getSize(), timeout.getUnit())
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(RpcConnectionException.class);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]