Repository: kafka
Updated Branches:
  refs/heads/trunk 8b2995c31 -> abe699176


KAFKA-3878; Support exponential backoff policy via reconnect.backoff.max 
(KIP-144)

Summary:
- add `reconnect.backoff.max.ms` common client configuration parameter
- if `reconnect.backoff.max.ms` > `reconnect.backoff.ms`, apply an exponential 
backoff policy
- apply +/- 20% random jitter to smooth cluster reconnects

Author: Dana Powers <dana.pow...@gmail.com>

Reviewers: Ewen Cheslack-Postava <m...@ewencp.org>, Roger Hoover 
<roger.hoo...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #1523 from dpkp/exp_backoff


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abe69917
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abe69917
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abe69917

Branch: refs/heads/trunk
Commit: abe699176babe4f065b67b2b72d20daa0a2e46a1
Parents: 8b2995c
Author: Dana Powers <dana.pow...@gmail.com>
Authored: Fri May 19 14:03:22 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Fri May 19 14:06:40 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  | 60 +++++++++++++++++---
 .../kafka/clients/CommonClientConfigs.java      |  5 +-
 .../org/apache/kafka/clients/NetworkClient.java | 17 ++++--
 .../kafka/clients/admin/AdminClientConfig.java  | 12 ++++
 .../kafka/clients/admin/KafkaAdminClient.java   |  1 +
 .../kafka/clients/consumer/ConsumerConfig.java  | 11 ++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../kafka/clients/producer/ProducerConfig.java  |  4 ++
 .../apache/kafka/clients/NetworkClientTest.java |  9 ++-
 .../runtime/distributed/DistributedConfig.java  |  6 ++
 .../runtime/distributed/WorkerGroupMember.java  |  1 +
 .../main/scala/kafka/admin/AdminClient.scala    |  2 +
 .../controller/ControllerChannelManager.scala   |  1 +
 .../TransactionMarkerChannelManager.scala       |  1 +
 .../main/scala/kafka/server/KafkaServer.scala   |  1 +
 .../server/ReplicaFetcherBlockingSend.scala     |  3 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 10 ++++
 .../processor/internals/StreamsKafkaClient.java |  1 +
 19 files changed, 128 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 9bde1a2..32a222b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,12 +25,17 @@ import java.util.Map;
  * 
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffMs;
+    private final long reconnectBackoffInitMs;
+    private final long reconnectBackoffMaxMs;
+    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    private final double reconnectBackoffMaxExp;
     private final Map<String, NodeConnectionState> nodeState;
 
