Repository: kafka Updated Branches: refs/heads/trunk 6ed3e6b1c -> 7b16b4731
KAFKA-4066; Fix NPE in consumer due to multi-threaded updates Author: Rajini Sivaram <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1763 from rajinisivaram/KAFKA-4066 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b16b473 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b16b473 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b16b473 Branch: refs/heads/trunk Commit: 7b16b4731666ff321fbe46828d526872ff5f56d7 Parents: 6ed3e6b Author: Rajini Sivaram <[email protected]> Authored: Mon Aug 22 21:34:26 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Mon Aug 22 21:34:26 2016 +0100 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 32 +++++++++----------- 1 file changed, 15 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7b16b473/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index bf6b920..f2e15ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -206,24 +206,16 @@ public abstract class AbstractCoordinator implements Closeable { } } - protected RequestFuture<Void> lookupCoordinator() { - if (findCoordinatorFuture == null) { + protected synchronized RequestFuture<Void> lookupCoordinator() { + if (findCoordinatorFuture == null) findCoordinatorFuture = sendGroupCoordinatorRequest(); - findCoordinatorFuture.addListener(new RequestFutureListener<Void>() { - @Override - public void onSuccess(Void value) { - findCoordinatorFuture = null; - } - - @Override - public void onFailure(RuntimeException e) { - findCoordinatorFuture = null; - } - }); - } return findCoordinatorFuture; } + private synchronized void clearFindCoordinatorFuture() { + findCoordinatorFuture = null; + } + /** * Check whether the group should be rejoined (e.g. if metadata changes) * @return true if it should, false otherwise @@ -532,6 +524,7 @@ public abstract class AbstractCoordinator implements Closeable { // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 Errors error = Errors.forCode(groupCoordinatorResponse.errorCode()); + clearFindCoordinatorFuture(); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.coordinator = new Node( @@ -550,6 +543,12 @@ public abstract class AbstractCoordinator implements Closeable { future.raise(error); } } + + @Override + public void onFailure(RuntimeException e, RequestFuture<Void> future) { + clearFindCoordinatorFuture(); + super.onFailure(e, future); + } } /** @@ -820,7 +819,6 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void run() { try { - RequestFuture findCoordinatorFuture = null; while (true) { synchronized (AbstractCoordinator.this) { @@ -843,8 +841,8 @@ public abstract class AbstractCoordinator implements Closeable { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null || findCoordinatorFuture.isDone()) - findCoordinatorFuture = lookupCoordinator(); + if (findCoordinatorFuture == null) + lookupCoordinator(); else AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) {
