This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc272fa98f2 [Tests] Fix flaky test ConnectionTimeoutTest (#15865)
bc272fa98f2 is described below

commit bc272fa98f2b1bbb1a6e2babc867d94695ddec32
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 31 22:41:09 2022 +0300

    [Tests] Fix flaky test ConnectionTimeoutTest (#15865)
    
    - the original test assumed that 192.0.2.1 is a "black hole" and can be used for testing
      connection timeouts.
      This is an assumption which doesn't hold when a firewall blocks such connections.
    
    - create a dummy TCP/IP server and fill the backlog so that it won't respond and becomes
      a "black hole"
---
 .../pulsar/client/impl/ConnectionTimeoutTest.java  | 61 +++++++++++++++++-----
 1 file changed, 48 insertions(+), 13 deletions(-)

diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
index 3ed22c358b0..af79a4f645f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
@@ -18,30 +18,65 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.channel.ConnectTimeoutException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import io.netty.channel.ConnectTimeoutException;
+import java.util.concurrent.TimeoutException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ConnectionTimeoutTest {
 
-    // 192.0.2.0/24 is assigned for documentation, should be a deadend
-    static final String blackholeBroker = "pulsar://192.0.2.1:1234";
-
     @Test
     public void testLowTimeout() throws Exception {
-        try (PulsarClient clientLow = 
PulsarClient.builder().serviceUrl(blackholeBroker)
-                .connectionTimeout(1, TimeUnit.MILLISECONDS)
-                .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
-            CompletableFuture<?> lowFuture = 
clientLow.newProducer().topic("foo").createAsync();
+        int backlogSize = 1;
+        // create a dummy server and fill the backlog of the server so that it 
won't respond
+        // so that the client timeout can be tested with this server
+        try (ServerSocket serverSocket = new ServerSocket(0, backlogSize, 
InetAddress.getByName("localhost"))) {
+            CountDownLatch latch = new CountDownLatch(backlogSize + 1);
+            List<Thread> threads = new ArrayList<>();
+            for (int i = 0; i < backlogSize + 1; i++) {
+                Thread connectThread = new Thread(() -> {
+                    try (Socket socket = new Socket()) {
+                        socket.connect(serverSocket.getLocalSocketAddress());
+                        latch.countDown();
+                        Thread.sleep(10000L);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                });
+                connectThread.start();
+                threads.add(connectThread);
+            }
+            latch.await();
+
+            String blackholeBroker =
+                    "pulsar://" + 
serverSocket.getInetAddress().getHostAddress() + ":" + 
serverSocket.getLocalPort();
 
-            try {
-                lowFuture.get();
-                Assert.fail("Shouldn't be able to connect to anything");
-            } catch (Exception e) {
-                
Assert.assertEquals(e.getCause().getCause().getCause().getClass(), 
ConnectTimeoutException.class);
+            try (PulsarClient clientLow = 
PulsarClient.builder().serviceUrl(blackholeBroker)
+                    .connectionTimeout(1, TimeUnit.MILLISECONDS)
+                    .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
+                CompletableFuture<?> lowFuture = 
clientLow.newProducer().topic("foo").createAsync();
+                try {
+                    lowFuture.get(10, TimeUnit.SECONDS);
+                    Assert.fail("Shouldn't be able to connect to anything");
+                } catch (TimeoutException e) {
+                    Assert.fail("Connection timeout didn't apply.");
+                } catch (Exception e) {
+                    
Assert.assertEquals(e.getCause().getCause().getCause().getClass(), 
ConnectTimeoutException.class);
+                }
+            } finally {
+                threads.stream().forEach(Thread::interrupt);
             }
         }
     }

Reply via email to