This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7afcb3a KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
7afcb3a is described below
commit 7afcb3a64c53831f05f99037dfa07aba1d396c48
Author: Adem Efe Gencer <[email protected]>
AuthorDate: Tue May 8 18:57:34 2018 -0700
KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
This patch removes a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario explains why this is needed, using a TopicAuthorizationException as the example.
0. Assume the consumer is attempting to read from a topic without the necessary read permission.
1. In Fetcher#fetchedRecords(), after peeking the completedFetches, Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
2. Fetcher#fetchedRecords() propagates the TopicAuthorizationException without getting a chance to poll completedFetches, so the same completedFetch remains in the completedFetches queue.
3. On subsequent calls to Fetcher#fetchedRecords(), peeking the completedFetches always returns the same completedFetch, regardless of any updates to the ACLs of the topic being read.
4. Hence, even after an ACL with the correct permissions is created, once the consumer has seen the TopicAuthorizationException it cannot recover without a bounce. The fix is sketched below.
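In essence, the fix wraps the parse in a try/catch and drops the offending completedFetch from the queue before rethrowing, but only when it is safe to do so. A simplified sketch (following the names used in Fetcher#fetchedRecords() in the patch below; metrics and other details omitted):

    CompletedFetch completedFetch = completedFetches.peek();
    if (completedFetch == null) break;
    try {
        nextInLineRecords = parseCompletedFetch(completedFetch);
    } catch (Exception e) {
        // Drop the completedFetch so later calls can make progress, but only if it
        // carries no records and nothing has been fetched yet, so no data is lost.
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0))
            completedFetches.poll();
        throw e;
    }
    completedFetches.poll();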
Author: Adem Efe Gencer <[email protected]>
Reviewers: Jiangjie (Becket) Qin <[email protected]>
Closes #4974 from efeg/fix/parseCompletedFetchRemainsInQueue
---
.../kafka/clients/consumer/internals/Fetcher.java | 18 ++++-
.../clients/consumer/internals/FetcherTest.java | 80 +++++++++++++++++++++-
2 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c..b3791ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,6 +472,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
* @return The fetched records per partition
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
+ * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
@@ -483,7 +484,20 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
- nextInLineRecords = parseCompletedFetch(completedFetch);
+ try {
+ nextInLineRecords = parseCompletedFetch(completedFetch);
+ } catch (Exception e) {
+ // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+ // (2) there are no fetched records with actual content preceding this exception.
+ // The first condition ensures that the completedFetches queue is not stuck with the same completedFetch
+ // in cases such as the TopicAuthorizationException, and the second condition ensures that no
+ // potential data loss occurs due to an exception in a following record.
+ FetchResponse.PartitionData partition = completedFetch.partitionData;
+ if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+ completedFetches.poll();
+ }
+ throw e;
+ }
completedFetches.poll();
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
@@ -945,7 +959,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
if (fetchOffset != subscriptions.position(tp)) {
- log.debug("Discarding stale fetch response for partition
{} since the fetched offset {}" +
+ log.debug("Discarding stale fetch response for partition
{} since the fetched offset {} " +
"does not match the current offset {}", tp,
fetchOffset, subscriptions.position(tp));
} else if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset {} is out of range for partition
{}, resetting offset", fetchOffset, tp);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6de1186..7157470 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -117,6 +118,8 @@ public class FetcherTest {
private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
private TopicPartition tp0 = new TopicPartition(topicName, 0);
private TopicPartition tp1 = new TopicPartition(topicName, 1);
+ private TopicPartition tp2 = new TopicPartition(topicName, 2);
+ private TopicPartition tp3 = new TopicPartition(topicName, 3);
private int minBytes = 1;
private int maxBytes = Integer.MAX_VALUE;
private int maxWaitMs = 0;
@@ -126,7 +129,7 @@ public class FetcherTest {
private MockTime time = new MockTime(1);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
private MockClient client = new MockClient(time, metadata);
- private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
+ private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
private Node node = cluster.nodes().get(0);
private Metrics metrics = new Metrics(time);
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
@@ -140,6 +143,7 @@ public class FetcherTest {
private MemoryRecords records;
private MemoryRecords nextRecords;
private MemoryRecords emptyRecords;
+ private MemoryRecords partialRecords;
private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
private Metrics fetcherMetrics = new Metrics(time);
private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -162,6 +166,11 @@ public class FetcherTest {
builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
emptyRecords = builder.build();
+
+ builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
+ builder.append(0L, "key".getBytes(), "value-0".getBytes());
+ partialRecords = builder.build();
+ partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
}
@After
@@ -833,7 +842,7 @@ public class FetcherTest {
partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, records));
partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
0, INVALID_SESSION_ID));
consumerClient.poll(0);
@@ -864,6 +873,73 @@ public class FetcherTest {
}
@Test
+ public void testCompletedFetchRemoval() {
+ // Ensure the removal of completed fetches that cause an Exception if and only if they contain no records.
+ subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+ subscriptionsNoAutoReset.seek(tp0, 1);
+ subscriptionsNoAutoReset.seek(tp1, 1);
+ subscriptionsNoAutoReset.seek(tp2, 1);
+ subscriptionsNoAutoReset.seek(tp3, 1);
+
+ assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+ 0L, null, nextRecords));
+ partitions.put(tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+ 0L, null, partialRecords));
+ client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
+ 0, INVALID_SESSION_ID));
+ consumerClient.poll(0);
+
+ List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+
+ assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1);
+ assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue());
+ assertEquals(3, fetchedRecords.size());
+
+ List<OffsetOutOfRangeException> oorExceptions = new ArrayList<>();
+ try {
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+ } catch (OffsetOutOfRangeException oor) {
+ oorExceptions.add(oor);
+ }
+
+ // Should have received one OffsetOutOfRangeException for partition tp0
+ assertEquals(1, oorExceptions.size());
+ OffsetOutOfRangeException oor = oorExceptions.get(0);
+ assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
+ assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+
+ // Should not have received an Exception for tp2.
+ assertEquals(6, subscriptionsNoAutoReset.position(tp2).longValue());
+ assertEquals(5, fetchedRecords.size());
+
+ int numExceptionsExpected = 3;
+ List<KafkaException> kafkaExceptions = new ArrayList<>();
+ for (int i = 1; i <= numExceptionsExpected; i++) {
+ try {
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+ } catch (KafkaException e) {
+ kafkaExceptions.add(e);
+ }
+ }
+ // Should have received as many as numExceptionsExpected KafkaExceptions for tp3.
+ assertEquals(numExceptionsExpected, kafkaExceptions.size());
+ }
+
+ @Test
public void testSeekBeforeException() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
--
To stop receiving notification emails like this one, please contact
[email protected].