Repository: kafka
Updated Branches:
  refs/heads/0.10.1 dbe676720 -> f287efaff
KAFKA-4303; Ensure commitSync does not block unnecessarily in poll without in-flight requests Author: Jason Gustafson <ja...@confluent.io> Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #2031 from hachikuji/KAFKA-4303 (cherry picked from commit 6199c62776bf3ce9467703ca651a0119b261e60e) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f287efaf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f287efaf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f287efaf Branch: refs/heads/0.10.1 Commit: f287efaff4bfd8a22f18859aff3dba538f08ac04 Parents: dbe6767 Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Oct 14 15:36:43 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Oct 14 15:36:59 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/InFlightRequests.java | 16 +++++++----- .../kafka/clients/consumer/KafkaConsumer.java | 8 +----- .../internals/ConsumerNetworkClient.java | 8 ++++-- .../clients/consumer/internals/Fetcher.java | 16 +----------- .../internals/ConsumerNetworkClientTest.java | 27 +++++++++++++++++++- .../clients/consumer/internals/FetcherTest.java | 4 --- 6 files changed, 43 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 8de19ee..91b9dba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ 
-26,7 +26,7 @@ import java.util.Map; final class InFlightRequests { private final int maxInFlightRequestsPerConnection; - private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>(); + private final Map<String, Deque<ClientRequest>> requests = new HashMap<>(); public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; @@ -133,14 +133,16 @@ final class InFlightRequests { * @return list of nodes */ public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) { - List<String> nodeIds = new LinkedList<String>(); - for (String nodeId : requests.keySet()) { - if (inFlightRequestCount(nodeId) > 0) { - ClientRequest request = requests.get(nodeId).peekLast(); + List<String> nodeIds = new LinkedList<>(); + for (Map.Entry<String, Deque<ClientRequest>> requestEntry : requests.entrySet()) { + String nodeId = requestEntry.getKey(); + Deque<ClientRequest> deque = requestEntry.getValue(); + + if (!deque.isEmpty()) { + ClientRequest request = deque.peekLast(); long timeSinceSend = now - request.sendTimeMs(); - if (timeSinceSend > requestTimeout) { + if (timeSinceSend > requestTimeout) nodeIds.add(nodeId); - } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b2b4bf0..b384211 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1025,12 +1025,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); - // if no fetches 
could be sent at the moment (which can happen if a partition leader is in the - // blackout period following a disconnect, or if the partition leader is unknown), then we don't - // block for longer than the retry backoff duration. - if (!fetcher.hasInFlightFetches()) - timeout = Math.min(timeout, retryBackoffMs); - long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); @@ -1039,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() - return !fetcher.hasCompletedFetches() && fetcher.hasInFlightFetches(); + return !fetcher.hasCompletedFetches(); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 21fe0b8..2495b23 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class ConsumerNetworkClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); + private static final long MAX_POLL_TIMEOUT_MS = 5000L; // the mutable state of this class is protected by the object's monitor (excluding the wakeup // flag and the request completion queue below). 
@@ -176,7 +177,7 @@ public class ConsumerNetworkClient implements Closeable { */ public void poll(RequestFuture<?> future) { while (!future.isDone()) - poll(Long.MAX_VALUE, time.milliseconds(), future); + poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future); } /** @@ -225,7 +226,10 @@ public class ConsumerNetworkClient implements Closeable { // condition becomes satisfied after the call to shouldBlock() (because of a fired completion // handler), the client will be woken up. if (pollCondition == null || pollCondition.shouldBlock()) { - client.poll(timeout, now); + // if there are no requests in flight, do not block longer than the retry backoff + if (client.inFlightRequestCount() == 0) + timeout = Math.min(timeout, retryBackoffMs); + client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now); now = time.milliseconds(); } else { client.poll(0, now); http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 9e9ae92..bfc1a0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -17,6 +17,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; @@ -41,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; @@ -91,7 +91,6 @@ public class Fetcher<K, V> { private final FetchManagerMetrics sensors; private final SubscriptionState subscriptions; private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; - private final AtomicInteger numInFlightFetches = new AtomicInteger(0); private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; @@ -137,15 +136,6 @@ public class Fetcher<K, V> { return !completedFetches.isEmpty(); } - /** - * Check whether there are in-flight fetches. This is used to avoid unnecessary blocking in - * {@link ConsumerNetworkClient#poll(long)} if there are no fetches to wait for. This method is thread-safe. 
- * @return true if there are, false otherwise - */ - public boolean hasInFlightFetches() { - return numInFlightFetches.get() > 0; - } - private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse response) { Set<TopicPartition> requestedPartitions = request.fetchData().keySet(); Set<TopicPartition> fetchedPartitions = response.responseData().keySet(); @@ -161,13 +151,10 @@ public class Fetcher<K, V> { final FetchRequest request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); - numInFlightFetches.incrementAndGet(); client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { - numInFlightFetches.decrementAndGet(); - FetchResponse response = new FetchResponse(resp.responseBody()); if (!matchesRequestedPartitions(request, response)) { // obviously we expect the broker to always send us valid responses, so this check @@ -194,7 +181,6 @@ public class Fetcher<K, V> { @Override public void onFailure(RuntimeException e) { - numInFlightFetches.decrementAndGet(); log.debug("Fetch request to {} failed", fetchTarget, e); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index f90cd63..f8ad3ca 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -100,10 +100,35 @@ public class ConsumerNetworkClientTest { @Test public void blockWhenPollConditionNotSatisfied() { + long timeout = 4000L; + 
NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000); - EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(Long.MAX_VALUE), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); + EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1); + EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); + + EasyMock.replay(mockNetworkClient); + + consumerClient.poll(timeout, time.milliseconds(), new ConsumerNetworkClient.PollCondition() { + @Override + public boolean shouldBlock() { + return true; + } + }); + + EasyMock.verify(mockNetworkClient); + } + + @Test + public void blockOnlyForRetryBackoffIfNoInflightRequests() { + long retryBackoffMs = 100L; + + NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L); + + EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0); + EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); EasyMock.replay(mockNetworkClient); http://git-wip-us.apache.org/repos/asf/kafka/blob/f287efaf/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index faf6efa..5822646 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -128,13 +128,11 @@ public class FetcherTest { // 
normal fetch fetcher.sendFetches(); - assertTrue(fetcher.hasInFlightFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); - assertFalse(fetcher.hasInFlightFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords(); assertTrue(partitionRecords.containsKey(tp)); @@ -155,13 +153,11 @@ public class FetcherTest { subscriptions.seek(tp, 0); fetcher.sendFetches(); - assertTrue(fetcher.hasInFlightFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); - assertFalse(fetcher.hasInFlightFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords(); assertFalse(partitionRecords.containsKey(tp));