rkhachatryan commented on a change in pull request #12746:
URL: https://github.com/apache/flink/pull/12746#discussion_r444198510



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
##########
@@ -39,94 +41,145 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
-@Ignore
+/**
+ * {@link PartitionRequestClientFactory} test.
+ */
 public class PartitionRequestClientFactoryTest {
 
-       private final static int SERVER_PORT = NetUtils.getAvailablePort();
+       private static final int SERVER_PORT = NetUtils.getAvailablePort();
 
        @Test
-       public void testResourceReleaseAfterInterruptedConnect() throws 
Exception {
+       public void testNettyClientConnectRetry() throws Exception {
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
createNettyServerAndClient();
+               UnstableNettyClient unstableNettyClient = new 
UnstableNettyClient(serverAndClient.client(), 2);
 
-               // Latch to synchronize on the connect call.
-               final CountDownLatch syncOnConnect = new CountDownLatch(1);
+               PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(unstableNettyClient, 2);
+               ConnectionID serverAddress = new ConnectionID(new 
InetSocketAddress(InetAddress.getLocalHost(),
+                       serverAndClient.server().getConfig().getServerPort()), 
0);
 
-               final Tuple2<NettyServer, NettyClient> netty = 
createNettyServerAndClient(
-                               new NettyProtocol(null, null) {
+               factory.createPartitionRequestClient(serverAddress);
 
-                                       @Override
-                                       public ChannelHandler[] 
getServerChannelHandlers() {
-                                               return new ChannelHandler[0];
-                                       }
+               serverAndClient.client().shutdown();
+               serverAndClient.server().shutdown();
+       }
 
-                                       @Override
-                                       public ChannelHandler[] 
getClientChannelHandlers() {
-                                               return new ChannelHandler[] {
-                                                               new 
CountDownLatchOnConnectHandler(syncOnConnect)};
-                                       }
-                               });
+       @Test(expected = CompletionException.class)
+       public void testNettyClientConnectRetryFailure() throws Exception {
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
createNettyServerAndClient();
+               UnstableNettyClient unstableNettyClient = new 
UnstableNettyClient(serverAndClient.client(), 3);
 
-               final NettyServer server = netty.f0;
-               final NettyClient client = netty.f1;
+               try {
+                       PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(unstableNettyClient, 2);
+                       ConnectionID serverAddress = new ConnectionID(new 
InetSocketAddress(InetAddress.getLocalHost(),
+                               
serverAndClient.server().getConfig().getServerPort()), 0);
 
-               final UncaughtTestExceptionHandler exceptionHandler = new 
UncaughtTestExceptionHandler();
+                       factory.createPartitionRequestClient(serverAddress);
 
-               try {
-                       final PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(client);
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       serverAndClient.client().shutdown();
+                       serverAndClient.server().shutdown();
+               }
+       }
 
-                       final Thread connect = new Thread(new Runnable() {
-                               @Override
-                               public void run() {
-                                       ConnectionID serverAddress = null;
+       @Test
+       public void testNettyClientConnectRetryMultipleThread() throws 
Exception {
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
createNettyServerAndClient();
+               UnstableNettyClient unstableNettyClient = new 
UnstableNettyClient(serverAndClient.client(), 2);
 
-                                       try {
-                                               serverAddress = 
createServerConnectionID(0);
+               PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(unstableNettyClient, 2);
+               ConnectionID serverAddress = new ConnectionID(new 
InetSocketAddress(InetAddress.getLocalHost(),
+                       serverAndClient.server().getConfig().getServerPort()), 
0);
 
-                                               // This triggers a connect
-                                               
factory.createPartitionRequestClient(serverAddress);
-                                       }
-                                       catch (Throwable t) {
-
-                                               if (serverAddress != null) {
-                                                       
factory.closeOpenChannelConnections(serverAddress);
-                                                       
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
 t);
-                                               } else {
-                                                       t.printStackTrace();
-                                                       fail("Could not create 
RemoteAddress for server.");
-                                               }
+               ExecutorService threadPoolExecutor = 
Executors.newFixedThreadPool(10);
+               List<Future<NettyPartitionRequestClient>> futures = new 
ArrayList<>();
+
+               for (int i = 0; i < 10; i++) {
+                       Future<NettyPartitionRequestClient> future = 
threadPoolExecutor.submit(new Callable<NettyPartitionRequestClient>() {
+                               @Override
+                               public NettyPartitionRequestClient call() {
+                                       NettyPartitionRequestClient client = 
null;
+                                       try {
+                                               client = 
factory.createPartitionRequestClient(serverAddress);
+                                       } catch (Exception e) {
+                                               // catch exception
+                                               
System.out.println(e.getMessage());
+                                               fail();
                                        }
+                                       return client;
                                }
                        });
 
-                       connect.setUncaughtExceptionHandler(exceptionHandler);
+                       futures.add(future);
+               }
 
-                       connect.start();
+               futures.forEach(runnableFuture -> {
+                       NettyPartitionRequestClient client = null;
+                       try {
+                               client = runnableFuture.get();
+                               System.out.println("Result = " + client == null 
? "null" : client.toString());
+                               assertNotNull(client);

Review comment:
       Removed `println`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to