This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7afcb3a  KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
7afcb3a is described below

commit 7afcb3a64c53831f05f99037dfa07aba1d396c48
Author: Adem Efe Gencer <[email protected]>
AuthorDate: Tue May 8 18:57:34 2018 -0700

    KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
    
    This patch removes a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario, with a TopicAuthorizationException as a concrete instance, explains why this is needed (a simplified sketch follows the steps).
    
    0. Assume the consumer is attempting to read from a topic without the necessary read permission.
    1. In Fetcher#fetchedRecords(), after peeking the completedFetches, Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
    2. Fetcher#fetchedRecords() propagates the TopicAuthorizationException without getting a chance to poll completedFetches, so the same completedFetch remains in the completedFetches queue.
    3. On subsequent calls to Fetcher#fetchedRecords(), peeking the completedFetches always returns the same completedFetch, regardless of any updates to the ACLs of the topic being read.
    4. Hence, even after an ACL with the correct permissions is created, once the consumer has seen the TopicAuthorizationException it cannot recover without a bounce.
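
    A minimal, self-contained sketch of the peek/parse/poll behavior described above. CompletedFetch, parse, and the two fetchedRecords variants here are simplified stand-ins for the real Fetcher internals, not the actual implementation:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Simplified stand-in for the Fetcher's completedFetches handling.
    public class StuckFetchSketch {

        static class CompletedFetch {
            final String topic;
            final int recordCount;   // 0 models an error-only response with no records
            CompletedFetch(String topic, int recordCount) {
                this.topic = topic;
                this.recordCount = recordCount;
            }
        }

        static class TopicAuthorizationException extends RuntimeException {
            TopicAuthorizationException(String topic) {
                super("Not authorized to access topic: " + topic);
            }
        }

        private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();

        // Models parseCompletedFetch: throws when the topic is unauthorized.
        private int parse(CompletedFetch fetch, boolean authorized) {
            if (!authorized)
                throw new TopicAuthorizationException(fetch.topic);
            return fetch.recordCount;
        }

        // Before the patch: the exception propagates before poll() runs, so the
        // same CompletedFetch is peeked again on every subsequent call (step 3).
        int fetchedRecordsBefore(boolean authorized) {
            CompletedFetch fetch = completedFetches.peek();
            if (fetch == null) return 0;
            int count = parse(fetch, authorized);   // may throw
            completedFetches.poll();                // never reached on failure
            return count;
        }

        // After the patch: a CompletedFetch with no records that fails to parse is
        // removed before rethrowing, so the consumer can recover once ACLs change.
        int fetchedRecordsAfter(boolean authorized) {
            CompletedFetch fetch = completedFetches.peek();
            if (fetch == null) return 0;
            try {
                int count = parse(fetch, authorized);
                completedFetches.poll();
                return count;
            } catch (RuntimeException e) {
                if (fetch.recordCount == 0)
                    completedFetches.poll();        // drop the empty, failed fetch
                throw e;
            }
        }

        public static void main(String[] args) {
            StuckFetchSketch sketch = new StuckFetchSketch();
            sketch.completedFetches.add(new CompletedFetch("secured-topic", 0));

            try {
                sketch.fetchedRecordsBefore(false);     // throws; fetch stays queued
            } catch (TopicAuthorizationException e) {
                System.out.println("before patch, queued: " + sketch.completedFetches.size()); // 1
            }

            try {
                sketch.fetchedRecordsAfter(false);      // throws; empty fetch is dequeued
            } catch (TopicAuthorizationException e) {
                System.out.println("after patch, queued: " + sketch.completedFetches.size());  // 0
            }
        }
    }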
    
    Author: Adem Efe Gencer <[email protected]>
    
    Reviewers: Jiangjie (Becket) Qin <[email protected]>
    
    Closes #4974 from efeg/fix/parseCompletedFetchRemainsInQueue
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 18 ++++-
 .../clients/consumer/internals/FetcherTest.java    | 80 +++++++++++++++++++++-
 2 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c..b3791ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,6 +472,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * @return The fetched records per partition
      * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
         Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
@@ -483,7 +484,20 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     CompletedFetch completedFetch = completedFetches.peek();
                     if (completedFetch == null) break;
 
-                    nextInLineRecords = parseCompletedFetch(completedFetch);
+                    try {
+                        nextInLineRecords = parseCompletedFetch(completedFetch);
+                    } catch (Exception e) {
+                        // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+                        // (2) there are no fetched records with actual content preceding this exception.
+                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch
+                        // in cases such as the TopicAuthorizationException, and the second condition ensures that there is
+                        // no potential data loss due to an exception in a following record.
+                        FetchResponse.PartitionData partition = completedFetch.partitionData;
+                        if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+                            completedFetches.poll();
+                        }
+                        throw e;
+                    }
                     completedFetches.poll();
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
@@ -945,7 +959,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                 if (fetchOffset != subscriptions.position(tp)) {
-                    log.debug("Discarding stale fetch response for partition 
{} since the fetched offset {}" +
+                    log.debug("Discarding stale fetch response for partition 
{} since the fetched offset {} " +
                             "does not match the current offset {}", tp, 
fetchOffset, subscriptions.position(tp));
                 } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                     log.info("Fetch offset {} is out of range for partition 
{}, resetting offset", fetchOffset, tp);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6de1186..7157470 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -117,6 +118,8 @@ public class FetcherTest {
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp0 = new TopicPartition(topicName, 0);
     private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
@@ -126,7 +129,7 @@ public class FetcherTest {
     private MockTime time = new MockTime(1);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
     private Node node = cluster.nodes().get(0);
     private Metrics metrics = new Metrics(time);
     FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
@@ -140,6 +143,7 @@ public class FetcherTest {
     private MemoryRecords records;
     private MemoryRecords nextRecords;
     private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
     private Metrics fetcherMetrics = new Metrics(time);
     private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -162,6 +166,11 @@ public class FetcherTest {
 
         builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         emptyRecords = builder.build();
+
+        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
+        builder.append(0L, "key".getBytes(), "value-0".getBytes());
+        partialRecords = builder.build();
+        partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
     }
 
     @After
@@ -833,7 +842,7 @@ public class FetcherTest {
         partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
             0, INVALID_SESSION_ID));
         consumerClient.poll(0);
@@ -864,6 +873,73 @@ public class FetcherTest {
     }
 
     @Test
+    public void testCompletedFetchRemoval() {
+        // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records.
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+        subscriptionsNoAutoReset.seek(tp0, 1);
+        subscriptionsNoAutoReset.seek(tp1, 1);
+        subscriptionsNoAutoReset.seek(tp2, 1);
+        subscriptionsNoAutoReset.seek(tp3, 1);
+
+        assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+            0L, null, nextRecords));
+        partitions.put(tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+            0L, null, partialRecords));
+        client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
+                                                 0, INVALID_SESSION_ID));
+        consumerClient.poll(0);
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+        for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+            fetchedRecords.addAll(records);
+
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1);
+        assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue());
+        assertEquals(3, fetchedRecords.size());
+
+        List<OffsetOutOfRangeException> oorExceptions = new ArrayList<>();
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException oor) {
+            oorExceptions.add(oor);
+        }
+
+        // Should have received one OffsetOutOfRangeException for partition tp0.
+        assertEquals(1, oorExceptions.size());
+        OffsetOutOfRangeException oor = oorExceptions.get(0);
+        assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
+        assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+
+        for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+            fetchedRecords.addAll(records);
+
+        // Should not have received an Exception for tp2.
+        assertEquals(6, subscriptionsNoAutoReset.position(tp2).longValue());
+        assertEquals(5, fetchedRecords.size());
+
+        int numExceptionsExpected = 3;
+        List<KafkaException> kafkaExceptions = new ArrayList<>();
+        for (int i = 1; i <= numExceptionsExpected; i++) {
+            try {
+                for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                    fetchedRecords.addAll(records);
+            } catch (KafkaException e) {
+                kafkaExceptions.add(e);
+            }
+        }
+        // Should have received as many as numExceptionsExpected KafkaExceptions for tp3.
+        assertEquals(numExceptionsExpected, kafkaExceptions.size());
+    }
+
+    @Test
     public void testSeekBeforeException() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].
