This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new efa040f  Add connection timeout client configuration option (#2852)
efa040f is described below

commit efa040fd1fa3a65e317d97ac5b2029ad75677e64
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Nov 16 22:37:39 2018 +0100

    Add connection timeout client configuration option (#2852)
    
    Allows the client to specify how long to wait for brokers to respond.
---
 .../apache/pulsar/client/api/ClientBuilder.java    | 10 ++++
 .../pulsar/client/api/ClientConfiguration.java     | 21 ++++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |  6 +++
 .../apache/pulsar/client/impl/ConnectionPool.java  |  2 +-
 .../client/impl/conf/ClientConfigurationData.java  |  3 +-
 .../pulsar/client/impl/ConnectionTimeoutTest.java  | 59 ++++++++++++++++++++++
 6 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 2831c09..416fc6c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -309,4 +309,14 @@ public interface ClientBuilder extends Cloneable {
      * @param unit time unit for {@code statsInterval}
      */
     ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit 
unit);
+
+    /**
+     * Set the duration of time to wait for a connection to a broker to be 
established. If the duration
+     * passes without a response from the broker, the connection attempt is 
dropped.
+     *
+     * @since 2.3.0
+     * @param duration the duration to wait
+     * @param unit the time unit in which the duration is defined
+     */
+    ClientBuilder connectionTimeout(int duration, TimeUnit unit);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index a2213b3..e5bac91 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -362,6 +362,27 @@ public class ClientConfiguration implements Serializable {
         return this;
     }
 
+    /**
+     * Set the duration of time to wait for a connection to a broker to be 
established. If the duration
+     * passes without a response from the broker, the connection attempt is 
dropped.
+     *
+     * @param duration the duration to wait
+     * @param unit the time unit in which the duration is defined
+     */
+    public void setConnectionTimeout(int duration, TimeUnit unit) {
+        confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+    }
+
+    /**
+     * Get the duration of time for which the client will wait for a 
connection to a broker to be
+     * established before giving up.
+     *
+     * @return the duration, in milliseconds
+     */
+    public long getConnectionTimeoutMs() {
+        return confData.getConnectionTimeoutMs();
+    }
+
     public ClientConfigurationData getConfigurationData() {
         return confData;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3e87bc9..d5545ce 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -189,6 +189,12 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
+        conf.setConnectionTimeoutMs((int)unit.toMillis(duration));
+        return this;
+    }
+
     public ClientConfigurationData getClientConfigurationData() {
         return conf;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 0c7751b..25f4b14 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -82,7 +82,7 @@ public class ConnectionPool implements Closeable {
         bootstrap.group(eventLoopGroup);
         
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
 
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.getConnectionTimeoutMs());
         bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
         bootstrap.option(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT);
         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index f007782..f7bec63 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -58,7 +58,8 @@ public class ClientConfigurationData implements Serializable, 
Cloneable {
     private int maxLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
-    
+    private int connectionTimeoutMs = 10000;
+
     public ClientConfigurationData clone() {
         try {
             return (ClientConfigurationData) super.clone();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
new file mode 100644
index 0000000..f81ada0
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ConnectTimeoutException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ConnectionTimeoutTest {
+
+    // 192.0.2.0/24 is assigned for documentation, should be a deadend
+    final static String blackholeBroker = "pulsar://192.0.2.1:1234";
+
+    @Test
+    public void testLowTimeout() throws Exception {
+        long startNanos = System.nanoTime();
+
+        try (PulsarClient clientLow = 
PulsarClient.builder().serviceUrl(blackholeBroker)
+                .connectionTimeout(1, TimeUnit.MILLISECONDS).build();
+             PulsarClient clientDefault = 
PulsarClient.builder().serviceUrl(blackholeBroker).build()) {
+            CompletableFuture<?> lowFuture = 
clientLow.newProducer().topic("foo").createAsync();
+            CompletableFuture<?> defaultFuture = 
clientDefault.newProducer().topic("foo").createAsync();
+
+            try {
+                lowFuture.get();
+                Assert.fail("Shouldn't be able to connect to anything");
+            } catch (Exception e) {
+                Assert.assertFalse(defaultFuture.isDone());
+                
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
+                                    ConnectTimeoutException.class);
+                Assert.assertTrue((System.nanoTime() - startNanos) < 
TimeUnit.SECONDS.toNanos(3));
+            }
+        }
+    }
+}

Reply via email to