Repository: kafka
Updated Branches:
  refs/heads/0.10.2 8b306ca2a -> 386a8d041
KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #3165, #3187 from ijuma/kafka-5316-log-cleaner-0.10.2

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/386a8d04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/386a8d04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/386a8d04

Branch: refs/heads/0.10.2
Commit: 386a8d041c474b8ba66f3abd815338347f944b79
Parents: 8b306ca
Author: Jason Gustafson <[email protected]>
Authored: Thu Jun 1 11:01:07 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Jun 1 11:01:24 2017 +0100

----------------------------------------------------------------------
 .../common/record/ByteBufferOutputStream.java   | 125 ++++++++++++++-----
 .../kafka/common/record/MemoryRecords.java      |  58 +++++++--
 .../common/record/MemoryRecordsBuilder.java     |  50 +++++---
 .../org/apache/kafka/common/record/Record.java  |   2 +-
 .../clients/consumer/internals/FetcherTest.java |   3 +-
 .../record/ByteBufferOutputStreamTest.java      | 101 +++++++++++++++
 .../kafka/common/record/MemoryRecordsTest.java  |  58 ++++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  11 +-
 8 files changed, 334 insertions(+), 74 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index 3fb7f49..39c1cfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -16,54 +16,111 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.DataOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
- * A byte buffer backed output outputStream
+ * A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should
+ * always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed.
+ *
+ * This class is typically used for 2 purposes:
+ *
+ * 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data
+ * 2. Write to a ByteBuffer via methods that expect an OutputStream interface
+ *
+ * Hard to track bugs can happen when this class is used for the second reason and unexpected buffer expansion happens.
+ * So, it's best to assume that buffer expansion can always happen. An improvement would be to create a separate class
+ * that throws an error if buffer expansion is required to avoid the issue altogether.
  */
-public class ByteBufferOutputStream extends DataOutputStream {
+public class ByteBufferOutputStream extends OutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
+    private final int initialCapacity;
+    private final int initialPosition;
+    private ByteBuffer buffer;
+
+    /**
+     * Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to
+     * satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return
+     * a different buffer than the received `buffer` parameter.
+     *
+     * Prefer one of the constructors that allocate the internal buffer for clearer semantics.
+     */
     public ByteBufferOutputStream(ByteBuffer buffer) {
-        super(new UnderlyingOutputStream(buffer));
+        this.buffer = buffer;
+        this.initialPosition = buffer.position();
+        this.initialCapacity = buffer.capacity();
+    }
+
+    public ByteBufferOutputStream(int initialCapacity) {
+        this(initialCapacity, false);
+    }
+
+    public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) {
+        this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity));
+    }
+
+    public void write(int b) {
+        maybeExpandBuffer(1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        maybeExpandBuffer(len);
+        buffer.put(bytes, off, len);
+    }
+
+    public void write(ByteBuffer sourceBuffer) {
+        maybeExpandBuffer(sourceBuffer.remaining());
+        buffer.put(sourceBuffer);
     }
 
     public ByteBuffer buffer() {
-        return ((UnderlyingOutputStream) out).buffer;
+        return buffer;
+    }
+
+    public int position() {
+        return buffer.position();
+    }
+
+    public int remaining() {
+        return buffer.remaining();
+    }
+
+    public int limit() {
+        return buffer.limit();
+    }
+
+    public void position(int position) {
+        maybeExpandBuffer(position - buffer.position());
+        buffer.position(position);
+    }
+
+    /**
+     * The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled
+     * ByteBuffer was passed via the constructor and it needs to be returned to the pool.
+     */
+    public int initialCapacity() {
+        return initialCapacity;
+    }
+
+    private void maybeExpandBuffer(int remainingRequired) {
+        if (remainingRequired > buffer.remaining())
+            expandBuffer(remainingRequired);
     }
 
-    public static class UnderlyingOutputStream extends OutputStream {
-        private ByteBuffer buffer;
-
-        public UnderlyingOutputStream(ByteBuffer buffer) {
-            this.buffer = buffer;
-        }
-
-        public void write(int b) {
-            if (buffer.remaining() < 1)
-                expandBuffer(buffer.capacity() + 1);
-            buffer.put((byte) b);
-        }
-
-        public void write(byte[] bytes, int off, int len) {
-            if (buffer.remaining() < len)
-                expandBuffer(buffer.capacity() + len);
-            buffer.put(bytes, off, len);
-        }
-
-        public ByteBuffer buffer() {
-            return buffer;
-        }
-
-        private void expandBuffer(int size) {
-            int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-            ByteBuffer temp = ByteBuffer.allocate(expandSize);
-            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-            buffer = temp;
-        }
+    private void expandBuffer(int remainingRequired) {
+        int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        int limit = limit();
+        buffer.flip();
+        temp.put(buffer);
+        buffer.limit(limit);
+        // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+        // we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it
+        buffer.position(initialPosition);
+        buffer = temp;
    }
 }
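For illustration only (this example is mine, not part of the commit): the key contract of the reworked class is that callers write through the stream and then re-fetch buffer(), since an expansion may have swapped in a new ByteBuffer. A minimal sketch:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class ByteBufferOutputStreamExample {
        public static void main(String[] args) {
            // Deliberately undersized initial buffer; the stream expands it on demand.
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(8));
            byte[] payload = "hello, kafka".getBytes(StandardCharsets.UTF_8);
            out.write(payload, 0, payload.length);

            // Fetch the buffer only after writing: expansion may have replaced it.
            ByteBuffer result = out.buffer();
            result.flip();
            byte[] readBack = new byte[result.remaining()];
            result.get(readBack);
            System.out.println(new String(readBack, StandardCharsets.UTF_8)); // hello, kafka
        }
    }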
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 6c31b25..f1a6e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -12,7 +12,10 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -28,7 +31,7 @@ import java.util.List;
  * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants.
  */
 public class MemoryRecords extends AbstractRecords {
-
+    private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
 
     public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
 
     private final ByteBuffer buffer;
@@ -107,12 +110,16 @@ public class MemoryRecords extends AbstractRecords {
      * @param destinationBuffer The byte buffer to write the filtered records to
      * @return A FilterResult with a summary of the output (for metrics)
      */
-    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer destinationBuffer) {
-        return filterTo(shallowEntries(), filter, destinationBuffer);
+    public FilterResult filterTo(TopicPartition partition, LogEntryFilter filter, ByteBuffer destinationBuffer,
+                                 int maxRecordSize) {
+        return filterTo(partition, shallowEntries(), filter, destinationBuffer, maxRecordSize);
     }
 
-    private static FilterResult filterTo(Iterable<ByteBufferLogEntry> fromShallowEntries, LogEntryFilter filter,
-                                         ByteBuffer destinationBuffer) {
+    private static FilterResult filterTo(TopicPartition partition,
+                                         Iterable<ByteBufferLogEntry> fromShallowEntries,
+                                         LogEntryFilter filter,
+                                         ByteBuffer destinationBuffer,
+                                         int maxRecordSize) {
         long maxTimestamp = Record.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -121,6 +128,8 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
+        ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
+
         for (ByteBufferLogEntry shallowEntry : fromShallowEntries) {
             bytesRead += shallowEntry.sizeInBytes();
 
@@ -155,7 +164,7 @@ public class MemoryRecords extends AbstractRecords {
             if (writeOriginalEntry) {
                 // There are no messages compacted out and no message format conversion, write the original message set back
-                shallowEntry.writeTo(destinationBuffer);
+                bufferOutputStream.write(shallowEntry.buffer());
                 messagesRetained += retainedEntries.size();
                 bytesRetained += shallowEntry.sizeInBytes();
 
@@ -164,23 +173,45 @@ public class MemoryRecords extends AbstractRecords {
                     shallowOffsetOfMaxTimestamp = shallowEntry.offset();
                 }
             } else if (!retainedEntries.isEmpty()) {
-                ByteBuffer slice = destinationBuffer.slice();
-                MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
-                        shallowRecord.timestamp(), retainedEntries);
+                LogEntry firstEntry = retainedEntries.iterator().next();
+                long firstOffset = firstEntry.offset();
+                byte magic = firstEntry.record().magic();
+
+                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+                        shallowRecord.compressionType(), shallowRecord.timestampType(),
+                        firstOffset, shallowRecord.timestamp(), bufferOutputStream.buffer().remaining());
+                for (LogEntry entry : retainedEntries)
+                    builder.appendWithOffset(entry.offset(), entry.record());
+
                 MemoryRecords records = builder.build();
-                destinationBuffer.position(destinationBuffer.position() + slice.position());
+                int filteredSizeInBytes = records.sizeInBytes();
+
                 messagesRetained += retainedEntries.size();
                 bytesRetained += records.sizeInBytes();
 
+                if (filteredSizeInBytes > shallowEntry.sizeInBytes() && filteredSizeInBytes > maxRecordSize)
+                    log.warn("Record batch from {} with first offset {} exceeded max record size {} after cleaning " +
+                                    "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
+                                    "increase their fetch sizes.",
+                            partition, firstOffset, maxRecordSize, filteredSizeInBytes);
+
                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
                 if (info.maxTimestamp > maxTimestamp) {
                     maxTimestamp = info.maxTimestamp;
                     shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
                 }
             }
+
+            // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+            // avoid the need for additional allocations.
+            ByteBuffer outputBuffer = bufferOutputStream.buffer();
+            if (outputBuffer != destinationBuffer)
+                return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                        maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
         }
 
-        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
     }
 
     /**
@@ -259,6 +290,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static class FilterResult {
+        public final ByteBuffer output;
         public final int messagesRead;
         public final int bytesRead;
         public final int messagesRetained;
@@ -267,13 +299,15 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
-        public FilterResult(int messagesRead,
+        public FilterResult(ByteBuffer output,
+                            int messagesRead,
                             int bytesRead,
                             int messagesRetained,
                             int bytesRetained,
                             long maxOffset,
                             long maxTimestamp,
                             long shallowOffsetOfMaxTimestamp) {
+            this.output = output;
             this.messagesRead = messagesRead;
             this.bytesRead = bytesRead;
             this.messagesRetained = messagesRetained;
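A sketch of the new calling pattern (mine, not from the patch; it assumes LogEntryFilter exposes a single shouldRetain(LogEntry) method, as the RetainNonNullKeysFilter used in the tests below suggests). The essential point is that callers must read the cleaned data from result.output, which may be a different, larger buffer than the one passed in:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class FilterToExample {
        // Hypothetical helper illustrating the FilterResult.output contract.
        static MemoryRecords retainNonNullKeys(MemoryRecords input, ByteBuffer writeBuffer, int maxRecordSize) {
            MemoryRecords.FilterResult result = input.filterTo(
                    new TopicPartition("foo", 0),
                    new MemoryRecords.LogEntryFilter() {
                        @Override
                        public boolean shouldRetain(LogEntry entry) {
                            return entry.record().key() != null;
                        }
                    },
                    writeBuffer, maxRecordSize);

            // KAFKA-5316: the filtered output may live in a freshly allocated buffer,
            // so consume result.output rather than assuming writeBuffer was used.
            ByteBuffer output = result.output;
            output.flip();
            return MemoryRecords.readableRecords(output);
        }
    }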
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 02bfc24..a46c1c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -105,6 +105,34 @@ public class MemoryRecordsBuilder {
 
     private MemoryRecords builtRecords;
 
+    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
+                                byte magic,
+                                CompressionType compressionType,
+                                TimestampType timestampType,
+                                long baseOffset,
+                                long logAppendTime,
+                                int writeLimit) {
+        this.magic = magic;
+        this.timestampType = timestampType;
+        this.compressionType = compressionType;
+        this.baseOffset = baseOffset;
+        this.logAppendTime = logAppendTime;
+        this.initPos = bufferStream.position();
+        this.writeLimit = writeLimit;
+        this.initialCapacity = bufferStream.initialCapacity();
+
+        if (compressionType != CompressionType.NONE) {
+            // for compressed records, leave space for the header and the shallow message metadata
+            // and move the starting position to the value payload offset
+            bufferStream.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
+        }
+
+        // create the stream
+        this.bufferStream = bufferStream;
+        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+    }
+
     /**
      * Construct a new builder.
      *
@@ -126,24 +154,8 @@ public class MemoryRecordsBuilder {
                                long baseOffset,
                                long logAppendTime,
                                int writeLimit) {
-        this.magic = magic;
-        this.timestampType = timestampType;
-        this.compressionType = compressionType;
-        this.baseOffset = baseOffset;
-        this.logAppendTime = logAppendTime;
-        this.initPos = buffer.position();
-        this.writeLimit = writeLimit;
-        this.initialCapacity = buffer.capacity();
-
-        if (compressionType != CompressionType.NONE) {
-            // for compressed records, leave space for the header and the shallow message metadata
-            // and move the starting position to the value payload offset
-            buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
-        }
-
-        // create the stream
-        bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime,
+                writeLimit);
     }
 
     public ByteBuffer buffer() {
@@ -400,7 +412,7 @@ public class MemoryRecordsBuilder {
         try {
             switch (type) {
                 case NONE:
-                    return buffer;
+                    return new DataOutputStream(buffer);
                 case GZIP:
                     return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:
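The new constructor above is what enables the fix: several builders can append to one shared, expanding ByteBufferOutputStream back to back, which is how filterTo() now re-batches retained entries. A rough usage sketch (mine; the timestamp and writeLimit arguments are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    public class SharedStreamBuildersExample {
        public static void main(String[] args) {
            // One expanding stream shared across batches; the old constructor tied
            // each builder to a fixed ByteBuffer that could overflow.
            ByteBufferOutputStream stream = new ByteBufferOutputStream(ByteBuffer.allocate(64));

            for (int batch = 0; batch < 2; batch++) {
                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(stream, Record.CURRENT_MAGIC_VALUE,
                        CompressionType.NONE, TimestampType.CREATE_TIME,
                        batch * 10L /* baseOffset */, System.currentTimeMillis() /* logAppendTime */,
                        Integer.MAX_VALUE /* writeLimit, effectively unbounded for this sketch */);
                builder.append(System.currentTimeMillis(), ("key-" + batch).getBytes(), "value".getBytes());
                builder.close();
            }

            ByteBuffer result = stream.buffer(); // may no longer be the initial 64-byte buffer
            result.flip();
            System.out.println("bytes written: " + result.remaining());
        }
    }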
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 0c0fa3c..de092c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -469,7 +469,7 @@ public final class Record {
                              CompressionType compressionType,
                              TimestampType timestampType) {
         try {
-            ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
             write(out, magic, timestamp, key, value, compressionType, timestampType);
         } catch (IOException e) {
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 24ba434..063c232 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -63,6 +63,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -219,7 +220,7 @@ public class FetcherTest {
     @Test
     public void testParseInvalidRecord() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
 
         byte magic = Record.CURRENT_MAGIC_VALUE;
         byte[] key = "foo".getBytes();
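Since ByteBufferOutputStream no longer extends DataOutputStream, call sites that need primitive writes now wrap it explicitly, as the Record and FetcherTest changes above show. A trivial sketch of the wrapping (again mine, not from the commit):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class DataOutputWrappingExample {
        public static void main(String[] args) throws IOException {
            ByteBufferOutputStream byteStream = new ByteBufferOutputStream(ByteBuffer.allocate(16));
            // DataOutputStream supplies writeInt/writeLong/etc. on top of the expanding stream.
            DataOutputStream out = new DataOutputStream(byteStream);
            out.writeLong(42L);
            out.writeInt(7);
            out.flush();

            ByteBuffer buffer = byteStream.buffer();
            buffer.flip();
            System.out.println(buffer.getLong() + " " + buffer.getInt()); // 42 7
        }
    }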
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
new file mode 100644
index 0000000..6e7b81b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferOutputStreamTest {
+
+    @Test
+    public void testExpandByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.position(32);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testExpandByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.write(new byte[27]);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testWriteByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testWriteDirectByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testWriteByteBuffer(ByteBuffer input) {
+        long value = 234239230L;
+        input.putLong(value);
+        input.flip();
+
+        ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
+        output.write(input);
+        assertEquals(8, output.position());
+        assertEquals(value, output.buffer().getLong(0));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 9271a3f..5f668de 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -31,6 +32,7 @@ import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.toNullableArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
@@ -93,6 +95,56 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToWithUndersizedBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), new byte[128]);
+        builder.append(12L, "2".getBytes(), "c".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
+        builder.append(14L, null, "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
+        builder.append(16L, "6".getBytes(), "g".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
+        builder.append(17L, "7".getBytes(), new byte[128]);
+        builder.close();
+
+        buffer.flip();
+
+        ByteBuffer output = ByteBuffer.allocate(64);
+
+        List<Record> records = new ArrayList<>();
+        while (buffer.hasRemaining()) {
+            output.rewind();
+
+            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+
+            buffer.position(buffer.position() + result.bytesRead);
+            result.output.flip();
+
+            if (output != result.output)
+                assertEquals(0, output.position());
+
+            MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+            records.addAll(TestUtils.toList(filtered.records()));
+        }
+
+        assertEquals(5, records.size());
+        for (Record record : records)
+            assertNotNull(record.key());
+    }
+
+
     @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -117,7 +169,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
 
@@ -192,7 +245,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0),
+                new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5f06a73..6c1f13d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -465,22 +465,23 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(logCleanerFilter, writeBuffer)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
 
       position += result.bytesRead
 
       // if any messages are to be retained, write them out
-      if (writeBuffer.position > 0) {
-        writeBuffer.flip()
-        val retained = MemoryRecords.readableRecords(writeBuffer)
+      val outputBuffer = result.output
+      if (outputBuffer.position > 0) {
+        outputBuffer.flip()
+        val retained = MemoryRecords.readableRecords(outputBuffer)
         dest.append(firstOffset = retained.deepEntries.iterator.next().offset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
-        throttler.maybeThrottle(writeBuffer.limit)
+        throttler.maybeThrottle(outputBuffer.limit)
       }
 
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
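The Scala change above is the heart of the fix: the cleaner appends result.output instead of assuming the cleaned data fit in writeBuffer. For readers more comfortable with Java, a simplified, hypothetical rendering of that loop body (dest.append is elided):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.MemoryRecords;

    class CleanerLoopSketch {
        void cleanInto(MemoryRecords records, ByteBuffer writeBuffer, TopicPartition topicPartition,
                       MemoryRecords.LogEntryFilter logCleanerFilter, int maxLogMessageSize) {
            MemoryRecords.FilterResult result =
                    records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize);

            // The cleaned records may live in a larger buffer than writeBuffer (KAFKA-5316),
            // so always read them back from result.output.
            ByteBuffer outputBuffer = result.output;
            if (outputBuffer.position() > 0) {
                outputBuffer.flip();
                MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);
                // dest.append(..., records = retained) would write the retained batch here.
            }
        }
    }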
