reswqa commented on code in PR #22471:
URL: https://github.com/apache/flink/pull/22471#discussion_r1174804263


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -721,29 +740,34 @@ public void testClientServerIntegration() throws 
Throwable {
             } catch (Exception e) {
                 e.printStackTrace();
             }
-            Assert.assertTrue(client.isEventGroupShutdown());
-
-            final CombinableMatcher<Throwable> exceptionMatcher =
-                    
either(FlinkMatchers.containsCause(ClosedChannelException.class))
-                            
.or(FlinkMatchers.containsCause(IllegalStateException.class));
+            assertThat(client.isEventGroupShutdown()).isTrue();
 
             for (Future<Void> future : taskFutures) {
                 try {
                     future.get();
-                    fail("Did not throw expected Exception after shut down");
-                } catch (ExecutionException t) {
-                    assertThat(t, exceptionMatcher);
+                } catch (Throwable throwable) {
+                    assertThat(
+                                    ExceptionUtils.findThrowable(
+                                                            throwable, 
ClosedChannelException.class)
+                                                    .isPresent()
+                                            || ExceptionUtils.findThrowable(
+                                                            throwable, 
IllegalStateException.class)
+                                                    .isPresent())
+                            .isTrue();

Review Comment:
   ```
   FlinkAssertions.assertThatFuture(future)
                       .eventuallyFailsWith(ExecutionException.class)
                       .satisfiesAnyOf(
                               FlinkAssertions.anyCauseMatches(ClosedChannelException.class),
                               FlinkAssertions.anyCauseMatches(IllegalStateException.class));
   ```
   
   I haven't tested it, but I think this might work.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -86,28 +83,24 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.hamcrest.CoreMatchers.either;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link Client}. */
-public class ClientTest extends TestLogger {
+class ClientTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
 
     // Thread pool for client bootstrap (shared between tests)
     private NioEventLoopGroup nioGroup;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    public void setUp() {

Review Comment:
   ```suggestion
       void setUp() {
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -257,33 +251,56 @@ public void testRequestUnavailableHost() throws Exception 
{
         try {
             client = new Client<>("Test Client", 1, serializer, stats);
 
-            InetSocketAddress serverAddress = new 
InetSocketAddress(InetAddress.getLocalHost(), 0);
-
-            KvStateInternalRequest request =
-                    new KvStateInternalRequest(new KvStateID(), new byte[0]);
-            CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
+            CompletableFuture<KvStateResponse> future;
+            Random random = new Random();
+            int numRetry = 0;
+            while (true) {
+                try {
+                    InetSocketAddress serverAddress =
+                            new InetSocketAddress(
+                                    InetAddress.getLocalHost(), 
random.nextInt(30000));
+
+                    KvStateInternalRequest request =
+                            new KvStateInternalRequest(new KvStateID(), new 
byte[0]);
+                    future = client.sendRequest(serverAddress, request);
+                    break;
+                } catch (Exception e) {
+                    if (ExceptionUtils.findThrowable(e, 
BindException.class).isPresent()
+                            && numRetry <= numTryCreateSocketAddress) {
+                        numRetry++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
 
-            assertThat(
-                    future,
-                    FlinkMatchers.futureWillCompleteExceptionally(
-                            ConnectException.class, Duration.ofHours(1)));
+            assertThat(future).isNotNull();
+            future.whenComplete(
+                    (result, exception) -> {
+                        assertThat(result).isNull();
+                        assertThat(
+                                        ExceptionUtils.findThrowable(
+                                                        exception, 
ConnectException.class)
+                                                .isPresent())
+                                .isTrue();
+                    });

Review Comment:
   This can simply be replaced by AssertJ's `CompletableFutureAssertion`.



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -248,12 +251,30 @@ void testRequestUnavailableHost() throws Exception {
         try {
             client = new Client<>("Test Client", 1, serializer, stats);
 
-            InetSocketAddress serverAddress = new 
InetSocketAddress(InetAddress.getLocalHost(), 0);
-
-            KvStateInternalRequest request =
-                    new KvStateInternalRequest(new KvStateID(), new byte[0]);
-            CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
+            CompletableFuture<KvStateResponse> future;
+            Random random = new Random();
+            int numRetry = 0;
+            while (true) {
+                try {
+                    InetSocketAddress serverAddress =
+                            new InetSocketAddress(
+                                    InetAddress.getLocalHost(), 
random.nextInt(30000));
+
+                    KvStateInternalRequest request =
+                            new KvStateInternalRequest(new KvStateID(), new 
byte[0]);
+                    future = client.sendRequest(serverAddress, request);
+                    break;
+                } catch (Exception e) {
+                    if (ExceptionUtils.findThrowable(e, 
BindException.class).isPresent()
+                            && numRetry <= numTryCreateSocketAddress) {
+                        numRetry++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }

Review Comment:
   Actually, Flink has `NetUtils` to find a free port:
   
   `int port = NetUtils.getAvailablePort().getPort();` 



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -86,28 +83,24 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.hamcrest.CoreMatchers.either;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link Client}. */
-public class ClientTest extends TestLogger {
+class ClientTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
 
     // Thread pool for client bootstrap (shared between tests)
     private NioEventLoopGroup nioGroup;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    public void setUp() {
         nioGroup = new NioEventLoopGroup();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    public void tearDown() {

Review Comment:
   ```suggestion
       void tearDown() {
   ```



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java:
##########
@@ -506,49 +525,49 @@ public void testServerClosesChannel() throws Exception {
             // Requests
             KvStateInternalRequest request =
                     new KvStateInternalRequest(new KvStateID(), new byte[0]);
-            Future<KvStateResponse> future = client.sendRequest(serverAddress, 
request);
+            CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
 
             received.take();
 
-            assertEquals(1, stats.getNumConnections());
+            assertThat(stats.getNumConnections()).isEqualTo(1);
 
             channel.get().close().await();
 
             try {
+                assertThatFuture(future).eventuallyFails();
                 future.get();
-                fail("Did not throw expected server failure");
             } catch (ExecutionException e) {
-                if (!(e.getCause() instanceof ClosedChannelException)) {
-                    fail("Did not throw expected Exception");
-                }
+                assertThat(e.getCause())
+                        .withFailMessage("Did not throw expected Exception")
+                        .isInstanceOf(ClosedChannelException.class);

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to