-    public ClusterConnectionStates(long reconnectBackoffMs) {
-        this.reconnectBackoffMs = reconnectBackoffMs;
-        this.nodeState = new HashMap<String, NodeConnectionState>();
+    public ClusterConnectionStates(long reconnectBackoffMs, long 
reconnectBackoffMaxMs) {
+        this.reconnectBackoffInitMs = reconnectBackoffMs;
+        this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
+        this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / 
(double) Math.max(reconnectBackoffMs, 1)) / 
Math.log(RECONNECT_BACKOFF_EXP_BASE);
+        this.nodeState = new HashMap<>();
     }
 
     /**
@@ -44,7 +50,7 @@ final class ClusterConnectionStates {
         if (state == null)
             return true;
         else
-            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs >= state.reconnectBackoffMs;
     }
 
     /**
@@ -57,7 +63,7 @@ final class ClusterConnectionStates {
         if (state == null)
             return false;
         else
-            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs < this.reconnectBackoffMs;
+            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs < state.reconnectBackoffMs;
     }
 
     /**
@@ -72,7 +78,7 @@ final class ClusterConnectionStates {
         if (state == null) return 0;
         long timeWaited = now - state.lastConnectAttemptMs;
         if (state.state == ConnectionState.DISCONNECTED) {
-            return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+            return Math.max(state.reconnectBackoffMs - timeWaited, 0);
         } else {
             // When connecting or connected, we should be able to delay 
indefinitely since other events (connection or
             // data acked) will cause a wakeup once data can be sent.
@@ -95,7 +101,7 @@ final class ClusterConnectionStates {
      * @param now the current time
      */
     public void connecting(String id, long now) {
-        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
now));
+        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
now, this.reconnectBackoffInitMs));
     }
 
     /**
@@ -107,6 +113,7 @@ final class ClusterConnectionStates {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
+        updateReconnectBackoff(nodeState);
     }
 
     /**
@@ -125,6 +132,7 @@ final class ClusterConnectionStates {
     public void ready(String id) {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.READY;
+        resetReconnectBackoff(nodeState);
     }
 
     /**
@@ -146,6 +154,36 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Resets the failure count for a node and sets the reconnect backoff to 
the base
+     * value configured via reconnect.backoff.ms
+     *
+     * @param nodeState The node state object to update
+     */
+    public void resetReconnectBackoff(NodeConnectionState nodeState) {
+        nodeState.failedAttempts = 0;
+        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+    }
+
+    /**
+     * Update the node reconnect backoff exponentially.
+     * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random 
jitter)
+     * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
+     *
+     * @param nodeState The node state object to update
+     */
+    public void updateReconnectBackoff(NodeConnectionState nodeState) {
+        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
+            nodeState.failedAttempts += 1;
+            double backoffExp = Math.min(nodeState.failedAttempts - 1, 
this.reconnectBackoffMaxExp);
+            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, 
backoffExp);
+            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * 
backoffFactor);
+            // Actual backoff is randomized to avoid connection storms.
+            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 
1.2);
+            nodeState.reconnectBackoffMs = (long) (randomFactor * 
reconnectBackoffMs);
+        }
+    }
+
+    /**
      * Remove the given node from the tracked connection states. The main 
difference between this and `disconnected`
      * is the impact on `connectionDelay`: it will be 0 after this call 
whereas `reconnectBackoffMs` will be taken
      * into account after `disconnected` is called.
@@ -183,10 +221,14 @@ final class ClusterConnectionStates {
 
         ConnectionState state;
         long lastConnectAttemptMs;
+        long failedAttempts;
+        long reconnectBackoffMs;
 
-        public NodeConnectionState(ConnectionState state, long 
lastConnectAttempt) {
+        public NodeConnectionState(ConnectionState state, long 
lastConnectAttempt, long reconnectBackoffMs) {
             this.state = state;
             this.lastConnectAttemptMs = lastConnectAttempt;
+            this.failedAttempts = 0;
+            this.reconnectBackoffMs = reconnectBackoffMs;
         }
 
         public String toString() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index e06900c..f7103be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -50,7 +50,10 @@ public class CommonClientConfigs {
     public static final String CLIENT_ID_DOC = "An id string to pass to the 
server when making requests. The purpose of this is to be able to track the 
source of requests beyond just ip/port by allowing a logical application name 
to be included in server-side request logging.";
 
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
"reconnect.backoff.ms";
-    public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time 
to wait before attempting to reconnect to a given host. This avoids repeatedly 
connecting to a host in a tight loop. This backoff applies to all requests sent 
by the consumer to the broker.";
+    public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. This avoids 
repeatedly connecting to a host in a tight loop. This backoff applies to all 
connection attempts by the client to a broker.";
+
+    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
"reconnect.backoff.max.ms";
+    public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum 
amount of time in milliseconds to wait when reconnectng to a broker that has 
repeatedly failed to connect. If provided, the backoff per host will increase 
exponentially for each consecutive connection failure, up to this maximum. 
After calculating the backoff increase, 20% random jitter is added to avoid 
connection storms.";
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to 
wait before attempting to retry a failed request to a given topic partition. 
This avoids repeatedly sending requests in a tight loop under some failure 
scenarios.";

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index a09f85d..8708218 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -109,14 +109,17 @@ public class NetworkClient implements KafkaClient {
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
+                         long reconnectBackoffMax,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions) {
-        this(null, metadata, selector, clientId, 
maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, 
discoverBrokerVersions, apiVersions);
+        this(null, metadata, selector, clientId, 
maxInFlightRequestsPerConnection,
+             reconnectBackoffMs, reconnectBackoffMax,
+             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
+             discoverBrokerVersions, apiVersions);
     }
 
     public NetworkClient(Selectable selector,
@@ -124,14 +127,17 @@ public class NetworkClient implements KafkaClient {
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
+                         long reconnectBackoffMax,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions) {
-        this(metadataUpdater, null, selector, clientId, 
maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, 
discoverBrokerVersions, apiVersions);
+        this(metadataUpdater, null, selector, clientId, 
maxInFlightRequestsPerConnection,
+             reconnectBackoffMs, reconnectBackoffMax,
+             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
+             discoverBrokerVersions, apiVersions);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -140,6 +146,7 @@ public class NetworkClient implements KafkaClient {
                           String clientId,
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
+                          long reconnectBackoffMax,
                           int socketSendBuffer,
                           int socketReceiveBuffer,
                           int requestTimeoutMs,
@@ -160,7 +167,7 @@ public class NetworkClient implements KafkaClient {
         this.selector = selector;
         this.clientId = clientId;
         this.inFlightRequests = new 
InFlightRequests(maxInFlightRequestsPerConnection);
-        this.connectionStates = new 
ClusterConnectionStates(reconnectBackoffMs);
+        this.connectionStates = new 
ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax);
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 368a42e..62a48a8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -49,6 +49,12 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String RECONNECT_BACKOFF_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
 
     /**
+     * <code>reconnect.backoff.max.ms</code>
+     */
+    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG;
+    private static final String RECONNECT_BACKOFF_MAX_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC;
+
+    /**
      * <code>retry.backoff.ms</code>
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
@@ -107,6 +113,12 @@ public class AdminClientConfig extends AbstractConfig {
                                         atLeast(0L),
                                         Importance.LOW,
                                         RECONNECT_BACKOFF_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        1000L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        RECONNECT_BACKOFF_MAX_MS_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG,
                                         Type.LONG,
                                         100L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 76919ee..355dc9c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -291,6 +291,7 @@ public class KafkaAdminClient extends AdminClient {
                 clientId,
                 100,
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 0c0c29d..b838b14 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -167,6 +167,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
     /**
+     * <code>reconnect.backoff.max.ms</code>
+     */
+    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG;
+
+    /**
      * <code>retry.backoff.ms</code>
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
@@ -333,6 +338,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0L),
                                         Importance.LOW,
                                         
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        1000L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG,
                                         Type.LONG,
                                         100L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index aad4453..6bcc086 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -673,6 +673,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     clientId,
                     100, // a fixed large enough value will suffice for max 
in-flight requests
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 71fb077..1ba13b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -297,6 +297,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     clientId,
                     maxInflightRequests,
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                     this.requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4208a90..399a5a5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -116,6 +116,9 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>reconnect.backoff.ms</code> */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
+    /** <code>reconnect.backoff.max.ms</code> */
+    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG;
+
     /** <code>max.block.ms</code> */
     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
     private static final String MAX_BLOCK_MS_DOC = "The configuration controls 
