Repository: kafka
Updated Branches:
  refs/heads/trunk 916edc3a4 -> dcea49856


KAFKA-4777; Backoff properly in consumer heartbeat thread if no brokers are 
available

Author: Allen Xiang <allen.xi...@monsanto.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2564 from allenxiang/client-heartbeat-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dcea4985
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dcea4985
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dcea4985

Branch: refs/heads/trunk
Commit: dcea49856805a039f0859facf169a87a574c06d3
Parents: 916edc3
Author: Allen Xiang <allen.xi...@monsanto.com>
Authored: Sat Feb 18 09:25:28 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat Feb 18 09:25:28 2017 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/AbstractCoordinator.java      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dcea4985/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 350a84b..1c2d607 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -888,9 +888,9 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture == null)
-                                lookupCoordinator();
-                            else
+                            if (findCoordinatorFuture != null || 
lookupCoordinator().failed())
+                                // the immediate future check ensures that we 
backoff properly in the case that no
+                                // brokers are available to connect to.
                                 AbstractCoordinator.this.wait(retryBackoffMs);
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing 
a successful heartbeat, so we should
@@ -941,7 +941,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 log.error("Unexpected interrupt received in heartbeat thread 
for group {}", groupId, e);
                 this.failed.set(new RuntimeException(e));
             } catch (RuntimeException e) {
-                log.error("Heartbeat thread for group {} failed due to 
unexpected error" , groupId, e);
+                log.error("Heartbeat thread for group {} failed due to 
unexpected error", groupId, e);
                 this.failed.set(e);
             } finally {
                 log.debug("Heartbeat thread for group {} has closed", groupId);

Reply via email to