reswqa commented on code in PR #22457:
URL: https://github.com/apache/flink/pull/22457#discussion_r1186655970


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -99,24 +101,23 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Base class for queryable state integration tests with a configurable state 
backend. */
-public abstract class AbstractQueryableStateTestBase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -232,9 +233,9 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                                 try {
                                     Tuple2<Integer, Long> res = response.get();
                                     counts.set(key, res.f1);
-                                    assertEquals("Key mismatch", key, 
res.f0.intValue());
+                                    
assertThat(key).isEqualTo(res.f0.intValue()).withFailMessage("Key mismatch");
                                 } catch (Exception e) {
-                                    Assert.fail(e.getMessage());
+                                    fail(e.getMessage());

Review Comment:
   We should try our best to avoid using `fail`.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -492,9 +493,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                 jobStatusFuture = 
clusterClient.getJobStatus(closableJobGraph.getJobId());
             }
 
-            assertEquals(
-                    JobStatus.RUNNING,
-                    jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+            assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS)).isEqualTo(JobStatus.RUNNING);

Review Comment:
   Is the logic of this test dependent on the deadline, or is it just providing 
a suitable timeout for `future#get`. If it is the former, this can be done 
using Asserj's assertion for the future. If it is the latter, please use 
`FlinkAssertions.assertThatFuture`.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -649,37 +635,41 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                                     }
                                 })
                         .asQueryableState("hakuna", valueState);
+        assertThatThrownBy(

Review Comment:
   We'd better narrow down the scope of `assertThatThrownBy`.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java:
##########
@@ -762,7 +764,7 @@ public void testChunkedResponse() throws Exception {
         channel.writeInbound(serRequest);
 
         Object msg = readInboundBlocking(channel);
-        assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+        assertTrue(msg instanceof ChunkedByteBuf, "Not ChunkedByteBuf");

Review Comment:
   ```suggestion
           assertThat(msg).isInstanceof(xxx);
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java:
##########
@@ -55,41 +55,43 @@
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 

Review Comment:
   Please using assertJ.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java:
##########
@@ -66,19 +66,19 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment:
   ```suggestion
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java:
##########
@@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws 
Exception {
 
         // Verify callback
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Unexpected messages
         //
         buf = channel.alloc().buffer(4).writeInt(1223823);
 
         // Verify callback
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(IllegalStateException.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not 
recycled");
 
         //
         // Exception caught
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireExceptionCaught(new RuntimeException("Expected 
test Exception"));
-        verify(callback, times(3)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Channel inactive
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireChannelInactive();
-        verify(callback, 
times(1)).onFailure(isA(ClosedChannelException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class);
+    }
+
+    private static class TestClientHandlerCallback implements 
ClientHandlerCallback {
+        private static int onRequestCnt;

Review Comment:
   Why those fields is static?



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java:
##########
@@ -20,32 +20,28 @@
 
 import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.nio.channels.ClosedChannelException;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Review Comment:
   Maybe this is not the correct import for `asssertThat`?



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -246,19 +247,20 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                         .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
             }
 
-            assertTrue("Not all keys are non-zero", allNonZero);
+            assertThat(allNonZero).isTrue().withFailMessage("Not all keys are 
non-zero");
 
             // All should be non-zero
             for (int i = 0; i < numKeys; i++) {
                 long count = counts.get(i);
-                assertTrue("Count at position " + i + " is " + count, count > 
0);
+                assertThat(count).isGreaterThan(0).withFailMessage("Count at 
position " + i + " is " + count);
             }
         }
     }
 
     /** Tests that duplicate query registrations fail the job at the 
JobManager. */
-    @Test(timeout = 60_000)
-    public void testDuplicateRegistrationFailsJob() throws Exception {
+    @Test
+    @Timeout(60_000)

Review Comment:
   We should rely on global timeout of AZP, local timeout is unstable.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java:
##########
@@ -48,31 +48,33 @@ public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTe
     private static final int QS_PROXY_PORT_RANGE_START = 9084;
     private static final int QS_SERVER_PORT_RANGE_START = 9089;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =

Review Comment:
   Can it be replaced by `MiniClusterExtension`?



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -508,19 +507,12 @@ public Integer getKey(Tuple2<Integer, Long> value) {
 
             try {
                 unknownJobFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                fail(); // by now the request must have failed.
+                fail("Should fail"); // by now the request must have failed.

Review Comment:
   Maybe `assertThatThrowBy`?



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -535,17 +527,11 @@ public Integer getKey(Tuple2<Integer, Long> value) {
 
             try {
                 unknownQSName.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                fail(); // by now the request must have failed.
+                fail("Should fail"); // by now the request must have failed.

Review Comment:
   Maybe assertThatThrowBy?



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java:
##########
@@ -23,27 +23,27 @@
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link KvStateClientProxyImpl}. */
-public class KvStateClientProxyImplTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   
   ```
   If we introduce the extension file in `META-INF/services`, this can be 
removed.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java:
##########
@@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws 
Exception {
 
         // Verify callback
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Unexpected messages
         //
         buf = channel.alloc().buffer(4).writeInt(1223823);
 
         // Verify callback
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(IllegalStateException.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not 
recycled");
 
         //
         // Exception caught
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireExceptionCaught(new RuntimeException("Expected 
test Exception"));
-        verify(callback, times(3)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Channel inactive
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireChannelInactive();
-        verify(callback, 
times(1)).onFailure(isA(ClosedChannelException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class);
+    }
+
+    private static class TestClientHandlerCallback implements 
ClientHandlerCallback {

Review Comment:
   ```suggestion
       private static class TestingClientHandlerCallback implements 
ClientHandlerCallback {
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java:
##########
@@ -225,46 +257,54 @@ public static void testListSerialization(
 
         List<Long> actualValues =
                 KvStateSerializer.deserializeList(serializedValues, 
valueSerializer);
-        assertEquals(expectedValues, actualValues);
+        assertThat(actualValues).isEqualTo(expectedValues);
 
         // Single value
         long expectedValue = ThreadLocalRandom.current().nextLong();
         byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
         List<Long> actualValue =
                 KvStateSerializer.deserializeList(serializedValue, 
valueSerializer);
-        assertEquals(1, actualValue.size());
-        assertEquals(expectedValue, actualValue.get(0).longValue());
+        assertThat(actualValue).containsExactly(expectedValue);
     }
 
     /** Tests list deserialization with too few bytes. */
     @Test
-    public void testDeserializeListEmpty() throws Exception {
+    void testDeserializeListEmpty() throws Exception {
         List<Long> actualValue =
                 KvStateSerializer.deserializeList(new byte[] {}, 
LongSerializer.INSTANCE);
-        assertEquals(0, actualValue.size());
+        assertThat(actualValue).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(actualValue).isEmpty();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to