reswqa commented on code in PR #22457: URL: https://github.com/apache/flink/pull/22457#discussion_r1186655970
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -99,24 +101,23 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** Base class for queryable state integration tests with a configurable state backend. */ -public abstract class AbstractQueryableStateTestBase extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -232,9 +233,9 @@ public Integer getKey(Tuple2<Integer, Long> value) { try { Tuple2<Integer, Long> res = response.get(); counts.set(key, res.f1); - assertEquals("Key mismatch", key, res.f0.intValue()); + assertThat(key).isEqualTo(res.f0.intValue()).withFailMessage("Key mismatch"); } catch (Exception e) { - Assert.fail(e.getMessage()); + fail(e.getMessage()); Review Comment: We should try our best to avoid using `fail`. 
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -492,9 +493,7 @@ public Integer getKey(Tuple2<Integer, Long> value) { jobStatusFuture = clusterClient.getJobStatus(closableJobGraph.getJobId()); } - assertEquals( - JobStatus.RUNNING, - jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); + assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).isEqualTo(JobStatus.RUNNING); Review Comment: Is the logic of this test dependent on the deadline, or is it just providing a suitable timeout for `future#get`? If it is the former, this can be done using AssertJ's assertion for the future. If it is the latter, please use `FlinkAssertions.assertThatFuture`. ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -649,37 +635,41 @@ public Integer getKey(Tuple2<Integer, Long> value) { } }) .asQueryableState("hakuna", valueState); + assertThatThrownBy( Review Comment: We'd better narrow down the scope of `assertThatThrownBy`. 
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java: ########## @@ -762,7 +764,7 @@ public void testChunkedResponse() throws Exception { channel.writeInbound(serRequest); Object msg = readInboundBlocking(channel); - assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf); + assertTrue(msg instanceof ChunkedByteBuf, "Not ChunkedByteBuf"); Review Comment: ```suggestion assertThat(msg).isInstanceOf(xxx); ``` ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java: ########## @@ -55,41 +55,43 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.net.InetAddress; import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: Please use AssertJ. ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java: ########## @@ -66,19 +66,19 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: ```suggestion ``` ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java: ########## @@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws Exception { // Verify callback channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Unexpected messages // buf = channel.alloc().buffer(4).writeInt(1223823); // Verify callback + TestClientHandlerCallback.onFailureCnt = 0; channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(IllegalStateException.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); + assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled"); // // Exception caught // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - verify(callback, 
times(3)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Channel inactive // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireChannelInactive(); - verify(callback, times(1)).onFailure(isA(ClosedChannelException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class); + } + + private static class TestClientHandlerCallback implements ClientHandlerCallback { + private static int onRequestCnt; Review Comment: Why are those fields static? ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java: ########## @@ -20,32 +20,28 @@ import org.apache.flink.queryablestate.messages.KvStateInternalRequest; import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.nio.channels.ClosedChannelException; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; Review Comment: Maybe this is not the correct import for `assertThat`? 
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -246,19 +247,20 @@ public Integer getKey(Tuple2<Integer, Long> value) { .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - assertTrue("Not all keys are non-zero", allNonZero); + assertThat(allNonZero).isTrue().withFailMessage("Not all keys are non-zero"); // All should be non-zero for (int i = 0; i < numKeys; i++) { long count = counts.get(i); - assertTrue("Count at position " + i + " is " + count, count > 0); + assertThat(count).isGreaterThan(0).withFailMessage("Count at position " + i + " is " + count); } } } /** Tests that duplicate query registrations fail the job at the JobManager. */ - @Test(timeout = 60_000) - public void testDuplicateRegistrationFailsJob() throws Exception { + @Test + @Timeout(60_000) Review Comment: We should rely on the global timeout of AZP; local timeouts are unstable. ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java: ########## @@ -48,31 +48,33 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe private static final int QS_PROXY_PORT_RANGE_START = 9084; private static final int QS_SERVER_PORT_RANGE_START = 9089; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = Review Comment: Can it be replaced by `MiniClusterExtension`? ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -508,19 +507,12 @@ public Integer getKey(Tuple2<Integer, Long> value) { try { unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail(); // by now the request must have failed. 
+ fail("Should fail"); // by now the request must have failed. Review Comment: Maybe `assertThatThrownBy`? ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java: ########## @@ -535,17 +527,11 @@ public Integer getKey(Tuple2<Integer, Long> value) { try { unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail(); // by now the request must have failed. + fail("Should fail"); // by now the request must have failed. Review Comment: Maybe `assertThatThrownBy`? ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java: ########## @@ -23,27 +23,27 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.KvStateLocationOracle; import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.net.InetAddress; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link KvStateClientProxyImpl}. */ -public class KvStateClientProxyImplTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` If we introduce the extension file in `META-INF/services`, this can be removed. 
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java: ########## @@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws Exception { // Verify callback channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Unexpected messages // buf = channel.alloc().buffer(4).writeInt(1223823); // Verify callback + TestClientHandlerCallback.onFailureCnt = 0; channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(IllegalStateException.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); + assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled"); // // Exception caught // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - verify(callback, times(3)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Channel inactive // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireChannelInactive(); - verify(callback, times(1)).onFailure(isA(ClosedChannelException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class); + } + + private static class TestClientHandlerCallback implements ClientHandlerCallback { Review Comment: ```suggestion private static class TestingClientHandlerCallback implements 
ClientHandlerCallback { ``` ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java: ########## @@ -225,46 +257,54 @@ public static void testListSerialization( List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer); - assertEquals(expectedValues, actualValues); + assertThat(actualValues).isEqualTo(expectedValues); // Single value long expectedValue = ThreadLocalRandom.current().nextLong(); byte[] serializedValue = KvStateSerializer.serializeValue(expectedValue, valueSerializer); List<Long> actualValue = KvStateSerializer.deserializeList(serializedValue, valueSerializer); - assertEquals(1, actualValue.size()); - assertEquals(expectedValue, actualValue.get(0).longValue()); + assertThat(actualValue).containsExactly(expectedValue); } /** Tests list deserialization with too few bytes. */ @Test - public void testDeserializeListEmpty() throws Exception { + void testDeserializeListEmpty() throws Exception { List<Long> actualValue = KvStateSerializer.deserializeList(new byte[] {}, LongSerializer.INSTANCE); - assertEquals(0, actualValue.size()); + assertThat(actualValue).hasSize(0); Review Comment: ```suggestion assertThat(actualValue).isEmpty(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org