Repository: kafka
Updated Branches:
  refs/heads/trunk e7663a306 -> 6199c6277


KAFKA-4303; Ensure commitSync does not block unnecessarily in poll without in-flight requests

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #2031 from hachikuji/KAFKA-4303


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6199c627
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6199c627
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6199c627

Branch: refs/heads/trunk
Commit: 6199c62776bf3ce9467703ca651a0119b261e60e
Parents: e7663a3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Oct 14 15:36:43 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 14 15:36:43 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/InFlightRequests.java  | 16 +++++++-----
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 +-----
 .../internals/ConsumerNetworkClient.java        |  8 ++++--
 .../clients/consumer/internals/Fetcher.java     | 16 +-----------
 .../internals/ConsumerNetworkClientTest.java    | 27 +++++++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java |  4 ---
 6 files changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 8de19ee..91b9dba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -26,7 +26,7 @@ import java.util.Map;
 final class InFlightRequests {
 
     private final int maxInFlightRequestsPerConnection;
-    private final Map<String, Deque<ClientRequest>> requests = new 
HashMap<String, Deque<ClientRequest>>();
+    private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
 
     public InFlightRequests(int maxInFlightRequestsPerConnection) {
         this.maxInFlightRequestsPerConnection = 
maxInFlightRequestsPerConnection;
@@ -133,14 +133,16 @@ final class InFlightRequests {
      * @return list of nodes
      */
     public List<String> getNodesWithTimedOutRequests(long now, int 
requestTimeout) {
-        List<String> nodeIds = new LinkedList<String>();
-        for (String nodeId : requests.keySet()) {
-            if (inFlightRequestCount(nodeId) > 0) {
-                ClientRequest request = requests.get(nodeId).peekLast();
+        List<String> nodeIds = new LinkedList<>();
+        for (Map.Entry<String, Deque<ClientRequest>> requestEntry : 
requests.entrySet()) {
+            String nodeId = requestEntry.getKey();
+            Deque<ClientRequest> deque = requestEntry.getValue();
+
+            if (!deque.isEmpty()) {
+                ClientRequest request = deque.peekLast();
                 long timeSinceSend = now - request.sendTimeMs();
-                if (timeSinceSend > requestTimeout) {
+                if (timeSinceSend > requestTimeout)
                     nodeIds.add(nodeId);
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b2b4bf0..b384211 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1025,12 +1025,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         // send any new fetches (won't resend pending fetches)
         fetcher.sendFetches();
 
-        // if no fetches could be sent at the moment (which can happen if a 
partition leader is in the
-        // blackout period following a disconnect, or if the partition leader 
is unknown), then we don't
-        // block for longer than the retry backoff duration.
-        if (!fetcher.hasInFlightFetches())
-            timeout = Math.min(timeout, retryBackoffMs);
-
         long now = time.milliseconds();
         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
 
@@ -1039,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             public boolean shouldBlock() {
                 // since a fetch might be completed by the background thread, 
we need this poll condition
                 // to ensure that we do not block unnecessarily in poll()
-                return !fetcher.hasCompletedFetches() && 
fetcher.hasInFlightFetches();
+                return !fetcher.hasCompletedFetches();
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 21fe0b8..2495b23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class ConsumerNetworkClient implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerNetworkClient.class);
+    private static final long MAX_POLL_TIMEOUT_MS = 5000L;
 
     // the mutable state of this class is protected by the object's monitor 
(excluding the wakeup
     // flag and the request completion queue below).
@@ -176,7 +177,7 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public void poll(RequestFuture<?> future) {
         while (!future.isDone())
-            poll(Long.MAX_VALUE, time.milliseconds(), future);
+            poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
     }
 
     /**
@@ -225,7 +226,10 @@ public class ConsumerNetworkClient implements Closeable {
             // condition becomes satisfied after the call to shouldBlock() 
(because of a fired completion
             // handler), the client will be woken up.
             if (pollCondition == null || pollCondition.shouldBlock()) {
-                client.poll(timeout, now);
+                // if there are no requests in flight, do not block longer 
than the retry backoff
+                if (client.inFlightRequestCount() == 0)
+                    timeout = Math.min(timeout, retryBackoffMs);
+                client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
                 now = time.milliseconds();
             } else {
                 client.poll(0, now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9e9ae92..bfc1a0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -17,6 +17,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
@@ -41,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -91,7 +91,6 @@ public class Fetcher<K, V> {
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
-    private final AtomicInteger numInFlightFetches = new AtomicInteger(0);
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
@@ -137,15 +136,6 @@ public class Fetcher<K, V> {
         return !completedFetches.isEmpty();
     }
 
-    /**
-     * Check whether there are in-flight fetches. This is used to avoid 
unnecessary blocking in
-     * {@link ConsumerNetworkClient#poll(long)} if there are no fetches to 
wait for. This method is thread-safe.
-     * @return true if there are, false otherwise
-     */
-    public boolean hasInFlightFetches() {
-        return numInFlightFetches.get() > 0;
-    }
-
     private boolean matchesRequestedPartitions(FetchRequest request, 
FetchResponse response) {
         Set<TopicPartition> requestedPartitions = request.fetchData().keySet();
         Set<TopicPartition> fetchedPartitions = 
response.responseData().keySet();
@@ -161,13 +151,10 @@ public class Fetcher<K, V> {
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
-            numInFlightFetches.incrementAndGet();
             client.send(fetchTarget, ApiKeys.FETCH, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
-                            numInFlightFetches.decrementAndGet();
-
                             FetchResponse response = new 
FetchResponse(resp.responseBody());
                             if (!matchesRequestedPartitions(request, 
response)) {
                                 // obviously we expect the broker to always 
send us valid responses, so this check
@@ -194,7 +181,6 @@ public class Fetcher<K, V> {
 
                         @Override
                         public void onFailure(RuntimeException e) {
-                            numInFlightFetches.decrementAndGet();
                             log.debug("Fetch request to {} failed", 
fetchTarget, e);
                         }
                     });

http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f90cd63..f8ad3ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -100,10 +100,35 @@ public class ConsumerNetworkClientTest {
 
     @Test
     public void blockWhenPollConditionNotSatisfied() {
+        long timeout = 4000L;
+
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
 
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(Long.MAX_VALUE), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
+        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+
+        EasyMock.replay(mockNetworkClient);
+
+        consumerClient.poll(timeout, time.milliseconds(), new 
ConsumerNetworkClient.PollCondition() {
+            @Override
+            public boolean shouldBlock() {
+                return true;
+            }
+        });
+
+        EasyMock.verify(mockNetworkClient);
+    }
+
+    @Test
+    public void blockOnlyForRetryBackoffIfNoInflightRequests() {
+        long retryBackoffMs = 100L;
+
+        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L);
+
+        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
+        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
 
         EasyMock.replay(mockNetworkClient);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index faf6efa..5822646 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -128,13 +128,11 @@ public class FetcherTest {
 
         // normal fetch
         fetcher.sendFetches();
-        assertTrue(fetcher.hasInFlightFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fetchResponse(this.records.buffer(), 
Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
-        assertFalse(fetcher.hasInFlightFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetcher.fetchedRecords();
         assertTrue(partitionRecords.containsKey(tp));
@@ -155,13 +153,11 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
-        assertTrue(fetcher.hasInFlightFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fetchResponse(this.records.buffer(), 
Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
-        assertFalse(fetcher.hasInFlightFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetcher.fetchedRecords();
         assertFalse(partitionRecords.containsKey(tp));

Reply via email to