This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new efa040f Add connection timeout client configuration option (#2852)
efa040f is described below
commit efa040fd1fa3a65e317d97ac5b2029ad75677e64
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Nov 16 22:37:39 2018 +0100
Add connection timeout client configuration option (#2852)
Allows the client to specify how long to wait for a connection to a broker to be established (previously hard-coded to 10 seconds).
---
.../apache/pulsar/client/api/ClientBuilder.java | 10 ++++
.../pulsar/client/api/ClientConfiguration.java | 21 ++++++++
.../pulsar/client/impl/ClientBuilderImpl.java | 6 +++
.../apache/pulsar/client/impl/ConnectionPool.java | 2 +-
.../client/impl/conf/ClientConfigurationData.java | 3 +-
.../pulsar/client/impl/ConnectionTimeoutTest.java | 59 ++++++++++++++++++++++
6 files changed, 99 insertions(+), 2 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 2831c09..416fc6c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -309,4 +309,14 @@ public interface ClientBuilder extends Cloneable {
* @param unit time unit for {@code statsInterval}
*/
ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit
unit);
+
+ /**
+ * Set the duration of time to wait for a connection to a broker to be
established. If the duration
+ * passes without a response from the broker, the connection attempt is
dropped.
+ *
+ * @since 2.3.0
+ * @param duration the duration to wait
+ * @param unit the time unit in which the duration is defined
+ */
+ ClientBuilder connectionTimeout(int duration, TimeUnit unit);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index a2213b3..e5bac91 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -362,6 +362,27 @@ public class ClientConfiguration implements Serializable {
return this;
}
+ /**
+ * Set the duration of time to wait for a connection to a broker to be
established. If the duration
+ * passes without a response from the broker, the connection attempt is
dropped.
+ *
+ * @param duration the duration to wait
+ * @param unit the time unit in which the duration is defined
+ */
+ public void setConnectionTimeout(int duration, TimeUnit unit) {
+ confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+ }
+
+ /**
+ * Get the duration of time for which the client will wait for a
connection to a broker to be
+ * established before giving up.
+ *
+ * @return the duration, in milliseconds
+ */
+ public long getConnectionTimeoutMs() {
+ return confData.getConnectionTimeoutMs();
+ }
+
public ClientConfigurationData getConfigurationData() {
return confData;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3e87bc9..d5545ce 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -189,6 +189,12 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ @Override
+ public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
+ conf.setConnectionTimeoutMs((int)unit.toMillis(duration));
+ return this;
+ }
+
public ClientConfigurationData getClientConfigurationData() {
return conf;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 0c7751b..25f4b14 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -82,7 +82,7 @@ public class ConnectionPool implements Closeable {
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
conf.getConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index f007782..f7bec63 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -58,7 +58,8 @@ public class ClientConfigurationData implements Serializable,
Cloneable {
private int maxLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;
-
+ private int connectionTimeoutMs = 10000;
+
public ClientConfigurationData clone() {
try {
return (ClientConfigurationData) super.clone();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
new file mode 100644
index 0000000..f81ada0
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ConnectTimeoutException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ConnectionTimeoutTest {
+
+ // 192.0.2.0/24 (TEST-NET-1, RFC 5737) is reserved for documentation, so it should be a dead end
+ final static String blackholeBroker = "pulsar://192.0.2.1:1234";
+
+ @Test
+ public void testLowTimeout() throws Exception {
+ long startNanos = System.nanoTime();
+
+ try (PulsarClient clientLow =
PulsarClient.builder().serviceUrl(blackholeBroker)
+ .connectionTimeout(1, TimeUnit.MILLISECONDS).build();
+ PulsarClient clientDefault =
PulsarClient.builder().serviceUrl(blackholeBroker).build()) {
+ CompletableFuture<?> lowFuture =
clientLow.newProducer().topic("foo").createAsync();
+ CompletableFuture<?> defaultFuture =
clientDefault.newProducer().topic("foo").createAsync();
+
+ try {
+ lowFuture.get();
+ Assert.fail("Shouldn't be able to connect to anything");
+ } catch (Exception e) {
+ Assert.assertFalse(defaultFuture.isDone());
+
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
+ ConnectTimeoutException.class);
+ Assert.assertTrue((System.nanoTime() - startNanos) <
TimeUnit.SECONDS.toNanos(3));
+ }
+ }
+ }
+}