Repository: kafka Updated Branches: refs/heads/trunk 5781feb52 -> adb70da13
KAFKA-4820; ConsumerNetworkClient.send() should not require global lock Author: Dong Lin <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>, Jiangjie Qin <[email protected]> Closes #2619 from lindong28/KAFKA-4820 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adb70da1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adb70da1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adb70da1 Branch: refs/heads/trunk Commit: adb70da13e18eb652e734887b430ac0ecbc5f9e6 Parents: 5781feb Author: Dong Lin <[email protected]> Authored: Sat Mar 4 11:38:57 2017 -0800 Committer: Jiangjie Qin <[email protected]> Committed: Sat Mar 4 11:38:57 2017 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../internals/ConsumerNetworkClient.java | 210 +++++++++++++------ .../clients/consumer/internals/Fetcher.java | 2 +- 3 files changed, 153 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b4514c5..51b00af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1004,7 +1004,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. - if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0) + if (fetcher.sendFetches() > 0 || client.hasPendingRequest()) client.pollNoWakeup(); if (this.interceptors == null) http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 8781676..2fa7667 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -35,10 +35,13 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.errors.InterruptException; @@ -55,7 +58,7 @@ public class ConsumerNetworkClient implements Closeable { // the mutable state of this class is protected by the object's monitor (excluding the wakeup // flag and the request completion queue below). private final KafkaClient client; - private final Map<Node, List<ClientRequest>> unsent = new HashMap<>(); + private final UnsentRequests unsent = new UnsentRequests(); private final Metadata metadata; private final Time time; private final long retryBackoffMs; @@ -99,24 +102,13 @@ public class ConsumerNetworkClient implements Closeable { RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, completionHandler); - put(node, clientRequest); + unsent.put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request client.wakeup(); return completionHandler.future; } - private void put(Node node, ClientRequest request) { - synchronized (this) { - List<ClientRequest> nodeUnsent = unsent.get(node); - if (nodeUnsent == null) { - nodeUnsent = new ArrayList<>(); - unsent.put(node, nodeUnsent); - } - nodeUnsent.add(request); - } - } - public Node leastLoadedNode() { synchronized (this) { return client.leastLoadedNode(time.milliseconds()); @@ -280,12 +272,12 @@ public class ConsumerNetworkClient implements Closeable { long startMs = time.milliseconds(); long remainingMs = timeoutMs; - while (pendingRequestCount(node) > 0 && remainingMs > 0) { + while (hasPendingRequest(node) && remainingMs > 0) { poll(remainingMs); remainingMs = timeoutMs - (time.milliseconds() - startMs); } - return pendingRequestCount(node) == 0; + return !hasPendingRequest(node); } /** @@ -296,9 +288,21 @@ public class ConsumerNetworkClient implements Closeable { */ public int pendingRequestCount(Node node) { synchronized (this) { - List<ClientRequest> pending = unsent.get(node); - int unsentCount = pending == null ? 0 : pending.size(); - return unsentCount + client.inFlightRequestCount(node.idString()); + return unsent.getRequestCount(node) + client.inFlightRequestCount(node.idString()); + } + } + + /** + * Check whether there is pending request to the given node. This includes both request that + * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission. + * @param node The node in question + * @return A boolean indicating whether there is pending request + */ + public boolean hasPendingRequest(Node node) { + if (unsent.hasRequest(node)) + return true; + synchronized (this) { + return client.inFlightRequestCount(node.idString()) > 0; } } @@ -309,10 +313,20 @@ public class ConsumerNetworkClient implements Closeable { */ public int pendingRequestCount() { synchronized (this) { - int total = 0; - for (List<ClientRequest> requests: unsent.values()) - total += requests.size(); - return total + client.inFlightRequestCount(); + return unsent.getRequestCount() + client.inFlightRequestCount(); + } + } + + /** + * Check whether there is pending request. This includes both requests that + * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission. + * @return A boolean indicating whether there is pending request + */ + public boolean hasPendingRequest() { + if (unsent.hasRequest()) + return true; + synchronized (this) { + return client.inFlightRequestCount() > 0; } } @@ -337,19 +351,17 @@ public class ConsumerNetworkClient implements Closeable { // by NetworkClient, so we just need to check whether connections for any of the unsent // requests have been disconnected; if they have, then we complete the corresponding future // and set the disconnect flag in the ClientResponse - Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next(); - Node node = requestEntry.getKey(); + Set<Node> nodes = unsent.getNodes(); + for (Node node: nodes) { if (client.connectionFailed(node)) { // Remove entry before invoking request callback to avoid callbacks handling // coordinator failures traversing the unsent list again. - iterator.remove(); - for (ClientRequest request : requestEntry.getValue()) { + List<ClientRequest> requests = unsent.remove(node); + for (ClientRequest request : requests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()), - request.callback(), request.destination(), request.createdTimeMs(), now, true, - null, null)); + request.callback(), request.destination(), request.createdTimeMs(), now, true, + null, null)); } } } @@ -357,21 +369,10 @@ public class ConsumerNetworkClient implements Closeable { private void failExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures - Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next(); - Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator(); - while (requestIterator.hasNext()) { - ClientRequest request = requestIterator.next(); - if (request.createdTimeMs() < now - unsentExpiryMs) { - RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); - handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); - requestIterator.remove(); - } else - break; - } - if (requestEntry.getValue().isEmpty()) - iterator.remove(); + List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs); + for (ClientRequest request: expiredRequests) { + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); } } @@ -379,11 +380,9 @@ public class ConsumerNetworkClient implements Closeable { // clear unsent requests to node and fail their corresponding futures synchronized (this) { List<ClientRequest> unsentRequests = unsent.remove(node); - if (unsentRequests != null) { - for (ClientRequest unsentRequest : unsentRequests) { - RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback(); - handler.onFailure(e); - } + for (ClientRequest unsentRequest : unsentRequests) { + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback(); + handler.onFailure(e); } } @@ -394,15 +393,25 @@ public class ConsumerNetworkClient implements Closeable { private boolean trySend(long now) { // send any requests that can be sent now boolean requestsSent = false; - for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) { - Node node = requestEntry.getKey(); - Iterator<ClientRequest> iterator = requestEntry.getValue().iterator(); - while (iterator.hasNext()) { - ClientRequest request = iterator.next(); - if (client.ready(node, now)) { - client.send(request, now); - iterator.remove(); - requestsSent = true; + Set<Node> nodes = unsent.getNodes(); + for (Node node: nodes) { + if (client.ready(node, now)) { + // Remove entry before invoking request callback to avoid callbacks handling + // coordinator failures traversing the unsent list again. + List<ClientRequest> requests = unsent.remove(node); + try { + Iterator<ClientRequest> iterator = requests.iterator(); + while (iterator.hasNext()) { + ClientRequest request = iterator.next(); + if (!client.ready(node, now)) + break; + client.send(request, now); + requestsSent = true; + iterator.remove(); + } + } finally { + if (!requests.isEmpty()) + unsent.put(node, requests); } } } @@ -527,4 +536,87 @@ public class ConsumerNetworkClient implements Closeable { boolean shouldBlock(); } + + /* + * A threadsafe helper class to hold requests per node that has not been sent yet + */ + private final static class UnsentRequests { + private final Map<Node, List<ClientRequest>> unsent; + + public UnsentRequests() { + unsent = new HashMap<>(); + } + + public synchronized void put(Node node, List<ClientRequest> requests) { + List<ClientRequest> nodeUnsent = unsent.get(node); + if (nodeUnsent == null) { + nodeUnsent = new ArrayList<>(); + unsent.put(node, nodeUnsent); + } + nodeUnsent.addAll(requests); + } + + public synchronized void put(Node node, ClientRequest request) { + List<ClientRequest> nodeUnsent = unsent.get(node); + if (nodeUnsent == null) { + nodeUnsent = new ArrayList<>(); + unsent.put(node, nodeUnsent); + } + nodeUnsent.add(request); + } + + public synchronized int getRequestCount(Node node) { + List<ClientRequest> requests = unsent.get(node); + return requests == null ? 0 : requests.size(); + } + + public synchronized int getRequestCount() { + int total = 0; + for (List<ClientRequest> requests : unsent.values()) + total += requests.size(); + return total; + } + + public synchronized boolean hasRequest(Node node) { + List<ClientRequest> requests = unsent.get(node); + return requests != null && !requests.isEmpty(); + } + + public synchronized boolean hasRequest() { + for (List<ClientRequest> requests : unsent.values()) + if (!requests.isEmpty()) + return true; + return false; + } + + public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) { + List<ClientRequest> expiredRequests = new ArrayList<>(); + Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next(); + Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator(); + while (requestIterator.hasNext()) { + ClientRequest request = requestIterator.next(); + if (request.createdTimeMs() < now - unsentExpiryMs) { + expiredRequests.add(request); + requestIterator.remove(); + } else + break; + } + if (requestEntry.getValue().isEmpty()) + iterator.remove(); + } + return expiredRequests; + } + + public synchronized List<ClientRequest> remove(Node node) { + List<ClientRequest> requests = unsent.remove(node); + return requests == null ? Collections.<ClientRequest>emptyList() : requests; + } + + public synchronized Set<Node> getNodes() { + return new HashSet<>(unsent.keySet()); + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 8a8952c..536e4e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -713,7 +713,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { Node node = cluster.leaderFor(partition); if (node == null) { metadata.requestUpdate(); - } else if (this.client.pendingRequestCount(node) == 0) { + } else if (!this.client.hasPendingRequest(node)) { // if there is a leader and no in-flight requests, issue a new fetch LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node); if (fetch == null) {
