This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc272fa98f2 [Tests] Fix flaky test ConnectionTimeoutTest (#15865)
bc272fa98f2 is described below
commit bc272fa98f2b1bbb1a6e2babc867d94695ddec32
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 31 22:41:09 2022 +0300
[Tests] Fix flaky test ConnectionTimeoutTest (#15865)
- the original test assumed that 192.0.2.1 is a "black hole" and can be
used for testing
connection timeouts.
This is an assumption which doesn't hold when a firewall blocks such
connections.
- create a dummy TCP/IP server and fill the backlog so that it won't
respond and becomes
a "black hole"
---
.../pulsar/client/impl/ConnectionTimeoutTest.java | 61 +++++++++++++++++-----
1 file changed, 48 insertions(+), 13 deletions(-)
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
index 3ed22c358b0..af79a4f645f 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
@@ -18,30 +18,65 @@
*/
package org.apache.pulsar.client.impl;
+import io.netty.channel.ConnectTimeoutException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import io.netty.channel.ConnectTimeoutException;
+import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ConnectionTimeoutTest {
- // 192.0.2.0/24 is assigned for documentation, should be a deadend
- static final String blackholeBroker = "pulsar://192.0.2.1:1234";
-
@Test
public void testLowTimeout() throws Exception {
- try (PulsarClient clientLow =
PulsarClient.builder().serviceUrl(blackholeBroker)
- .connectionTimeout(1, TimeUnit.MILLISECONDS)
- .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
- CompletableFuture<?> lowFuture =
clientLow.newProducer().topic("foo").createAsync();
+ int backlogSize = 1;
+ // create a dummy server and fill the backlog of the server so that it
won't respond
+ // so that the client timeout can be tested with this server
+ try (ServerSocket serverSocket = new ServerSocket(0, backlogSize,
InetAddress.getByName("localhost"))) {
+ CountDownLatch latch = new CountDownLatch(backlogSize + 1);
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < backlogSize + 1; i++) {
+ Thread connectThread = new Thread(() -> {
+ try (Socket socket = new Socket()) {
+ socket.connect(serverSocket.getLocalSocketAddress());
+ latch.countDown();
+ Thread.sleep(10000L);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ connectThread.start();
+ threads.add(connectThread);
+ }
+ latch.await();
+
+ String blackholeBroker =
+ "pulsar://" +
serverSocket.getInetAddress().getHostAddress() + ":" +
serverSocket.getLocalPort();
- try {
- lowFuture.get();
- Assert.fail("Shouldn't be able to connect to anything");
- } catch (Exception e) {
-
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
ConnectTimeoutException.class);
+ try (PulsarClient clientLow =
PulsarClient.builder().serviceUrl(blackholeBroker)
+ .connectionTimeout(1, TimeUnit.MILLISECONDS)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
+ CompletableFuture<?> lowFuture =
clientLow.newProducer().topic("foo").createAsync();
+ try {
+ lowFuture.get(10, TimeUnit.SECONDS);
+ Assert.fail("Shouldn't be able to connect to anything");
+ } catch (TimeoutException e) {
+ Assert.fail("Connection timeout didn't apply.");
+ } catch (Exception e) {
+
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
ConnectTimeoutException.class);
+ }
+ } finally {
+ threads.stream().forEach(Thread::interrupt);
}
}
}