This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 277dc67 KAFKA-10793: move handling of FindCoordinatorFuture to fix
race condition (#9671)
277dc67 is described below
commit 277dc6720cdd3aa495db42cc522d610b415296a0
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jan 26 19:05:37 2021 -0800
KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
(#9671)
Fixes a tricky race condition between the consumer and heartbeat threads that
can lead to a failed but non-null findCoordinatorFuture, causing the
AbstractCoordinator to wait endlessly on a request which it thinks is still in
flight. We should move the handling of this future out of the listener callbacks
and into the ensureCoordinatorReady() method, where we can check the exception
and clear the future all in one place.
Reviewers: Guozhang Wang <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 59 ++++++++++++++--------
1 file changed, 37 insertions(+), 22 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f05a1d9..f8120e7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -137,10 +137,11 @@ public abstract class AbstractCoordinator implements
Closeable {
private HeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
- volatile private RuntimeException findCoordinatorException = null;
+ private volatile RuntimeException fatalFindCoordinatorException = null;
private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;
+ private long lastTimeOfConnectionMs = -1L; // starting logging a warning
only after unable to connect for a while
protected MemberState state = MemberState.UNJOINED;
@@ -235,9 +236,9 @@ public abstract class AbstractCoordinator implements
Closeable {
return true;
do {
- if (findCoordinatorException != null && !(findCoordinatorException
instanceof RetriableException)) {
- final RuntimeException fatalException =
findCoordinatorException;
- findCoordinatorException = null;
+ if (fatalFindCoordinatorException != null) {
+ final RuntimeException fatalException =
fatalFindCoordinatorException;
+ fatalFindCoordinatorException = null;
throw fatalException;
}
final RequestFuture<Void> future = lookupCoordinator();
@@ -248,18 +249,26 @@ public abstract class AbstractCoordinator implements
Closeable {
break;
}
+ RuntimeException fatalException = null;
+
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing
metadata");
client.awaitMetadataUpdate(timer);
- } else
- throw future.exception();
+ } else {
+ log.info("FindCoordinator request hit fatal exception",
fatalException);
+ fatalException = future.exception();
+ }
} else if (coordinator != null &&
client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so
mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
+
+ clearFindCoordinatorFuture();
+ if (fatalException != null)
+ throw fatalException;
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
@@ -274,17 +283,6 @@ public abstract class AbstractCoordinator implements
Closeable {
return RequestFuture.noBrokersAvailable();
} else {
findCoordinatorFuture = sendFindCoordinatorRequest(node);
- // remember the exception even after the future is cleared so
that
- // it can still be thrown by the ensureCoordinatorReady caller
- findCoordinatorFuture.addListener(new
RequestFutureListener<Void>() {
- @Override
- public void onSuccess(Void value) {} // do nothing
-
- @Override
- public void onFailure(RuntimeException e) {
- findCoordinatorException = e;
- }
- });
}
}
return findCoordinatorFuture;
@@ -824,7 +822,6 @@ public abstract class AbstractCoordinator implements
Closeable {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future)
{
log.debug("Received FindCoordinator response {}", resp);
- clearFindCoordinatorFuture();
FindCoordinatorResponse findCoordinatorResponse =
(FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
@@ -853,7 +850,13 @@ public abstract class AbstractCoordinator implements
Closeable {
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
- clearFindCoordinatorFuture();
+ log.debug("FindCoordinator request failed due to {}", e);
+
+ if (!(e instanceof RetriableException)) {
+ // Remember the exception if fatal so we can ensure it gets
thrown by the main thread
+ fatalFindCoordinatorException = e;
+ }
+
super.onFailure(e, future);
}
}
@@ -902,6 +905,12 @@ public abstract class AbstractCoordinator implements
Closeable {
// Pending callbacks will be invoked with a DisconnectException on
the next call to poll.
if (!isDisconnected)
client.disconnectAsync(oldCoordinator);
+
+ lastTimeOfConnectionMs = time.milliseconds();
+ } else {
+ long durationOfOngoingDisconnect = time.milliseconds() -
lastTimeOfConnectionMs;
+ if (durationOfOngoingDisconnect >
rebalanceConfig.rebalanceTimeoutMs)
+ log.warn("Consumer has been disconnected from the group
coordinator for {}ms", durationOfOngoingDisconnect);
}
}
@@ -1342,10 +1351,16 @@ public abstract class AbstractCoordinator implements
Closeable {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null ||
lookupCoordinator().failed())
- // the immediate future check ensures that we
backoff properly in the case that no
- // brokers are available to connect to.
+ if (findCoordinatorFuture != null) {
+ // clear the future so that after the backoff,
if the hb still sees coordinator unknown in
+ // the next iteration it will try to
re-discover the coordinator in case the main thread cannot
+ clearFindCoordinatorFuture();
+
+ // backoff properly
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+ } else {
+ lookupCoordinator();
+ }
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing
a successful heartbeat, so we should
// probably make sure the coordinator is still
healthy.