reswqa commented on code in PR #22457:
URL: https://github.com/apache/flink/pull/22457#discussion_r1187645751


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java:
##########
@@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws 
Exception {
 
         // Verify callback
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Unexpected messages
         //
         buf = channel.alloc().buffer(4).writeInt(1223823);
 
         // Verify callback
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.writeInbound(buf);
-        verify(callback, times(1)).onFailure(isA(IllegalStateException.class));
-        assertEquals("Buffer not recycled", 0, buf.refCnt());
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
+        assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not 
recycled");
 
         //
         // Exception caught
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireExceptionCaught(new RuntimeException("Expected 
test Exception"));
-        verify(callback, times(3)).onFailure(isA(RuntimeException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
 
         //
         // Channel inactive
         //
+        TestClientHandlerCallback.onFailureCnt = 0;
         channel.pipeline().fireChannelInactive();
-        verify(callback, 
times(1)).onFailure(isA(ClosedChannelException.class));
+        assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1);
+        
assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class);
+    }
+
+    private static class TestClientHandlerCallback implements 
ClientHandlerCallback {
+        private static int onRequestCnt;

Review Comment:
   I tend not to use static. The reason is that we should write reusable test 
components as much as possible, If we have multiple test cases in the future 
that require `TestingClientHandlerCallback`, then static will become a hassle. 
Especially we cannot assume that they are always executed sequentially in a 
single thread.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java:
##########
@@ -48,31 +48,33 @@ public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTe
     private static final int QS_PROXY_PORT_RANGE_START = 9084;
     private static final int QS_SERVER_PORT_RANGE_START = 9089;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =

Review Comment:
   Maybe we can use `Order` annotation provided by `Junit5`, for example:
   
   ```
   class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase 
{
   
       private static final int NUM_JMS = 2;
       // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the 
pipelines so that
       // we always use all TaskManagers so that the JM oracle is always 
properly re-registered
       private static final int NUM_TMS = 2;
       private static final int NUM_SLOTS_PER_TM = 2;
   
       private static final int QS_PROXY_PORT_RANGE_START = 9064;
       private static final int QS_SERVER_PORT_RANGE_START = 9069;
   
       @TempDir
       @Order(1)
       public static Path tmpStateBackendDir;
   
       @TempDir
       @Order(2)
       public static Path tmpHaStoragePath;
   
       @RegisterExtension
       @Order(3)
       public static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE =
               new AllCallbackWrapper<>(new ZooKeeperExtension());
   
       @RegisterExtension
       @Order(4)
       public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
               new MiniClusterExtension(
                       () ->
                               new MiniClusterResourceConfiguration.Builder()
                                       .setConfiguration(getConfig())
                                       .setNumberTaskManagers(NUM_TMS)
                                       
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
                                       .build());
   
       @Override
       protected StateBackend createStateBackend() throws Exception {
           return new FsStateBackend(tmpStateBackendDir.toUri().toString());
       }
   
       @BeforeAll
       static void setup(@InjectClusterClient RestClusterClient<?> 
injectedClusterClient)
               throws Exception {
           client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
   
           clusterClient = injectedClusterClient;
       }
   
       @AfterAll
       static void tearDown() throws Exception {
           client.shutdownAndWait();
       }
   
       private static Configuration getConfig() {
   
           Configuration config = new Configuration();
           
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
           config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
           config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
           config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
           config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
           config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
           config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
           config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
           config.setString(
                   QueryableStateOptions.PROXY_PORT_RANGE,
                   QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START 
+ NUM_TMS));
           config.setString(
                   QueryableStateOptions.SERVER_PORT_RANGE,
                   QS_SERVER_PORT_RANGE_START + "-" + 
(QS_SERVER_PORT_RANGE_START + NUM_TMS));
           config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
   
           config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
tmpHaStoragePath.toString());
   
           config.setString(
                   HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                   ZK_RESOURCE.getCustomExtension().getConnectString());
           config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
   
           return config;
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to