HuangZhenQiu commented on a change in pull request #12746:
URL: https://github.com/apache/flink/pull/12746#discussion_r443865133
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
##########
@@ -39,94 +41,145 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
-@Ignore
+/**
+ * {@link PartitionRequestClientFactory} test.
+ */
public class PartitionRequestClientFactoryTest {
- private final static int SERVER_PORT = NetUtils.getAvailablePort();
+ private static final int SERVER_PORT = NetUtils.getAvailablePort();
@Test
- public void testResourceReleaseAfterInterruptedConnect() throws
Exception {
+ public void testNettyClientConnectRetry() throws Exception {
+ NettyTestUtil.NettyServerAndClient serverAndClient =
createNettyServerAndClient();
+ UnstableNettyClient unstableNettyClient = new
UnstableNettyClient(serverAndClient.client(), 2);
- // Latch to synchronize on the connect call.
- final CountDownLatch syncOnConnect = new CountDownLatch(1);
+ PartitionRequestClientFactory factory = new
PartitionRequestClientFactory(unstableNettyClient, 2);
+ ConnectionID serverAddress = new ConnectionID(new
InetSocketAddress(InetAddress.getLocalHost(),
+ serverAndClient.server().getConfig().getServerPort()),
0);
- final Tuple2<NettyServer, NettyClient> netty =
createNettyServerAndClient(
- new NettyProtocol(null, null) {
+ factory.createPartitionRequestClient(serverAddress);
- @Override
- public ChannelHandler[]
getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
+ serverAndClient.client().shutdown();
+ serverAndClient.server().shutdown();
+ }
- @Override
- public ChannelHandler[]
getClientChannelHandlers() {
- return new ChannelHandler[] {
- new
CountDownLatchOnConnectHandler(syncOnConnect)};
- }
- });
+ @Test(expected = CompletionException.class)
+ public void testNettyClientConnectRetryFailure() throws Exception {
+ NettyTestUtil.NettyServerAndClient serverAndClient =
createNettyServerAndClient();
+ UnstableNettyClient unstableNettyClient = new
UnstableNettyClient(serverAndClient.client(), 3);
- final NettyServer server = netty.f0;
- final NettyClient client = netty.f1;
+ try {
+ PartitionRequestClientFactory factory = new
PartitionRequestClientFactory(unstableNettyClient, 2);
+ ConnectionID serverAddress = new ConnectionID(new
InetSocketAddress(InetAddress.getLocalHost(),
+
serverAndClient.server().getConfig().getServerPort()), 0);
- final UncaughtTestExceptionHandler exceptionHandler = new
UncaughtTestExceptionHandler();
+ factory.createPartitionRequestClient(serverAddress);
- try {
- final PartitionRequestClientFactory factory = new
PartitionRequestClientFactory(client);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ serverAndClient.client().shutdown();
+ serverAndClient.server().shutdown();
+ }
+ }
- final Thread connect = new Thread(new Runnable() {
- @Override
- public void run() {
- ConnectionID serverAddress = null;
+ @Test
+ public void testNettyClientConnectRetryMultipleThread() throws
Exception {
+ NettyTestUtil.NettyServerAndClient serverAndClient =
createNettyServerAndClient();
+ UnstableNettyClient unstableNettyClient = new
UnstableNettyClient(serverAndClient.client(), 2);
- try {
- serverAddress =
createServerConnectionID(0);
+ PartitionRequestClientFactory factory = new
PartitionRequestClientFactory(unstableNettyClient, 2);
+ ConnectionID serverAddress = new ConnectionID(new
InetSocketAddress(InetAddress.getLocalHost(),
+ serverAndClient.server().getConfig().getServerPort()),
0);
- // This triggers a connect
-
factory.createPartitionRequestClient(serverAddress);
- }
- catch (Throwable t) {
-
- if (serverAddress != null) {
-
factory.closeOpenChannelConnections(serverAddress);
-
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
t);
- } else {
- t.printStackTrace();
- fail("Could not create
RemoteAddress for server.");
- }
+ ExecutorService threadPoolExecutor =
Executors.newFixedThreadPool(10);
+ List<Future<NettyPartitionRequestClient>> futures = new
ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Future<NettyPartitionRequestClient> future =
threadPoolExecutor.submit(new Callable<NettyPartitionRequestClient>() {
+ @Override
+ public NettyPartitionRequestClient call() {
+ NettyPartitionRequestClient client =
null;
+ try {
+ client =
factory.createPartitionRequestClient(serverAddress);
+ } catch (Exception e) {
+ // catch exception
+
System.out.println(e.getMessage());
+ fail();
}
+ return client;
}
});
- connect.setUncaughtExceptionHandler(exceptionHandler);
+ futures.add(future);
+ }
- connect.start();
+ futures.forEach(runnableFuture -> {
+ NettyPartitionRequestClient client = null;
+ try {
+ client = runnableFuture.get();
+ System.out.println("Result = " + client == null
? "null" : client.toString());
+ assertNotNull(client);
Review comment:
The line can be removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]