Repository: kafka Updated Branches: refs/heads/0.11.0 22288e805 -> c3f806d02
KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes For consumers with manual partition assignment, await metadata when there are no ready nodes to avoid busy polling. Author: Rajini Sivaram <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #3124 from rajinisivaram/KAFKA-5263 (cherry picked from commit 64fc1a7cae348fad10e84c5ebc457c2a391573ee) Signed-off-by: Rajini Sivaram <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3f806d0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3f806d0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3f806d0 Branch: refs/heads/0.11.0 Commit: c3f806d020e8b362ba1ae9e25eeb5ae155df0777 Parents: 22288e8 Author: Rajini Sivaram <[email protected]> Authored: Thu May 25 11:23:18 2017 +0100 Committer: Rajini Sivaram <[email protected]> Committed: Thu May 25 11:23:57 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/ClusterConnectionStates.java | 12 ++++++ .../org/apache/kafka/clients/KafkaClient.java | 5 +++ .../org/apache/kafka/clients/NetworkClient.java | 5 +++ .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../consumer/internals/ConsumerCoordinator.java | 42 ++++++++++++++------ .../internals/ConsumerNetworkClient.java | 10 +++-- .../org/apache/kafka/clients/MockClient.java | 5 +++ .../internals/ConsumerCoordinatorTest.java | 32 +++++++-------- 8 files changed, 79 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 32a222b..4d4bedd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -145,6 +145,18 @@ final class ClusterConnectionStates { } /** + * Return true if there is at least one node with connection in ready state and false otherwise. + */ + public boolean hasReadyNodes() { + for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) { + NodeConnectionState state = entry.getValue(); + if (state != null && state.state == ConnectionState.READY) + return true; + } + return false; + } + + /** * Return true if the connection has been disconnected * @param id The id of the node to check */ http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 9d63d43..d563fa0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -125,6 +125,11 @@ public interface KafkaClient extends Closeable { boolean hasInFlightRequests(String nodeId); /** + * Return true if there is at least one node with connection in ready state and false otherwise. + */ + boolean hasReadyNodes(); + + /** * Wake up the client if it is currently blocked waiting for I/O */ void wakeup(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 8708218..93fbb85 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -417,6 +417,11 @@ public class NetworkClient implements KafkaClient { return this.inFlightRequests.isEmpty(node); } + @Override + public boolean hasReadyNodes() { + return connectionStates.hasReadyNodes(); + } + /** * Interrupt the client if it is blocked waiting on I/O. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3c6d409..6489792 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1043,7 +1043,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { client.maybeTriggerWakeup(); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), timeout); // fetch positions if we have partitions we're subscribed to that we // don't know the offset for http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ca2108d..b35a571 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -278,23 +278,39 @@ public final class ConsumerCoordinator extends AbstractCoordinator { * * @param now current time in milliseconds */ - public void poll(long now) { + public void poll(long now, long remainingMs) { invokeCompletedOffsetCommitCallbacks(); - if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) { - ensureCoordinatorReady(); - now = time.milliseconds(); - } + if (subscriptions.partitionsAutoAssigned()) { + if (coordinatorUnknown()) { + ensureCoordinatorReady(); + now = time.milliseconds(); + } - if (needRejoin()) { - // due to a race condition between the initial metadata fetch and the initial rebalance, - // we need to ensure that the metadata is fresh before joining initially. This ensures - // that we have matched the pattern against the cluster's topics at least once before joining. - if (subscriptions.hasPatternSubscription()) - client.ensureFreshMetadata(); + if (needRejoin()) { + // due to a race condition between the initial metadata fetch and the initial rebalance, + // we need to ensure that the metadata is fresh before joining initially. This ensures + // that we have matched the pattern against the cluster's topics at least once before joining. + if (subscriptions.hasPatternSubscription()) + client.ensureFreshMetadata(); - ensureActiveGroup(); - now = time.milliseconds(); + ensureActiveGroup(); + now = time.milliseconds(); + } + } else { + // For manually assigned partitions, if there are no ready nodes, await metadata. + // If connections to all nodes fail, wakeups triggered while attempting to send fetch + // requests result in polls returning immediately, causing a tight loop of polls. Without + // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. + // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. + // When group management is used, metadata wait is already performed for this scenario as + // coordinator is unknown, hence this check is not required. + if (metadata.updateRequested() && !client.hasReadyNodes()) { + boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs); + if (!metadataUpdated && !client.hasReadyNodes()) + return; + now = time.milliseconds(); + } } pollHeartbeat(now); http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 92d049a..84e9a81 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -107,10 +107,12 @@ public class ConsumerNetworkClient implements Closeable { return completionHandler.future; } - public Node leastLoadedNode() { - synchronized (this) { - return client.leastLoadedNode(time.milliseconds()); - } + public synchronized Node leastLoadedNode() { + return client.leastLoadedNode(time.milliseconds()); + } + + public synchronized boolean hasReadyNodes() { + return client.hasReadyNodes(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 8fff3cc..9ca95e3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -318,6 +318,11 @@ public class MockClient implements KafkaClient { } @Override + public boolean hasReadyNodes() { + return !ready.isEmpty(); + } + + @Override public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse) { return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f806d0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 8b582ca..770d4f7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -170,7 +170,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); } @Test @@ -299,7 +299,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(), Errors.INVALID_GROUP_ID)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); } @Test @@ -329,7 +329,7 @@ public class ConsumerCoordinatorTest { sync.groupAssignment().containsKey(consumerId); } }, syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -371,7 +371,7 @@ public class ConsumerCoordinatorTest { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); assertEquals(2, subscriptions.assignedPartitions().size()); @@ -434,7 +434,7 @@ public class ConsumerCoordinatorTest { }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); assertEquals(updatedSubscriptionSet, subscriptions.subscription()); @@ -466,14 +466,14 @@ public class ConsumerCoordinatorTest { consumerClient.wakeup(); try { - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); } catch (WakeupException e) { // ignore } // now complete the second half client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -726,7 +726,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); @@ -783,7 +783,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse(coordinator.needRejoin()); assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions()); @@ -830,7 +830,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); if (!assign) { assertFalse(coordinator.needRejoin()); assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned); @@ -841,7 +841,7 @@ public class ConsumerCoordinatorTest { client.poll(0, time.milliseconds()); client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested()); if (!assign) { @@ -970,7 +970,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); } @@ -999,7 +999,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); } @@ -1017,7 +1017,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); } @@ -1044,7 +1044,7 @@ public class ConsumerCoordinatorTest { // sleep only for the retry backoff time.sleep(retryBackoffMs); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); } @@ -1508,7 +1508,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); - coordinator.poll(time.milliseconds()); + coordinator.poll(time.milliseconds(), Long.MAX_VALUE); return coordinator; }
