This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 277dc67  KAFKA-10793: move handling of FindCoordinatorFuture to fix 
race condition (#9671)
277dc67 is described below

commit 277dc6720cdd3aa495db42cc522d610b415296a0
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jan 26 19:05:37 2021 -0800

    KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition 
(#9671)
    
    Fixes a tricky race condition between the consumer and hb thread can lead 
to a failed but non-null findCoordinatorFuture, causing the AbstractCoordinator 
to wait endlessly on the request which it thinks is still in flight. We should 
move the handling of this future out of the listener callbacks and into the 
ensureCoordinatorReady() method where we can check the exception and clear the 
future all in one place.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    | 59 ++++++++++++++--------
 1 file changed, 37 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f05a1d9..f8120e7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -137,10 +137,11 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private HeartbeatThread heartbeatThread = null;
     private RequestFuture<ByteBuffer> joinFuture = null;
     private RequestFuture<Void> findCoordinatorFuture = null;
-    volatile private RuntimeException findCoordinatorException = null;
+    private volatile RuntimeException fatalFindCoordinatorException = null;
     private Generation generation = Generation.NO_GENERATION;
     private long lastRebalanceStartMs = -1L;
     private long lastRebalanceEndMs = -1L;
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning 
only after unable to connect for a while
 
     protected MemberState state = MemberState.UNJOINED;
 
@@ -235,9 +236,9 @@ public abstract class AbstractCoordinator implements 
Closeable {
             return true;
 
         do {
-            if (findCoordinatorException != null && !(findCoordinatorException 
instanceof RetriableException)) {
-                final RuntimeException fatalException = 
findCoordinatorException;
-                findCoordinatorException = null;
+            if (fatalFindCoordinatorException != null) {
+                final RuntimeException fatalException = 
fatalFindCoordinatorException;
+                fatalFindCoordinatorException = null;
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
@@ -248,18 +249,26 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing 
metadata");
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal exception", 
fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && 
client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so 
mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();
+            if (fatalException !=  null)
+                throw fatalException;
         } while (coordinatorUnknown() && timer.notExpired());
 
         return !coordinatorUnknown();
@@ -274,17 +283,6 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 return RequestFuture.noBrokersAvailable();
             } else {
                 findCoordinatorFuture = sendFindCoordinatorRequest(node);
-                // remember the exception even after the future is cleared so 
that
-                // it can still be thrown by the ensureCoordinatorReady caller
-                findCoordinatorFuture.addListener(new 
RequestFutureListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {} // do nothing
-
-                    @Override
-                    public void onFailure(RuntimeException e) {
-                        findCoordinatorException = e;
-                    }
-                });
             }
         }
         return findCoordinatorFuture;
@@ -824,7 +822,6 @@ public abstract class AbstractCoordinator implements 
Closeable {
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) 
{
             log.debug("Received FindCoordinator response {}", resp);
-            clearFindCoordinatorFuture();
 
             FindCoordinatorResponse findCoordinatorResponse = 
(FindCoordinatorResponse) resp.responseBody();
             Errors error = findCoordinatorResponse.error();
@@ -853,7 +850,13 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            clearFindCoordinatorFuture();
+            log.debug("FindCoordinator request failed due to {}", e);
+
+            if (!(e instanceof RetriableException)) {
+                // Remember the exception if fatal so we can ensure it gets 
thrown by the main thread
+                fatalFindCoordinatorException = e;
+            }
+
             super.onFailure(e, future);
         }
     }
@@ -902,6 +905,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
             // Pending callbacks will be invoked with a DisconnectException on 
the next call to poll.
             if (!isDisconnected)
                 client.disconnectAsync(oldCoordinator);
+
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - 
lastTimeOfConnectionMs;
+            if (durationOfOngoingDisconnect > 
rebalanceConfig.rebalanceTimeoutMs)
+                log.warn("Consumer has been disconnected from the group 
coordinator for {}ms", durationOfOngoingDisconnect);
         }
     }
 
@@ -1342,10 +1351,16 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || 
lookupCoordinator().failed())
-                                // the immediate future check ensures that we 
backoff properly in the case that no
-                                // brokers are available to connect to.
+                            if (findCoordinatorFuture != null) {
+                                // clear the future so that after the backoff, 
if the hb still sees coordinator unknown in
+                                // the next iteration it will try to 
re-discover the coordinator in case the main thread cannot
+                                clearFindCoordinatorFuture();
+
+                                // backoff properly
                                 
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+                            } else {
+                                lookupCoordinator();
+                            }
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing 
a successful heartbeat, so we should
                             // probably make sure the coordinator is still 
healthy.

Reply via email to