Repository: kafka
Updated Branches:
  refs/heads/0.11.0 651d9a538 -> a1a487945
KAFKA-5211; Do not skip a corrupted record in consumer Author: Jiangjie Qin <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3114 from becketqin/KAFKA-5211 (cherry picked from commit d082563907103ea79eed681305df7093053f52ec) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1a48794 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1a48794 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1a48794 Branch: refs/heads/0.11.0 Commit: a1a48794548c546f45d55162f55928420fec0c8f Parents: 651d9a5 Author: Jiangjie Qin <[email protected]> Authored: Tue May 30 22:09:53 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Tue May 30 22:59:09 2017 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 84 +++++------- .../clients/consumer/internals/FetcherTest.java | 127 +++++++++++++------ 2 files changed, 123 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a48794/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 5287b4e..e3f2355 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -113,7 +113,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private final IsolationLevel isolationLevel; private PartitionRecords nextInLineRecords = null; - private ExceptionMetadata nextInLineExceptionMetadata = null; public 
Fetcher(ConsumerNetworkClient client, int minBytes, @@ -154,7 +153,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) { return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer); } - + /** * Represents data about an offset returned by a broker. */ @@ -513,31 +512,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { * the defaultResetPolicy is NONE */ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { - if (nextInLineExceptionMetadata != null) { - ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata; - nextInLineExceptionMetadata = null; - TopicPartition tp = exceptionMetadata.partition; - if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset) - throw exceptionMetadata.exception; - } Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; - // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch - TopicPartition fetchedPartition = null; - long fetchedOffset = -1; try { while (recordsRemaining > 0) { if (nextInLineRecords == null || nextInLineRecords.isFetched) { - CompletedFetch completedFetch = completedFetches.poll(); + CompletedFetch completedFetch = completedFetches.peek(); if (completedFetch == null) break; - fetchedPartition = completedFetch.partition; - fetchedOffset = completedFetch.fetchedOffset; nextInLineRecords = parseCompletedFetch(completedFetch); + completedFetches.poll(); } else { - fetchedPartition = nextInLineRecords.partition; - fetchedOffset = nextInLineRecords.nextFetchOffset; List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) 
{ @@ -560,8 +546,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } catch (KafkaException e) { if (fetched.isEmpty()) throw e; - // To be thrown in the next call of this method - nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e); } return fetched; } @@ -952,10 +936,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private int recordsRead; private int bytesRead; private RecordBatch currentBatch; + private Record lastRecord; private CloseableIterator<Record> records; private long nextFetchOffset; private boolean isFetched = false; - private KafkaException nextInlineException; + private boolean hasExceptionInLastFetch; private PartitionRecords(TopicPartition partition, CompletedFetch completedFetch, @@ -966,13 +951,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { this.nextFetchOffset = completedFetch.fetchedOffset; this.abortedProducerIds = new HashSet<>(); this.abortedTransactions = abortedTransactions(completedFetch.partitionData); - this.nextInlineException = null; + this.hasExceptionInLastFetch = false; } private void drain() { if (!isFetched) { maybeCloseRecordStream(); - nextInlineException = null; + hasExceptionInLastFetch = false; this.isFetched = true; this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead); @@ -1013,6 +998,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } private Record nextFetchedRecord() { + if (hasExceptionInLastFetch) { + if (lastRecord == null) { + maybeEnsureValid(currentBatch); + } else { + maybeEnsureValid(lastRecord); + return lastRecord; + } + } + while (true) { if (records == null || !records.hasNext()) { maybeCloseRecordStream(); @@ -1021,6 +1015,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { drain(); return null; } + + lastRecord = null; currentBatch = batches.next(); 
maybeEnsureValid(currentBatch); @@ -1045,15 +1041,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } Record record = records.next(); - maybeEnsureValid(record); - + lastRecord = record; // skip any records out of range if (record.offset() >= nextFetchOffset) { - nextFetchOffset = record.offset() + 1; + // we only do validation when the message should not be skipped. + maybeEnsureValid(record); // control records are not returned to the user - if (!currentBatch.isControlBatch()) - return record; + if (!currentBatch.isControlBatch()) { + return record; + } else { + // Increment the next fetch offset when we skip a control batch. + nextFetchOffset = record.offset() + 1; + } } } } @@ -1061,11 +1061,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) { if (isFetched) return Collections.emptyList(); - if (nextInlineException != null) { - KafkaException e = nextInlineException; - nextInlineException = null; - throw e; - } List<ConsumerRecord<K, V>> records = new ArrayList<>(); try { @@ -1074,15 +1069,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { if (record == null) break; + records.add(parseRecord(partition, currentBatch, record)); recordsRead++; bytesRead += record.sizeInBytes(); - records.add(parseRecord(partition, currentBatch, record)); + nextFetchOffset = record.offset() + 1; } } catch (KafkaException e) { + hasExceptionInLastFetch = true; if (records.isEmpty()) throw e; - // To be thrown in the next call of this method - nextInlineException = e; } return records; } @@ -1132,18 +1127,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } } - private static class ExceptionMetadata { - private final TopicPartition partition; - private final long fetchedOffset; - private final KafkaException exception; - - private ExceptionMetadata(TopicPartition partition, long 
fetchedOffset, KafkaException exception) { - this.partition = partition; - this.fetchedOffset = fetchedOffset; - this.exception = exception; - } - } - private static class CompletedFetch { private final TopicPartition partition; private final long fetchedOffset; @@ -1232,7 +1215,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private final Sensor recordsFetchLag; private Set<TopicPartition> assignedPartitions; - + private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) { this.metrics = metrics; this.metricsRegistry = metricsRegistry; @@ -1305,8 +1288,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { Sensor recordsLag = this.metrics.getSensor(name); if (recordsLag == null) { recordsLag = this.metrics.sensor(name); - recordsLag.add(this.metrics.metricName(name, - metricsRegistry.partitionRecordsLag.group(), + recordsLag.add(this.metrics.metricName(name, + metricsRegistry.partitionRecordsLag.group(), metricsRegistry.partitionRecordsLag.description()), new Value()); recordsLag.add(this.metrics.metricName(name + "-max", metricsRegistry.partitionRecordsLagMax.group(), @@ -1327,7 +1310,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { public void close() { if (nextInLineRecords != null) nextInLineRecords.drain(); - nextInLineExceptionMetadata = null; decompressionBufferSupplier.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a48794/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 720079c..fedec2a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.MemoryRecords; @@ -261,8 +262,11 @@ public class FetcherTest { int i = 0; @Override public byte[] deserialize(String topic, byte[] data) { - if (i++ == 1) + if (i++ % 2 == 1) { + // Should be blocked on the value deserialization of the first record. + assertEquals(new String(data, StandardCharsets.UTF_8), "value-1"); throw new SerializationException(); + } return data; } }; @@ -276,12 +280,15 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); - try { - fetcher.fetchedRecords(); - fail("fetchedRecords should have raised"); - } catch (SerializationException e) { - // the position should not advance since no data has been returned - assertEquals(1, subscriptions.position(tp1).longValue()); + // The fetcher should block on Deserialization error + for (int i = 0; i < 2; i++) { + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have raised"); + } catch (SerializationException e) { + // the position should not advance since no data has been returned + assertEquals(1, subscriptions.position(tp1).longValue()); + } } } @@ -329,20 +336,69 @@ public class FetcherTest { assertEquals(1, fetcher.fetchedRecords().get(tp1).size()); assertEquals(1, subscriptions.position(tp1).longValue()); - // the second fetchedRecords() should throw exception due to the second invalid message - try { - fetcher.fetchedRecords(); - fail("fetchedRecords should have raised KafkaException"); - } catch (KafkaException e) { 
- assertEquals(1, subscriptions.position(tp1).longValue()); + // the fetchedRecords() should always throw exception due to the second invalid message + for (int i = 0; i < 2; i++) { + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have raised KafkaException"); + } catch (KafkaException e) { + assertEquals(1, subscriptions.position(tp1).longValue()); + } } - // the third fetchedRecords() should return the third valid message - assertEquals(1, fetcher.fetchedRecords().get(tp1).size()); + // Seek to skip the bad record and fetch again. + subscriptions.seek(tp1, 2); + // Should not throw exception after the seek. + fetcher.fetchedRecords(); + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + consumerClient.poll(0); + + List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1); + assertEquals(1, records.size()); + assertEquals(2L, records.get(0).offset()); assertEquals(3, subscriptions.position(tp1).longValue()); } @Test + public void testInvalidDefaultRecordBatch() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + ByteBufferOutputStream out = new ByteBufferOutputStream(buffer); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out, + DefaultRecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + 0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024); + builder.append(10L, "key".getBytes(), "value".getBytes()); + builder.close(); + buffer.flip(); + + // Garble the CRC + buffer.position(17); + buffer.put("beef".getBytes()); + buffer.position(0); + + subscriptions.assignFromUser(singleton(tp1)); + subscriptions.seek(tp1, 0); + + // normal fetch + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + consumerClient.poll(0); + + // the fetchedRecords() should always throw exception due to the bad batch. 
+ for (int i = 0; i < 2; i++) { + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have raised KafkaException"); + } catch (KafkaException e) { + assertEquals(0, subscriptions.position(tp1).longValue()); + } + } + } + + @Test public void testParseInvalidRecordBatch() throws Exception { MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, @@ -373,7 +429,7 @@ public class FetcherTest { @Test public void testHeaders() { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time)); - + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L); builder.append(0L, "key".getBytes(), "value-1".getBytes()); @@ -397,14 +453,14 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp1); - + assertEquals(3, records.size()); Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator(); - + ConsumerRecord<byte[], byte[]> record = recordIterator.next(); assertNull(record.headers().lastHeader("headerKey")); - + record = recordIterator.next(); assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8)); assertEquals("headerKey", record.headers().lastHeader("headerKey").key()); @@ -704,19 +760,20 @@ public class FetcherTest { consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1)); - try { - fetcherNoAutoReset.fetchedRecords(); - fail("Should have thrown OffsetOutOfRangeException"); - } catch (OffsetOutOfRangeException e) { - assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1)); - assertEquals(e.offsetOutOfRangePartitions().size(), 1); + for (int i = 0; i < 2; i++) { + try { + fetcherNoAutoReset.fetchedRecords(); + fail("Should have thrown OffsetOutOfRangeException"); + } catch (OffsetOutOfRangeException e) { + 
assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1)); + assertEquals(e.offsetOutOfRangePartitions().size(), 1); + } } - assertEquals(0, fetcherNoAutoReset.fetchedRecords().size()); } @Test public void testFetchPositionAfterException() { - // verify the advancement in the next fetch offset equals the number of fetched records when + // verify the advancement in the next fetch offset equals to the number of fetched records when // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2)); subscriptionsNoAutoReset.seek(tp1, 1); @@ -724,25 +781,21 @@ public class FetcherTest { assertEquals(1, fetcherNoAutoReset.sendFetches()); - Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); - partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, - FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); + Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>(); partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); + partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); consumerClient.poll(0); List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>(); List<OffsetOutOfRangeException> exceptions = new ArrayList<>(); - try { - for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) - fetchedRecords.addAll(records); - } catch (OffsetOutOfRangeException e) { - exceptions.add(e); - } + for (List<ConsumerRecord<byte[], byte[]>> records: 
fetcherNoAutoReset.fetchedRecords().values()) + fetchedRecords.addAll(records); - assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1); + assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1); try { for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