how long <code>KafkaProducer.send()</code> and 
<code>KafkaProducer.partitionsFor()</code> will block."
@@ -238,6 +241,7 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, 
Type.LONG, 50L, atLeast(0L), Importance.LOW, 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MAX_MS_CONFIG, 
Type.LONG, 1000L, atLeast(0L), Importance.LOW, 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 
100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
                                 .define(MAX_BLOCK_MS_CONFIG,
                                         Type.LONG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 59a46ac..c87acd7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -54,6 +54,7 @@ public class NetworkClientTest {
     protected final Cluster cluster = TestUtils.singletonCluster("test", 
nodeId);
     protected final Node node = cluster.nodes().get(0);
     protected final long reconnectBackoffMsTest = 10 * 1000;
+    protected final long reconnectBackoffMaxTest = 10 * 1000;
     protected final NetworkClient client = createNetworkClient();
 
     private final NetworkClient clientWithStaticNodes = 
createNetworkClientWithStaticNodes();
@@ -61,18 +62,20 @@ public class NetworkClientTest {
     private final NetworkClient clientWithNoVersionDiscovery = 
createNetworkClientWithNoVersionDiscovery();
 
     private NetworkClient createNetworkClient() {
-        return new NetworkClient(selector, metadata, "mock", 
Integer.MAX_VALUE, reconnectBackoffMsTest,
+        return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxTest,
                 64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new 
ApiVersions());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new 
ManualMetadataUpdater(Arrays.asList(node)),
-                "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, 
requestTimeoutMs,
+                "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, 
requestTimeoutMs,
                 time, true, new ApiVersions());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
-        return new NetworkClient(selector, metadata, "mock", 
Integer.MAX_VALUE, reconnectBackoffMsTest,
+        return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxTest,
                 64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new 
ApiVersions());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index eb3b876..8c6c4a4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -182,6 +182,12 @@ public class DistributedConfig extends WorkerConfig {
                         atLeast(0L),
                         ConfigDef.Importance.LOW,
                         CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
                 .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
                         ConfigDef.Type.LONG,
                         100L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 5ab8c9c..24d321e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -98,6 +98,7 @@ public class WorkerGroupMember {
                     clientId,
                     100, // a fixed large enough value will suffice
                     
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala 
b/core/src/main/scala/kafka/admin/AdminClient.scala
index d297b70..50198a7 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -373,6 +373,7 @@ object AdminClient {
   val DefaultRequestTimeoutMs = 5000
   val DefaultMaxInFlightRequestsPerConnection = 100
   val DefaultReconnectBackoffMs = 50
+  val DefaultReconnectBackoffMax = 50
   val DefaultSendBufferBytes = 128 * 1024
   val DefaultReceiveBufferBytes = 32 * 1024
   val DefaultRetryBackoffMs = 100
@@ -447,6 +448,7 @@ object AdminClient {
       "admin-" + AdminClientIdSequence.getAndIncrement(),
       DefaultMaxInFlightRequestsPerConnection,
       DefaultReconnectBackoffMs,
+      DefaultReconnectBackoffMax,
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
       requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c5a9678..41b9549 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -116,6 +116,7 @@ class ControllerChannelManager(controllerContext: 
ControllerContext, config: Kaf
         config.brokerId.toString,
         1,
         0,
+        0,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 9aa3e70..7c42574 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -68,6 +68,7 @@ object TransactionMarkerChannelManager {
       s"broker-${config.brokerId}-txn-marker-sender",
       1,
       50,
+      50,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       config.socketReceiveBufferBytes,
       config.requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 94dfa43..c7dac0d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -385,6 +385,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
           config.brokerId.toString,
           1,
           0,
+          0,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 8ba3f60..c24cf0a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -73,6 +73,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       clientId,
       1,
       0,
+      0,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
@@ -102,4 +103,4 @@ class ReplicaFetcherBlockingSend(sourceBroker: 
BrokerEndPoint,
   def close(): Unit = {
     networkClient.close()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index af9b8e7..deb5e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -209,6 +209,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
     private static final String RECONNECT_BACKOFF_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
 
+    /** {@code reconnect.backoff.max} */
+    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG;
+    private static final String RECONNECT_BACKOFF_MAX_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC;
+
     /** {@code replication.factor} */
     public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
     private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for change log topics and repartition topics created by the stream 
processing application.";
@@ -433,6 +437,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0L),
                     ConfigDef.Importance.LOW,
                     RECONNECT_BACKOFF_MS_DOC)
+            .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    50L,
+                    atLeast(0L),
+                    ConfigDef.Importance.LOW,
+                    RECONNECT_BACKOFF_MAX_MS_DOC)
             .define(SEND_BUFFER_CONFIG,
                     ConfigDef.Type.INT,
                     128 * 1024,

http://git-wip-us.apache.org/repos/asf/kafka/blob/abe69917/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 47ca8ae..c2b76f9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -127,6 +127,7 @@ public class StreamsKafkaClient {
             streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
             MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
             streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
+            
streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
             streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
             streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
             streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),

Reply via email to