Repository: kafka Updated Branches: refs/heads/0.11.0 396f17a94 -> 8b63fc81d
KAFKA-5316; Follow-up with ByteBufferOutputStream and other misc improvements ByteBufferOutputStream improvements: * Document pitfalls * Improve efficiency when dealing with direct byte buffers * Improve handling of buffer expansion * Be consistent about using `limit` instead of `capacity` * Add constructors that allocate the internal buffer Other minor changes: * Fix log warning to specify correct Kafka version * Clean-ups Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3166 from ijuma/minor-kafka-5316-follow-ups (cherry picked from commit b3788d8dcbeee7a20f562e878c187a75bac11ff0) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b63fc81 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b63fc81 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b63fc81 Branch: refs/heads/0.11.0 Commit: 8b63fc81ddc193d90f6a0e0694f2735f7466969c Parents: 396f17a Author: Ismael Juma <[email protected]> Authored: Tue May 30 12:53:32 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Tue May 30 12:54:04 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 2 +- .../kafka/common/record/DefaultRecord.java | 19 ----- .../kafka/common/record/MemoryRecords.java | 8 +- .../common/record/MemoryRecordsBuilder.java | 7 +- .../common/utils/ByteBufferOutputStream.java | 85 +++++++++++++------- .../kafka/common/record/DefaultRecordTest.java | 18 +++-- .../utils/ByteBufferOutputStreamTest.java | 1 + 7 files changed, 77 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index e1f04a8..d3d1b82 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -531,7 +531,7 @@ public final class RecordAccumulator { */ public void deallocate(ProducerBatch batch) { incomplete.remove(batch); - // Only deallocate the batch if it is not a split batch because split batch are allocated aside the + // Only deallocate the batch if it is not a split batch because split batch are allocated outside the // buffer pool. if (!batch.isSplitBatch()) free.deallocate(batch.buffer(), batch.initialCapacity()); http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index 05b5bb2..e61bbc9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.Checksums; import org.apache.kafka.common.utils.Crc32C; @@ -230,24 +229,6 @@ public class DefaultRecord implements Record { return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes; } - /** - * Write the record to `out` and return its size. - */ - public static int writeTo(ByteBuffer out, - int offsetDelta, - long timestampDelta, - ByteBuffer key, - ByteBuffer value, - Header[] headers) { - try { - return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta, - key, value, headers); - } catch (IOException e) { - // cannot actually be raised by ByteBufferOutputStream - throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e); - } - } - @Override public boolean hasMagic(byte magic) { return magic >= MAGIC_VALUE_V2; http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 56d7ed1..528095e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -191,7 +191,7 @@ public class MemoryRecords extends AbstractRecords { if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " + - "(new size is {}). Consumers from version 0.10.1 and earlier may need to " + + "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " + "increase their fetch sizes.", partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize); @@ -527,13 +527,13 @@ public class MemoryRecords extends AbstractRecords { if (records.length == 0) return MemoryRecords.EMPTY; int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records)); - ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate); + ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(sizeEstimate); long logAppendTime = RecordBatch.NO_TIMESTAMP; if (timestampType == TimestampType.LOG_APPEND_TIME) logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType, initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, - partitionLeaderEpoch, buffer.capacity()); + partitionLeaderEpoch, sizeEstimate); for (SimpleRecord record : records) builder.append(record); return builder.build(); http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 49d70c6..aaca851 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -56,7 +56,6 @@ public class MemoryRecordsBuilder { private final boolean isControlBatch; private final int partitionLeaderEpoch; private final int writeLimit; - private final int initialCapacity; private volatile float estimatedCompressionRatio; @@ -115,7 +114,6 @@ public class MemoryRecordsBuilder { this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); - this.initialCapacity = bufferStream.capacity(); if (magic > RecordBatch.MAGIC_VALUE_V1) { bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET); @@ -125,7 +123,6 @@ public class MemoryRecordsBuilder { bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic)); } - // create the stream this.bufferStream = bufferStream; this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic, COMPRESSION_DEFAULT_BUFFER_SIZE)); @@ -174,7 +171,7 @@ public class MemoryRecordsBuilder { } public int initialCapacity() { - return initialCapacity; + return bufferStream.initialCapacity(); } public double compressionRatio() { @@ -718,7 +715,7 @@ public class MemoryRecordsBuilder { // Be conservative and not take compression of the new record into consideration. return numRecords == 0 ? - this.initialCapacity >= recordSize : + bufferStream.remaining() >= recordSize : this.writeLimit >= estimatedBytesWritten() + recordSize; } http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java index 79d4d4c..2b13e7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java @@ -20,40 +20,60 @@ import java.io.OutputStream; import java.nio.ByteBuffer; /** - * A ByteBuffer-backed OutputStream + * A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should + * always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed. + * + * This class is typically used for 2 purposes: + * + * 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data + * 2. Write to a ByteBuffer via methods that expect an OutputStream interface + * + * Hard to track bugs can happen when this class is used for the second reason and unexpected buffer expansion happens. + * So, it's best to assume that buffer expansion can always happen. An improvement would be to create a separate class + * that throws an error if buffer expansion is required to avoid the issue altogether. */ public class ByteBufferOutputStream extends OutputStream { private static final float REALLOCATION_FACTOR = 1.1f; + private final int initialCapacity; + private final int initialPosition; private ByteBuffer buffer; - private int initialPosition; + /** + * Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to + * satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return + * a different buffer than the received `buffer` parameter. + * + * Prefer one of the constructors that allocate the internal buffer for clearer semantics. + */ public ByteBufferOutputStream(ByteBuffer buffer) { this.buffer = buffer; this.initialPosition = buffer.position(); + this.initialCapacity = buffer.capacity(); + } + + public ByteBufferOutputStream(int initialCapacity) { + this(initialCapacity, false); + } + + public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) { + this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity)); } public void write(int b) { - if (buffer.remaining() < 1) - expandBuffer(buffer.capacity() + 1); + maybeExpandBuffer(1); buffer.put((byte) b); } public void write(byte[] bytes, int off, int len) { - if (buffer.remaining() < len) - expandBuffer(buffer.position() + len); + maybeExpandBuffer(len); buffer.put(bytes, off, len); } - public void write(ByteBuffer buffer) { - if (buffer.hasArray()) - write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); - else { - int pos = buffer.position(); - for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++) - write(buffer.get(i)); - } + public void write(ByteBuffer sourceBuffer) { + maybeExpandBuffer(sourceBuffer.remaining()); + buffer.put(sourceBuffer); } public ByteBuffer buffer() { @@ -64,8 +84,8 @@ public class ByteBufferOutputStream extends OutputStream { return buffer.position(); } - public int capacity() { - return buffer.capacity(); + public int remaining() { + return buffer.remaining(); } public int limit() { @@ -73,23 +93,32 @@ public class ByteBufferOutputStream extends OutputStream { } public void position(int position) { - if (position > buffer.limit()) - expandBuffer(position); + maybeExpandBuffer(position - buffer.position()); buffer.position(position); } - private void expandBuffer(int size) { - int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); - ByteBuffer temp = ByteBuffer.allocate(expandSize); - if (buffer.hasArray()) { - temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); - } else { - int limit = buffer.position(); - for (int i = 0; i < limit; i++) - temp.put(buffer.get(i)); - } + /** + * The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled + * ByteBuffer was passed via the constructor and it needs to be returned to the pool. + */ + public int initialCapacity() { + return initialCapacity; + } + private void maybeExpandBuffer(int remainingRequired) { + if (remainingRequired > buffer.remaining()) + expandBuffer(remainingRequired); + } + + private void expandBuffer(int remainingRequired) { + int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired); + ByteBuffer temp = ByteBuffer.allocate(expandSize); + int limit = limit(); + buffer.flip(); + temp.put(buffer); + buffer.limit(limit); // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed + // we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it buffer.position(initialPosition); buffer = temp; } http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index 61b7b00..2c0ef05 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -18,8 +18,11 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.junit.Test; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import static org.junit.Assert.assertArrayEquals; @@ -29,7 +32,7 @@ import static org.junit.Assert.assertNotNull; public class DefaultRecordTest { @Test - public void testBasicSerde() { + public void testBasicSerde() throws IOException { Header[] headers = new Header[] { new RecordHeader("foo", "value".getBytes()), new RecordHeader("bar", (byte[]) null), @@ -51,8 +54,10 @@ public class DefaultRecordTest { long baseTimestamp = System.currentTimeMillis(); long timestampDelta = 323; - ByteBuffer buffer = ByteBuffer.allocate(1024); - DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers()); + ByteBufferOutputStream out = new ByteBufferOutputStream(1024); + DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, record.key(), record.value(), + record.headers()); + ByteBuffer buffer = out.buffer(); buffer.flip(); DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null); @@ -69,7 +74,7 @@ public class DefaultRecordTest { } @Test - public void testSerdeNoSequence() { + public void testSerdeNoSequence() throws IOException { ByteBuffer key = ByteBuffer.wrap("hi".getBytes()); ByteBuffer value = ByteBuffer.wrap("there".getBytes()); long baseOffset = 37; @@ -77,8 +82,9 @@ public class DefaultRecordTest { long baseTimestamp = System.currentTimeMillis(); long timestampDelta = 323; - ByteBuffer buffer = ByteBuffer.allocate(1024); - DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]); + ByteBufferOutputStream out = new ByteBufferOutputStream(1024); + DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, key, value, new Header[0]); + ByteBuffer buffer = out.buffer(); buffer.flip(); DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/8b63fc81/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java index 2ef5672..fbac719 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java @@ -94,6 +94,7 @@ public class ByteBufferOutputStreamTest { ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32)); output.write(input); + assertEquals(8, input.position()); assertEquals(8, output.position()); assertEquals(value, output.buffer().getLong(0)); }
