[
https://issues.apache.org/jira/browse/KAFKA-3496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300834#comment-16300834
]
ASF GitHub Bot commented on KAFKA-3496:
---------------------------------------
ijuma closed pull request #1179: KAFKA-3496 - add policies for client reconnect
attempts
URL: https://github.com/apache/kafka/pull/1179
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is displayed below for the sake of
provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index ad35e205a52..e39b9f95acf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -20,11 +20,11 @@
*
*/
final class ClusterConnectionStates {
- private final long reconnectBackoffMs;
+ private final ReconnectAttemptPolicy reconnectAttemptPolicy;
private final Map<String, NodeConnectionState> nodeState;
- public ClusterConnectionStates(long reconnectBackoffMs) {
- this.reconnectBackoffMs = reconnectBackoffMs;
+ public ClusterConnectionStates(ReconnectAttemptPolicy
reconnectAttemptPolicy) {
+ this.reconnectAttemptPolicy = reconnectAttemptPolicy;
this.nodeState = new HashMap<String, NodeConnectionState>();
}
@@ -40,7 +40,7 @@ public boolean canConnect(String id, long now) {
if (state == null)
return true;
else
- return state.state == ConnectionState.DISCONNECTED && now -
state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+ return state.state == ConnectionState.DISCONNECTED && now -
state.lastConnectAttemptMs >= state.reconnectBackoffMs;
}
/**
@@ -53,7 +53,7 @@ public boolean isBlackedOut(String id, long now) {
if (state == null)
return false;
else
- return state.state == ConnectionState.DISCONNECTED && now -
state.lastConnectAttemptMs < this.reconnectBackoffMs;
+ return state.state == ConnectionState.DISCONNECTED && now -
state.lastConnectAttemptMs < state.reconnectBackoffMs;
}
/**
@@ -68,7 +68,7 @@ public long connectionDelay(String id, long now) {
if (state == null) return 0;
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state == ConnectionState.DISCONNECTED) {
- return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+ return Math.max(state.reconnectBackoffMs - timeWaited, 0);
} else {
// When connecting or connected, we should be able to delay
indefinitely since other events (connection or
// data acked) will cause a wakeup once data can be sent.
@@ -82,7 +82,11 @@ public long connectionDelay(String id, long now) {
* @param now The current time.
*/
public void connecting(String id, long now) {
- nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING,
now));
+ // a new scheduler should not be created for a new connection attempt.
+ ReconnectAttemptPolicy.ReconnectAttemptScheduler scheduler =
nodeState.containsKey(id) ?
+ nodeState.get(id).reconnectAttemptScheduler :
+ reconnectAttemptPolicy.newScheduler();
+ nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING,
now, scheduler));
}
/**
@@ -101,6 +105,9 @@ public boolean isConnected(String id) {
public void connected(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.CONNECTED;
+ // always refresh the scheduler after connecting successfully.
+ nodeState.reconnectAttemptScheduler =
reconnectAttemptPolicy.newScheduler();
+ nodeState.reconnectBackoffMs =
nodeState.reconnectAttemptScheduler.nextReconnectBackoffMs();
}
/**
@@ -111,6 +118,7 @@ public void connected(String id) {
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.DISCONNECTED;
+ nodeState.reconnectBackoffMs =
nodeState.reconnectAttemptScheduler.nextReconnectBackoffMs();
nodeState.lastConnectAttemptMs = now;
}
@@ -152,10 +160,13 @@ private NodeConnectionState nodeState(String id) {
ConnectionState state;
long lastConnectAttemptMs;
+ long reconnectBackoffMs;
+ ReconnectAttemptPolicy.ReconnectAttemptScheduler
reconnectAttemptScheduler;
- public NodeConnectionState(ConnectionState state, long
lastConnectAttempt) {
+ public NodeConnectionState(ConnectionState state, long
lastConnectAttempt, ReconnectAttemptPolicy.ReconnectAttemptScheduler
reconnectAttemptScheduler) {
this.state = state;
this.lastConnectAttemptMs = lastConnectAttempt;
+ this.reconnectAttemptScheduler = reconnectAttemptScheduler;
}
public String toString() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 298e1d8c541..44f17ae2dfe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -49,6 +49,15 @@
public static final String RECONNECT_BACKOFF_MS_CONFIG =
"reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time
to wait before attempting to reconnect to a given host. This avoids repeatedly
connecting to a host in a tight loop. This backoff applies to all requests sent
by the consumer to the broker.";
+ public static final String RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG =
"reconnect.attempts.policy.class";
+ public static final String RECONNECT_ATTEMPTS_POLICY_CLASS_DOC = "The
policy used to determine the amount of time to wait before attempting to
reconnect to a given host. ";
+
+ public static final String RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG =
"reconnect.exponential.baseDelayMs";
+ public static final String RECONNECT_EXPONENTIAL_BASE_DELAY_MS_DOC = "The
amount of time to wait before attempting a first reconnection to a given host.";
+
+ public static final String RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG =
"reconnect.exponential.maxDelayMs";
+ public static final String RECONNECT_EXPONENTIAL_MAX_DELAY_MS_DOC = "The
maximum amount of time to wait before attempting to reconnect to a given host.";
+
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to
wait before attempting to retry a failed fetch request to a given topic
partition. This avoids repeated fetching-and-failing in a tight loop.";
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ConstantReconnectAttemptPolicy.java
b/clients/src/main/java/org/apache/kafka/clients/ConstantReconnectAttemptPolicy.java
new file mode 100644
index 00000000000..c358f9d6763
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/ConstantReconnectAttemptPolicy.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+
+/**
+ * A policy which uses a constant delay between each reconnection attempt.
+ */
+public class ConstantReconnectAttemptPolicy implements ReconnectAttemptPolicy {
+
+ private long reconnectBackoffMs;
+
+ /**
+ * Creates a new {@link ConstantReconnectAttemptPolicy} instance.
+ */
+ public ConstantReconnectAttemptPolicy() {
+
+ }
+
+ /**
+ * Creates a new {@link ConstantReconnectAttemptPolicy} instance.
+ */
+ public ConstantReconnectAttemptPolicy(long reconnectBackoffMs) {
+ setReconnectBackoffMs(reconnectBackoffMs);
+ }
+
+ @Override
+ public void configure(AbstractConfig configs) {
+ Long reconnectBackoffMs =
configs.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG);
+ setReconnectBackoffMs(reconnectBackoffMs);
+ }
+
+ private void setReconnectBackoffMs(long reconnectBackoffMs) {
+ if (reconnectBackoffMs < 0)
+ throw new IllegalArgumentException(String.format("Delay must be
positive - reconnect.backoff.ms %d", reconnectBackoffMs));
+ this.reconnectBackoffMs = reconnectBackoffMs;
+ }
+
+ @Override
+ public ReconnectAttemptScheduler newScheduler() {
+ return new ConstantScheduler();
+ }
+
+ private class ConstantScheduler implements ReconnectAttemptScheduler {
+
+ @Override
+ public long nextReconnectBackoffMs() {
+ return reconnectBackoffMs;
+ }
+ }
+}
+
+
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicy.java
b/clients/src/main/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicy.java
new file mode 100644
index 00000000000..09dff287fbd
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+
+/**
+ * A policy which exponentially grows the delay between each reconnection
attempt.
+ * A reconnection attempt {@code i} is tried after {@code Math.min(2^(i-1) *
getBaseDelayMs(), getMaxDelayMs())} milliseconds.
+ */
+public class ExponentialReconnectAttemptPolicy implements
ReconnectAttemptPolicy {
+
+ /**
+ * The amount of time to wait before attempting a first reconnection to a
given host.
+ */
+ private long baseDelayMs;
+ /**
+ * The maximum amount of time to wait before attempting to reconnect to a
given host.
+ */
+ private long maxDelayMs;
+
+ /**
+ * Creates a new {@link ExponentialReconnectAttemptPolicy} instance.
+ */
+ public ExponentialReconnectAttemptPolicy() {
+
+ }
+
+ /**
+ * Creates a new {@link ExponentialReconnectAttemptPolicy} instance.
+ *
+ * @param baseDelayMs {@link #baseDelayMs}.
+ * @param maxDelayMs {@link #maxDelayMs}.
+ */
+ public ExponentialReconnectAttemptPolicy(long baseDelayMs, long
maxDelayMs) {
+ setDelays(baseDelayMs, maxDelayMs);
+ }
+
+ public long getBaseDelayMs() {
+ return baseDelayMs;
+ }
+
+ public long getMaxDelayMs() {
+ return maxDelayMs;
+ }
+
+ @Override
+ public void configure(AbstractConfig configs) {
+
setDelays(configs.getLong(CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG),
+
configs.getLong(CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG));
+ }
+
+ private void setDelays(long baseDelayMs, long maxDelayMs) {
+ if (baseDelayMs < 0 || maxDelayMs < 0)
+ throw new IllegalArgumentException(String.format("Delay must be
positive - baseDelayMs %d, maxDelayMs %d ", baseDelayMs, maxDelayMs));
+ if (maxDelayMs < baseDelayMs)
+ throw new IllegalArgumentException(String.format("maxDelayMs (%d)
must be superior to baseDelayMs (%d)", maxDelayMs, baseDelayMs));
+ this.baseDelayMs = baseDelayMs;
+ this.maxDelayMs = maxDelayMs;
+ }
+
+ @Override
+ public ReconnectAttemptScheduler newScheduler() {
+ return new ExponentialScheduler();
+ }
+
+ public class ExponentialScheduler implements ReconnectAttemptScheduler {
+
+ private long attempts = 0L;
+
+ @Override
+ public long nextReconnectBackoffMs() {
+ try {
+ if (attempts + 1 > 64) return maxDelayMs;
+ return Math.min(multiplyExact(baseDelayMs, 1L << attempts++),
maxDelayMs);
+ } catch (ArithmeticException e) { // this should never happen
+ return maxDelayMs;
+ }
+ }
+ }
+
+ /**
+ * This method is part of Math class since java 8.
+ */
+ private static long multiplyExact(long x, long y) {
+ long r = x * y;
+ long ax = Math.abs(x);
+ long ay = Math.abs(y);
+ if ((ax | ay) >>> 31 != 0) {
+ // Some bits greater than 2^31 that might cause overflow
+ // Check the result using the divide operator
+ // and check for the special case of Long.MIN_VALUE * -1
+ if (((y != 0) && (r / y != x)) ||
+ (x == Long.MIN_VALUE && y == -1)) {
+ throw new ArithmeticException("long overflow");
+ }
+ }
+ return r;
+ }
+}
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4d01cdeb2e2..d615e219477 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -84,25 +84,25 @@ public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
- long reconnectBackoffMs,
+ ReconnectAttemptPolicy reconnectAttemptPolicy,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
this(null, metadata, selector, clientId,
maxInFlightRequestsPerConnection,
- reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer,
requestTimeoutMs, time);
+ reconnectAttemptPolicy, socketSendBuffer, socketReceiveBuffer,
requestTimeoutMs, time);
}
public NetworkClient(Selectable selector,
MetadataUpdater metadataUpdater,
String clientId,
int maxInFlightRequestsPerConnection,
- long reconnectBackoffMs,
+ ReconnectAttemptPolicy reconnectAttemptPolicy,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
- this(metadataUpdater, null, selector, clientId,
maxInFlightRequestsPerConnection, reconnectBackoffMs,
+ this(metadataUpdater, null, selector, clientId,
maxInFlightRequestsPerConnection, reconnectAttemptPolicy,
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
}
@@ -111,7 +111,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
- long reconnectBackoffMs,
+ ReconnectAttemptPolicy reconnectAttemptPolicy,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
@@ -131,7 +131,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.selector = selector;
this.clientId = clientId;
this.inFlightRequests = new
InFlightRequests(maxInFlightRequestsPerConnection);
- this.connectionStates = new
ClusterConnectionStates(reconnectBackoffMs);
+ this.connectionStates = new
ClusterConnectionStates(reconnectAttemptPolicy);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
@@ -456,7 +456,7 @@ private void handleCompletedReceives(List<ClientResponse>
responses, long now) {
*/
private void handleDisconnections(List<ClientResponse> responses, long
now) {
for (String node : this.selector.disconnected()) {
- log.debug("Node {} disconnected.", node);
+ log.warn("Node {} disconnected.", node);
processDisconnection(responses, node, now);
}
// we got a disconnect so we should probably refresh our metadata and
see if that broker is dead
@@ -500,7 +500,7 @@ private void initiateConnect(Node node, long now) {
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
- log.debug("Error connecting to node {} at {}:{}:", node.id(),
node.host(), node.port(), e);
+ log.warn("Error connecting to node {} at {}:{}:", node.id(),
node.host(), node.port(), e);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ReconnectAttemptPolicy.java
b/clients/src/main/java/org/apache/kafka/clients/ReconnectAttemptPolicy.java
new file mode 100644
index 00000000000..87b4a3f3e4a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ReconnectAttemptPolicy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+
+public interface ReconnectAttemptPolicy {
+
+ void configure(AbstractConfig configs);
+
+ ReconnectAttemptScheduler newScheduler();
+
+ interface ReconnectAttemptScheduler {
+
+ long nextReconnectBackoffMs();
+ }
+
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 69c4a362092..41b98f507af 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -132,6 +132,21 @@
*/
public static final String RECONNECT_BACKOFF_MS_CONFIG =
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+ /**
+ * <code>reconnect.attempts.policy.class</code>
+ */
+ public static final String RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG =
CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG;
+
+ /**
+ * <code>reconnect.exponential.baseDelayMs</code>
+ */
+ public static final String RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG =
CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG;
+
+ /**
+ * <code>reconnect.exponential.maxDelayMs</code>
+ */
+ public static final String RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG =
CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG;
+
/**
* <code>retry.backoff.ms</code>
*/
@@ -265,6 +280,23 @@
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+ .define(RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
+ Type.CLASS,
+
"org.apache.kafka.clients.ConstantReconnectAttemptPolicy",
+ Importance.HIGH,
+
CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_DOC)
+
.define(RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG,
+ Type.LONG,
+ 50L,
+ atLeast(0L),
+ Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_DOC)
+
.define(RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG,
+ Type.LONG,
+ 5000L,
+ atLeast(0L),
+ Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b15d07f8096..7348e202427 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -22,6 +22,7 @@
import
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.ReconnectAttemptPolicy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -574,12 +575,16 @@ private KafkaConsumer(ConsumerConfig config,
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";
ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder(config.values());
+
+ ReconnectAttemptPolicy reconnectAttemptPolicy =
config.getConfiguredInstance(ConsumerConfig.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
ReconnectAttemptPolicy.class);
+ reconnectAttemptPolicy.configure(config);
+
NetworkClient netClient = new NetworkClient(
new
Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
- config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ reconnectAttemptPolicy,
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 6acc0599e85..0559b0ed561 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.ReconnectAttemptPolicy;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
@@ -275,12 +276,16 @@ private KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer, Serial
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses),
time.milliseconds());
ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder(config.values());
+
+ ReconnectAttemptPolicy reconnectAttemptPolicy =
config.getConfiguredInstance(ProducerConfig.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
ReconnectAttemptPolicy.class);
+ reconnectAttemptPolicy.configure(config);
+
NetworkClient client = new NetworkClient(
new
Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ reconnectAttemptPolicy,
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 5b7a296a903..b96d3fa55f1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -137,6 +137,21 @@
+ "These methods can be
blocked either because the buffer is full or metadata unavailable."
+ "Blocking in the
user-supplied serializers or partitioner will not be counted against this
timeout.";
+ /**
+ * <code>reconnect.attempts.policy.class</code>
+ */
+ public static final String RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG =
CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG;
+
+ /**
+ * <code>reconnect.exponential.baseDelayMs</code>
+ */
+ public static final String RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG =
CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG;
+
+ /**
+ * <code>reconnect.exponential.maxDelayMs</code>
+ */
+ public static final String RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG =
CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG;
+
/** <code>block.on.buffer.full</code> */
/**
* @deprecated This config will be removed in a future release. Also, the
{@link #METADATA_FETCH_TIMEOUT_CONFIG} is no longer honored when this property
is set to true.
@@ -245,6 +260,23 @@
.define(RECONNECT_BACKOFF_MS_CONFIG,
Type.LONG, 50L, atLeast(0L), Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG,
100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+ .define(RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
+ Type.CLASS,
+
"org.apache.kafka.clients.ConstantReconnectAttemptPolicy",
+ Importance.HIGH,
+
CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_DOC)
+
.define(RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG,
+ Type.LONG,
+ 50L,
+ atLeast(0L),
+ Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_DOC)
+
.define(RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG,
+ Type.LONG,
+ 5000L,
+ atLeast(0L),
+ Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_DOC)
.define(METADATA_FETCH_TIMEOUT_CONFIG,
Type.LONG,
60 * 1000,
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 698b99c3b83..824b41e8894 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -342,7 +342,7 @@ private void pollSelectionKeys(Iterable<SelectionKey>
selectionKeys, boolean isI
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
- log.debug("Connection with {} disconnected", desc, e);
+ log.warn("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection",
desc, e);
close(channel);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicyTest.java
b/clients/src/test/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicyTest.java
new file mode 100644
index 00000000000..aeead887dfd
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/ExponentialReconnectAttemptPolicyTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class ExponentialReconnectAttemptPolicyTest {
+
+
+ public static final long BASE_DELAY_MS = 2000L;
+
+ @Test
+ public void testSimpleExponentialReconnectionPolicy() {
+
+ ExponentialReconnectAttemptPolicy policy = new
ExponentialReconnectAttemptPolicy(BASE_DELAY_MS, BASE_DELAY_MS * 60 * 5);
+ ReconnectAttemptPolicy.ReconnectAttemptScheduler scheduler =
policy.newScheduler();
+ assertEquals(2000, scheduler.nextReconnectBackoffMs());
+ assertEquals(4000, scheduler.nextReconnectBackoffMs());
+ assertEquals(8000, scheduler.nextReconnectBackoffMs());
+ assertEquals(16000, scheduler.nextReconnectBackoffMs());
+ assertEquals(32000, scheduler.nextReconnectBackoffMs());
+ for (int i = 0; i < 64; i++) { // force overflow
+ scheduler.nextReconnectBackoffMs();
+ }
+ assertEquals(policy.getMaxDelayMs(),
scheduler.nextReconnectBackoffMs());
+ }
+
+ @Test
+ public void testIllegalArgumentsExponentialReconnectionPolicy() {
+ try {
+ new ExponentialReconnectAttemptPolicy(-1, -1);
+ fail();
+ } catch (Throwable t) {
+ assertTrue(t instanceof IllegalArgumentException);
+ }
+
+ try {
+ new ExponentialReconnectAttemptPolicy(100, 10);
+ fail();
+ } catch (Throwable t) {
+ assertTrue(t instanceof IllegalArgumentException);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 18f7ecb4489..a40dd1a1032 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -53,11 +53,12 @@
private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
private Node node = cluster.nodes().get(0);
private long reconnectBackoffMsTest = 10 * 1000;
- private NetworkClient client = new NetworkClient(selector, metadata,
"mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+
+ private NetworkClient client = new NetworkClient(selector, metadata,
"mock", Integer.MAX_VALUE, new
ConstantReconnectAttemptPolicy(reconnectBackoffMsTest),
64 * 1024, 64 * 1024, requestTimeoutMs, time);
private NetworkClient clientWithStaticNodes = new NetworkClient(selector,
new ManualMetadataUpdater(Arrays.asList(node)),
- "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024,
requestTimeoutMs, time);
+ "mock-static", Integer.MAX_VALUE, new
ConstantReconnectAttemptPolicy(0), 64 * 1024, 64 * 1024, requestTimeoutMs,
time);
@Before
public void setup() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index f5aa8ae9eab..886590fdfb6 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -127,6 +127,23 @@
atLeast(0L),
ConfigDef.Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+
.define(CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
+ ConfigDef.Type.CLASS,
+
"org.apache.kafka.clients.ConstantReconnectAttemptPolicy",
+ ConfigDef.Importance.HIGH,
+
CommonClientConfigs.RECONNECT_ATTEMPTS_POLICY_CLASS_DOC)
+
.define(CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ 50L,
+ atLeast(0L),
+ ConfigDef.Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_BASE_DELAY_MS_DOC)
+
.define(CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ 5000L,
+ atLeast(0L),
+ ConfigDef.Importance.LOW,
+
CommonClientConfigs.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_DOC)
.define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
100L,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 7294ed4608e..b5614dc993b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -21,6 +21,8 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.ReconnectAttemptPolicy;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -90,12 +92,16 @@ public WorkerGroupMember(DistributedConfig config,
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder(config.values());
+
+ ReconnectAttemptPolicy reconnectAttemptPolicy =
config.getConfiguredInstance(ConsumerConfig.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
ReconnectAttemptPolicy.class);
+ reconnectAttemptPolicy.configure(config);
+
NetworkClient netClient = new NetworkClient(
new
Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
-
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+ reconnectAttemptPolicy,
config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala
b/core/src/main/scala/kafka/admin/AdminClient.scala
index b8573153e5b..c0b1b6daf32 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -234,7 +234,7 @@ object AdminClient {
metadata,
"admin-" + AdminClientIdSequence.getAndIncrement(),
DefaultMaxInFlightRequestsPerConnection,
- DefaultReconnectBackoffMs,
+ new ConstantReconnectAttemptPolicy(DefaultReconnectBackoffMs),
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
DefaultRequestTimeoutMs,
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b376d15e4eb..bbafda2dd93 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.server.KafkaConfig
import kafka.utils._
-import org.apache.kafka.clients.{ClientRequest, ClientResponse,
ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode,
NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@@ -104,7 +104,7 @@ class ControllerChannelManager(controllerContext:
ControllerContext, config: Kaf
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
1,
- 0,
+ new ConstantReconnectAttemptPolicy(0),
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index e29494baa1d..ea253dfd2ac 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,7 +31,7 @@ import java.io.{IOException, File}
import kafka.security.auth.Authorizer
import kafka.utils._
-import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest,
NetworkClient}
+import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest,
NetworkClient, ConstantReconnectAttemptPolicy}
import org.apache.kafka.common.Node
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.{LoginType, Selectable,
ChannelBuilders, NetworkReceive, Selector, Mode}
@@ -341,7 +341,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
SystemTime, threadNamePr
metadataUpdater,
config.brokerId.toString,
1,
- 0,
+ new ConstantReconnectAttemptPolicy(0),
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index de7269f8332..2914f6b0c6a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,7 +26,7 @@ import kafka.message.ByteBufferMessageSet
import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
import kafka.common.{KafkaStorageException, TopicAndPartition}
import ReplicaFetcherThread._
-import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient,
ClientRequest, ClientResponse}
+import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient,
ClientRequest, ClientResponse, ConstantReconnectAttemptPolicy}
import org.apache.kafka.common.network.{LoginType, Selectable,
ChannelBuilders, NetworkReceive, Selector, Mode}
import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse,
RequestSend, AbstractRequest, ListOffsetRequest}
import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
@@ -88,7 +88,7 @@ class ReplicaFetcherThread(name: String,
new ManualMetadataUpdater(),
clientId,
1,
- 0,
+ new ConstantReconnectAttemptPolicy(0),
Selectable.USE_DEFAULT_BUFFER_SIZE,
brokerConfig.replicaSocketReceiveBufferBytes,
brokerConfig.requestTimeoutMs,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add reconnect attempts policies for client
> ------------------------------------------
>
> Key: KAFKA-3496
> URL: https://issues.apache.org/jira/browse/KAFKA-3496
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 0.10.0.0
> Reporter: Florian Hussonnois
>
> Currently, client reconnection attempts are controlled only by the property
> reconnect.backoff.ms.
> It would be nice to introduce a reconnect attempt policy. Initially, two
> policies could be defined:
> - ConstantReconnectAttemptPolicy
> - ExponentialReconnectAttemptPolicy
> The policy could then be configured as follows:
> Properties config = new Properties();
> config.put(ConsumerConfig.RECONNECT_ATTEMPTS_POLICY_CLASS_CONFIG,
> "org.apache.kafka.clients.ExponentialReconnectAttemptPolicy");
> config.put(ConsumerConfig.RECONNECT_EXPONENTIAL_MAX_DELAY_MS_CONFIG, 5000);
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)