Repository: kafka
Updated Branches:
  refs/heads/trunk 8104c0de2 -> 1c786c589
MINOR: Reuse decompression buffers in log cleaner

Follow-up to KAFKA-5150, reuse decompression buffers in the log cleaner
thread.

Author: Xavier Léauté <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>

Closes #3180 from xvrl/logcleaner-decompression-buffers


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c786c58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c786c58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c786c58

Branch: refs/heads/trunk
Commit: 1c786c589ab6737e1660981e739582935f9e0f0d
Parents: 8104c0d
Author: Xavier Léauté <[email protected]>
Authored: Sat Jun 3 02:06:48 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Sat Jun 3 02:07:01 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      | 53 ++++++++++++--------
 .../kafka/common/record/MemoryRecordsTest.java  | 16 +++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  4 +-
 4 files changed, 46 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index e158e2f..1427421 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,21 +116,28 @@ public class MemoryRecords extends AbstractRecords {
 
     /**
      * Filter the records into the provided ByteBuffer.
-     * @param partition The partition that is filtered (used only for logging)
-     * @param filter The filter function
-     * @param destinationBuffer The byte buffer to write the filtered records to
-     * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
-     *                           exceeds this after filtering, we log a warning, but the batch will still be
-     *                           created.
+     *
+     * @param partition                   The partition that is filtered (used only for logging)
+     * @param filter                      The filter function
+     * @param destinationBuffer           The byte buffer to write the filtered records to
+     * @param maxRecordBatchSize          The maximum record batch size. Note this is not a hard limit: if a batch
+     *                                    exceeds this after filtering, we log a warning, but the batch will still be
+     *                                    created.
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. For small
+     *                                    record batches, allocating a potentially large buffer (64 KB for LZ4) will
+     *                                    dominate the cost of decompressing and iterating over the records in the
+     *                                    batch. As such, a supplier that reuses buffers will have a significant
+     *                                    performance impact.
     * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
      */
     public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
-                                 int maxRecordBatchSize) {
-        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize);
+                                 int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) {
+        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
     }
 
     private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
-                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) {
+                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
+                                         BufferSupplier decompressionBufferSupplier) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -155,21 +163,24 @@ public class MemoryRecords extends AbstractRecords {
 
             boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
-            for (Record record : batch) {
-                messagesRead += 1;
+            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+                while (iterator.hasNext()) {
+                    Record record = iterator.next();
+                    messagesRead += 1;
 
-                if (filter.shouldRetain(batch, record)) {
-                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                    // the corrupted batch with correct data.
-                    if (!record.hasMagic(batchMagic))
-                        writeOriginalBatch = false;
+                    if (filter.shouldRetain(batch, record)) {
+                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+                        // the corrupted batch with correct data.
+                        if (!record.hasMagic(batchMagic))
+                            writeOriginalBatch = false;
 
-                    if (record.offset() > maxOffset)
-                        maxOffset = record.offset();
+                        if (record.offset() > maxOffset)
+                            maxOffset = record.offset();
 
-                    retainedRecords.add(record);
-                } else {
-                    writeOriginalBatch = false;
+                        retainedRecords.add(record);
+                    } else {
+                        writeOriginalBatch = false;
+                    }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 619fbbd..b251d6c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -205,7 +205,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered,
-                Integer.MAX_VALUE);
+                Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -292,7 +292,7 @@ public class MemoryRecordsTest {
             protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
                 return true;
             }
-        }, filtered, Integer.MAX_VALUE);
+        }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -320,7 +320,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                filtered, Integer.MAX_VALUE);
+                filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -388,7 +388,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                filtered, Integer.MAX_VALUE);
+                filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -476,7 +476,8 @@ public class MemoryRecordsTest {
             output.rewind();
 
             MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
-                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE,
+                            BufferSupplier.NO_CACHING);
 
             buffer.position(buffer.position() + result.bytesRead);
             result.output.flip();
@@ -519,7 +520,8 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
-                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE,
+                BufferSupplier.NO_CACHING);
 
         filtered.flip();
 
@@ -632,7 +634,7 @@ public class MemoryRecordsTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
-                filtered, Integer.MAX_VALUE);
+                filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b05e37f..5aa8672 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -322,6 +322,8 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
+  private val decompressionBufferSupplier = BufferSupplier.create();
+
   require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
 
   /**
@@ -516,7 +518,7 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
 
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c786c58/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 52e9140..6fcc7ae 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -224,7 +224,7 @@ class LogTest {
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered, Int.MaxValue)
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
@@ -268,7 +268,7 @@ class LogTest {
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered, Int.MaxValue)
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
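
----------------------------------------------------------------------

For illustration, a minimal sketch of how a caller drives the new
filterTo overload. This is not part of the commit: the names mirror the
diff above, the anonymous filter is a hypothetical stand-in for the
tests' RetainNonNullKeysFilter, and the imports assume BufferSupplier
sits in org.apache.kafka.common.record (the diff adds no import for it,
which suggests it is package-local).

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;

    public class FilterToSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.LZ4,
                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                    new SimpleRecord("v2".getBytes())); // value only, null key

            // Create once per thread and reuse across filterTo calls: create()
            // hands back a cached buffer, so successive LZ4 batches do not each
            // allocate a fresh 64 KB decompression buffer. The tests pass
            // BufferSupplier.NO_CACHING instead, which allocates per call.
            BufferSupplier supplier = BufferSupplier.create();

            ByteBuffer out = ByteBuffer.allocate(2048);
            MemoryRecords.FilterResult result = records.filterTo(
                    new TopicPartition("foo", 0),
                    new RecordFilter() {
                        @Override
                        protected boolean shouldRetain(RecordBatch batch, Record record) {
                            return record.hasKey(); // hypothetical: keep only keyed records
                        }
                    },
                    out, Integer.MAX_VALUE, supplier);

            out.flip();
            MemoryRecords filtered = MemoryRecords.readableRecords(out);
            System.out.println(result.messagesRead + " read, "
                    + result.messagesRetained + " retained, "
                    + filtered.sizeInBytes() + " bytes out");
        }
    }

As in the LogCleaner change above, the supplier is created once per
cleaner thread and reused for every chunk it cleans, while NO_CACHING
keeps the single-shot test paths unchanged.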
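Why a supplier rather than a plain allocation? Closing the streaming
iterator returns its decompression buffer to the supplier, so the next
batch can borrow it again. A toy, single-slot standalone class (not the
real BufferSupplier, whose get/release shape it merely imitates) shows
the idea:

    import java.nio.ByteBuffer;

    // Single-slot cache in the spirit of BufferSupplier.create(): hold on to
    // the last released buffer and hand it back out if it is big enough.
    final class SingleSlotBufferSupplier {

        private ByteBuffer cached;

        // Borrow a buffer: reuse the cached one when it is large enough,
        // otherwise fall back to a fresh allocation.
        public ByteBuffer get(int capacity) {
            if (cached != null && cached.capacity() >= capacity) {
                ByteBuffer buffer = cached;
                cached = null;
                buffer.clear();
                return buffer;
            }
            return ByteBuffer.allocate(capacity);
        }

        // Accept a buffer back (what closing a streaming iterator triggers),
        // keeping it for the next get() instead of discarding it.
        public void release(ByteBuffer buffer) {
            cached = buffer;
        }
    }

With a supplier like this held per cleaner thread, only the first (or a
larger-than-cached) batch pays the allocation cost; every subsequent
batch reuses the same buffer, which is the point of this follow-up.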
