Repository: kafka
Updated Branches:
  refs/heads/0.10.2 386a8d041 -> e944956a3

KAFKA-5150; reduce lz4 decompression overhead

- reuse decompression buffers in the consumer Fetcher
- switch the lz4 input stream to operate directly on ByteBuffers
- avoid the performance impact of catching exceptions when reaching the end of legacy record batches
- add more tests with both compressible and incompressible data, multiple blocks, and various other combinations to increase code coverage
- fix a bug that would cause an exception instead of an "invalid block size" error for invalid incompressible blocks
- fix a bug when the incompressible flag is set on the end frame block size

Author: Xavier Léauté <[email protected]>
Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>

Closes #3090 from xvrl/kafka-5150-0.10

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e944956a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e944956a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e944956a

Branch: refs/heads/0.10.2
Commit: e944956a35d30ba4e393be887f873f12106efe75
Parents: 386a8d0
Author: Xavier Léauté <[email protected]>
Authored: Thu Jun 1 20:05:04 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Jun 1 20:05:11 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java | 4 +-
 .../kafka/common/record/AbstractRecords.java | 4 +-
 .../kafka/common/record/BufferSupplier.java | 95 ++++++
 .../apache/kafka/common/record/FileRecords.java | 23 +-
 .../common/record/KafkaLZ4BlockInputStream.java | 205 +++++++------
 .../apache/kafka/common/record/LogEntry.java | 2 +-
 .../kafka/common/record/MemoryRecords.java | 19 +-
 .../common/record/MemoryRecordsBuilder.java | 22 +-
 .../org/apache/kafka/common/record/Records.java | 18 +-
 .../kafka/common/record/RecordsIterator.java | 51 ++--
 .../org/apache/kafka/common/utils/Utils.java | 24 ++
 .../internals/RecordAccumulatorTest.java | 7 +-
 .../kafka/common/record/FileRecordsTest.java | 2 +-
 .../kafka/common/record/KafkaLZ4Test.java | 286 ++++++++++++++++---
 .../kafka/common/record/MemoryRecordsTest.java | 6 +-
 .../src/main/scala/kafka/log/LogValidator.scala | 2 +-
 .../kafka/message/CompressionFactory.scala | 19 +-
 .../kafka/message/MessageCompressionTest.scala | 7 +-
 .../unit/kafka/message/MessageWriterTest.scala | 2 +-
 19 files changed, 593 insertions(+), 205 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e2631b5..a3d4a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
@@ -93,6 +94,7 @@ public class 
Fetcher<K, V> implements SubscriptionState.Listener { private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; + private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create(); private PartitionRecords<K, V> nextInLineRecords = null; private ExceptionMetadata nextInLineExceptionMetadata = null; @@ -782,7 +784,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); boolean skippedRecords = false; - for (LogEntry logEntry : partition.records.deepEntries()) { + for (LogEntry logEntry : partition.records.deepEntries(decompressionBufferSupplier)) { // Skip the messages earlier than current position. if (logEntry.offset() >= position) { parsed.add(parseRecord(tp, logEntry)); http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 3a96d88..ebfc7ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -26,7 +26,7 @@ public abstract class AbstractRecords implements Records { @Override public Iterator<Record> iterator() { return new Iterator<Record>() { - private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator(); + private final Iterator<? extends LogEntry> deepEntries = deepEntries(BufferSupplier.NO_CACHING).iterator(); @Override public boolean hasNext() { return deepEntries.hasNext(); @@ -57,7 +57,7 @@ public abstract class AbstractRecords implements Records { @Override public Records toMessageFormat(byte toMagic) { List<LogEntry> converted = new ArrayList<>(); - for (LogEntry entry : deepEntries()) + for (LogEntry entry : deepEntries(BufferSupplier.NO_CACHING)) converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic))); if (converted.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java new file mode 100644 index 0000000..df7ba0b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that + * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record + * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and + * iterating over the records in the batch. + */ +public abstract class BufferSupplier implements AutoCloseable { + + public static final BufferSupplier NO_CACHING = new BufferSupplier() { + @Override + public ByteBuffer get(int capacity) { + return ByteBuffer.allocate(capacity); + } + + @Override + public void release(ByteBuffer buffer) {} + + @Override + public void close() {} + }; + + public static BufferSupplier create() { + return new DefaultSupplier(); + } + + /** + * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance. + */ + public abstract ByteBuffer get(int capacity); + + /** + * Return the provided buffer to be reused by a subsequent call to `get`. + */ + public abstract void release(ByteBuffer buffer); + + /** + * Release all resources associated with this supplier. + */ + public abstract void close(); + + private static class DefaultSupplier extends BufferSupplier { + // We currently use a single block size, so optimise for that case + private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1); + + @Override + public ByteBuffer get(int size) { + Deque<ByteBuffer> bufferQueue = bufferMap.get(size); + if (bufferQueue == null || bufferQueue.isEmpty()) + return ByteBuffer.allocate(size); + else + return bufferQueue.pollFirst(); + } + + @Override + public void release(ByteBuffer buffer) { + buffer.clear(); + Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity()); + if (bufferQueue == null) { + // We currently keep a single buffer in flight, so optimise for that case + bufferQueue = new ArrayDeque<>(1); + bufferMap.put(buffer.capacity(), bufferQueue); + } + bufferQueue.addLast(buffer); + } + + @Override + public void close() { + bufferMap.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 960b716..fd8df3e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -43,13 +43,6 @@ public class FileRecords extends AbstractRecords implements Closeable { private final Iterable<FileChannelLogEntry> shallowEntries; - private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() { - @Override - public Iterator<LogEntry> iterator() { - return deepIterator(); - } - }; - // mutable state private final AtomicInteger size; private final FileChannel channel; @@ -362,18 +355,28 @@ public class FileRecords extends AbstractRecords implements Closeable { } @Override + public Iterable<LogEntry> deepEntries(final BufferSupplier bufferSupplier) { + return new 
Iterable<LogEntry>() { + @Override + public Iterator<LogEntry> iterator() { + return deepIterator(bufferSupplier); + } + }; + } + + @Override public Iterable<LogEntry> deepEntries() { - return deepEntries; + return deepEntries(BufferSupplier.NO_CACHING); } - private Iterator<LogEntry> deepIterator() { + private Iterator<LogEntry> deepIterator(BufferSupplier bufferSupplier) { final int end; if (isSlice) end = this.end; else end = this.sizeInBytes(); FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end); - return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE); + return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE, bufferSupplier); } public static FileRecords open(File file, http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java index a408580..3d9d86b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java @@ -14,83 +14,78 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.common.record; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD; -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG; -import org.apache.kafka.common.utils.Utils; - import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4SafeDecompressor; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; +import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD; +import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC; + /** * A partial implementation of the v1.5.1 LZ4 Frame format. * - * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a> + * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a> + * + * This class is not thread-safe. 
*/ -public final class KafkaLZ4BlockInputStream extends FilterInputStream { +public final class KafkaLZ4BlockInputStream extends InputStream { public static final String PREMATURE_EOS = "Stream ended prematurely"; public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)"; public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch"; public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted"; - private final LZ4SafeDecompressor decompressor; - private final XXHash32 checksum; - private final byte[] buffer; - private final byte[] compressedBuffer; - private final int maxBlockSize; + private static final LZ4SafeDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor(); + private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32(); + + private final ByteBuffer in; private final boolean ignoreFlagDescriptorChecksum; + private final BufferSupplier bufferSupplier; + private final ByteBuffer decompressionBuffer; + // `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader` method that is only + // invoked from the constructor private FLG flg; - private BD bd; - private int bufferOffset; - private int bufferSize; + private int maxBlockSize; + + // If a block is compressed, this is the same as `decompressionBuffer`. If a block is not compressed, this is + // a slice of `in` to avoid unnecessary copies. + private ByteBuffer decompressedBuffer; private boolean finished; /** * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm. * - * @param in The stream to decompress + * @param in The byte buffer to decompress * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte * @throws IOException */ - public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException { - super(in); - decompressor = LZ4Factory.fastestInstance().safeDecompressor(); - checksum = XXHashFactory.fastestInstance().hash32(); + public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException { this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum; + this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN); + this.bufferSupplier = bufferSupplier; readHeader(); - maxBlockSize = bd.getBlockMaximumSize(); - buffer = new byte[maxBlockSize]; - compressedBuffer = new byte[maxBlockSize]; - bufferOffset = 0; - bufferSize = 0; + decompressionBuffer = bufferSupplier.get(maxBlockSize); + if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) { + // require array backed decompression buffer with zero offset + // to simplify workaround for https://github.com/lz4/lz4-java/pull/65 + throw new RuntimeException("decompression buffer must have backing array with zero array offset"); + } finished = false; } /** - * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm. - * - * @param in The stream to decompress - * @throws IOException - */ - public KafkaLZ4BlockInputStream(InputStream in) throws IOException { - this(in, false); - } - - /** * Check whether KafkaLZ4BlockInputStream is configured to ignore the * Frame Descriptor checksum, which is useful for compatibility with * old client implementations that use incorrect checksum calculations. 
@@ -100,43 +95,50 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { } /** - * Reads the magic number and frame descriptor from the underlying {@link InputStream}. + * Reads the magic number and frame descriptor from input buffer. * * @throws IOException */ private void readHeader() throws IOException { - byte[] header = new byte[LZ4_MAX_HEADER_LENGTH]; - // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags - int headerOffset = 6; - if (in.read(header, 0, headerOffset) != headerOffset) { + if (in.remaining() < 6) { throw new IOException(PREMATURE_EOS); } - if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) { + if (MAGIC != in.getInt()) { throw new IOException(NOT_SUPPORTED); } - flg = FLG.fromByte(header[headerOffset - 2]); - bd = BD.fromByte(header[headerOffset - 1]); + // mark start of data to checksum + in.mark(); + + flg = FLG.fromByte(in.get()); + maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize(); if (flg.isContentSizeSet()) { - if (in.read(header, headerOffset, 8) != 8) + if (in.remaining() < 8) { throw new IOException(PREMATURE_EOS); - headerOffset += 8; + } + in.position(in.position() + 8); } // Final byte of Frame Descriptor is HC checksum - header[headerOffset++] = (byte) in.read(); // Old implementations produced incorrect HC checksums - if (ignoreFlagDescriptorChecksum) + if (ignoreFlagDescriptorChecksum) { + in.position(in.position() + 1); return; + } + + int len = in.position() - in.reset().position(); - int offset = 4; - int len = headerOffset - offset - 1; // dont include magic bytes or HC - byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF); - if (hash != header[headerOffset - 1]) + int hash = in.hasArray() ? + // workaround for https://github.com/lz4/lz4-java/pull/65 + CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) : + CHECKSUM.hash(in, in.position(), len, 0); + in.position(in.position() + len); + if (in.get() != (byte) ((hash >> 8) & 0xFF)) { throw new IOException(DESCRIPTOR_HASH_MISMATCH); + } } /** @@ -146,46 +148,70 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { * @throws IOException */ private void readBlock() throws IOException { - int blockSize = Utils.readUnsignedIntLE(in); + if (in.remaining() < 4) { + throw new IOException(PREMATURE_EOS); + } + + int blockSize = in.getInt(); + boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0; + blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK; // Check for EndMark if (blockSize == 0) { finished = true; if (flg.isContentChecksumSet()) - Utils.readUnsignedIntLE(in); // TODO: verify this content checksum + in.getInt(); // TODO: verify this content checksum return; } else if (blockSize > maxBlockSize) { throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize)); } - boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0; - byte[] bufferToRead; - if (compressed) { - bufferToRead = compressedBuffer; - } else { - blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK; - bufferToRead = buffer; - bufferSize = blockSize; - } - - if (in.read(bufferToRead, 0, blockSize) != blockSize) { + if (in.remaining() < blockSize) { throw new IOException(PREMATURE_EOS); } - // verify checksum - if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) { - throw new IOException(BLOCK_HASH_MISMATCH); - } - if (compressed) { try { - bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, 
maxBlockSize); + // workaround for https://github.com/lz4/lz4-java/pull/65 + final int bufferSize; + if (in.hasArray()) { + bufferSize = DECOMPRESSOR.decompress( + in.array(), + in.position() + in.arrayOffset(), + blockSize, + decompressionBuffer.array(), + 0, + maxBlockSize + ); + } else { + // decompressionBuffer has zero arrayOffset, so we don't need to worry about + // https://github.com/lz4/lz4-java/pull/65 + bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize); + } + decompressionBuffer.position(0); + decompressionBuffer.limit(bufferSize); + decompressedBuffer = decompressionBuffer; } catch (LZ4Exception e) { throw new IOException(e); } + } else { + decompressedBuffer = in.slice(); + decompressedBuffer.limit(blockSize); } - bufferOffset = 0; + // verify checksum + if (flg.isBlockChecksumSet()) { + // workaround for https://github.com/lz4/lz4-java/pull/65 + int hash = in.hasArray() ? + CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) : + CHECKSUM.hash(in, in.position(), blockSize, 0); + in.position(in.position() + blockSize); + if (hash != in.getInt()) { + throw new IOException(BLOCK_HASH_MISMATCH); + } + } else { + in.position(in.position() + blockSize); + } } @Override @@ -200,7 +226,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { return -1; } - return buffer[bufferOffset++] & 0xFF; + return decompressedBuffer.get() & 0xFF; } @Override @@ -216,8 +242,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { return -1; } len = Math.min(len, available()); - System.arraycopy(buffer, bufferOffset, b, off, len); - bufferOffset += len; + + decompressedBuffer.get(b, off, len); return len; } @@ -232,28 +258,28 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { if (finished) { return 0; } - n = Math.min(n, available()); - bufferOffset += n; - return n; + int skipped = (int) Math.min(n, available()); + decompressedBuffer.position(decompressedBuffer.position() + skipped); + return skipped; } @Override public int available() throws IOException { - return bufferSize - bufferOffset; + return decompressedBuffer == null ? 
0 : decompressedBuffer.remaining(); } @Override public void close() throws IOException { - in.close(); + bufferSupplier.release(decompressionBuffer); } @Override - public synchronized void mark(int readlimit) { + public void mark(int readlimit) { throw new RuntimeException("mark not supported"); } @Override - public synchronized void reset() throws IOException { + public void reset() throws IOException { throw new RuntimeException("reset not supported"); } @@ -261,5 +287,4 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { public boolean markSupported() { return false; } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java index d2db356..51c9ebb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java @@ -107,7 +107,7 @@ public abstract class LogEntry implements Iterable<LogEntry> { @Override public Iterator<LogEntry> iterator() { if (isCompressed()) - return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE); + return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); return Collections.singletonList(this).iterator(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index f1a6e43..6cc70be 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -43,8 +43,6 @@ public class MemoryRecords extends AbstractRecords { } }; - private final Iterable<LogEntry> deepEntries = deepEntries(false); - private int validBytes = -1; // Construct a writable memory records @@ -231,27 +229,32 @@ public class MemoryRecords extends AbstractRecords { } @Override + public Iterable<LogEntry> deepEntries(BufferSupplier bufferSupplier) { + return deepEntries(false, bufferSupplier); + } + + @Override public Iterable<LogEntry> deepEntries() { - return deepEntries; + return deepEntries(false, BufferSupplier.NO_CACHING); } - public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) { + public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic, final BufferSupplier bufferSupplier) { return new Iterable<LogEntry>() { @Override public Iterator<LogEntry> iterator() { - return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE); + return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE, bufferSupplier); } }; } - private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) { + private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) { return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false, - ensureMatchingMagic, maxMessageSize); + ensureMatchingMagic, maxMessageSize, bufferSupplier); } @Override public String toString() { - Iterator<LogEntry> iter = 
deepEntries().iterator(); + Iterator<LogEntry> iter = deepEntries(BufferSupplier.NO_CACHING).iterator(); StringBuilder builder = new StringBuilder(); builder.append('['); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index a46c1c6..7775140 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; @@ -81,7 +80,7 @@ public class MemoryRecordsBuilder { @Override public Constructor get() throws ClassNotFoundException, NoSuchMethodException { return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream") - .getConstructor(InputStream.class, Boolean.TYPE); + .getConstructor(ByteBuffer.class, BufferSupplier.class, Boolean.TYPE); } }); @@ -408,7 +407,7 @@ public class MemoryRecordsBuilder { return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten(); } - private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) { + protected static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) { try { switch (type) { case NONE: @@ -438,25 +437,26 @@ public class MemoryRecordsBuilder { } } - public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { + public static InputStream wrapForInput(ByteBuffer buffer, CompressionType type, byte messageVersion, BufferSupplier bufferSupplier) { try { switch (type) { case NONE: - return buffer; + return new ByteBufferInputStream(buffer); case GZIP: - return new DataInputStream(new GZIPInputStream(buffer)); + return new GZIPInputStream(new ByteBufferInputStream(buffer)); case SNAPPY: try { - InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer); - return new DataInputStream(stream); + return (InputStream) snappyInputStreamSupplier.get().newInstance(new ByteBufferInputStream(buffer)); } catch (Exception e) { throw new KafkaException(e); } case LZ4: try { - InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer, - messageVersion == Record.MAGIC_VALUE_V0); - return new DataInputStream(stream); + return (InputStream) lz4InputStreamSupplier.get().newInstance( + buffer, + bufferSupplier, + messageVersion == Record.MAGIC_VALUE_V0 + ); } catch (Exception e) { throw new KafkaException(e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/Records.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java index 9235f92..2a3e506 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Records.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/Records.java @@ -24,7 +24,7 @@ import java.nio.channels.GatheringByteChannel; * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}. * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate - * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note + * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries(BufferSupplier)}. Note * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned. * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation. @@ -67,6 +67,22 @@ public interface Records { * there are fewer options for optimization since the data must be decompressed before it can be * returned. Hence there is little advantage in allowing subclasses to return a more specific type * as we do for {@link #shallowEntries()}. + * + * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. + * For small record batches, allocating a potentially large buffer (64 KB for LZ4) + * will dominate the cost of decompressing and iterating over the records in the + * batch. As such, a supplier that reuses buffers will have a significant + * performance impact. + * @return An iterator over the deep entries of the log + */ + Iterable<LogEntry> deepEntries(BufferSupplier decompressionBufferSupplier); + + /** + * Get the deep log entries (i.e. descend into compressed message sets). For the deep records, + * there are fewer options for optimization since the data must be decompressed before it can be + * returned. Hence there is little advantage in allowing subclasses to return a more specific type + * as we do for {@link #shallowEntries()}. 
+ * * @return An iterator over the deep entries of the log */ Iterable<LogEntry> deepEntries(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java index 792a857..3150be3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java @@ -21,9 +21,8 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Utils; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Iterator; @@ -36,16 +35,19 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { private final boolean ensureMatchingMagic; private final int maxRecordSize; private final ShallowRecordsIterator<?> shallowIter; + private final BufferSupplier bufferSupplier; private DeepRecordsIterator innerIter; public RecordsIterator(LogInputStream<?> logInputStream, boolean shallow, boolean ensureMatchingMagic, - int maxRecordSize) { + int maxRecordSize, + BufferSupplier bufferSupplier) { this.shallowIter = new ShallowRecordsIterator<>(logInputStream); this.shallow = shallow; this.ensureMatchingMagic = ensureMatchingMagic; this.maxRecordSize = maxRecordSize; + this.bufferSupplier = bufferSupplier; } /** @@ -76,7 +78,7 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { // would not try to further decompress underlying messages // There will be at least one element in the inner iterator, so we don't // need to call hasNext() here. 
- innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize); + innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize, bufferSupplier); return innerIter.next(); } } else { @@ -89,30 +91,35 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { } private static final class DataLogInputStream implements LogInputStream<LogEntry> { - private final DataInputStream stream; + + private final InputStream stream; protected final int maxMessageSize; + private final ByteBuffer offsetAndSizeBuffer; - DataLogInputStream(DataInputStream stream, int maxMessageSize) { + DataLogInputStream(InputStream stream, int maxMessageSize) { this.stream = stream; this.maxMessageSize = maxMessageSize; + this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD); } public LogEntry nextEntry() throws IOException { - try { - long offset = stream.readLong(); - int size = stream.readInt(); - if (size < Record.RECORD_OVERHEAD_V0) - throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0)); - if (size > maxMessageSize) - throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize)); - - byte[] recordBuffer = new byte[size]; - stream.readFully(recordBuffer, 0, size); - ByteBuffer buf = ByteBuffer.wrap(recordBuffer); - return LogEntry.create(offset, new Record(buf)); - } catch (EOFException e) { + offsetAndSizeBuffer.clear(); + Utils.readFully(stream, offsetAndSizeBuffer); + if (offsetAndSizeBuffer.hasRemaining()) return null; - } + long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET); + int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET); + if (size < Record.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0)); + if (size > maxMessageSize) + throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize)); + + ByteBuffer batchBuffer = ByteBuffer.allocate(size); + Utils.readFully(stream, batchBuffer); + if (batchBuffer.hasRemaining()) + return null; + batchBuffer.flip(); + return LogEntry.create(offset, new Record(batchBuffer)); } } @@ -141,13 +148,13 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { private final long absoluteBaseOffset; private final byte wrapperMagic; - public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) { + public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) { Record wrapperRecord = wrapperEntry.record(); this.wrapperMagic = wrapperRecord.magic(); CompressionType compressionType = wrapperRecord.compressionType(); ByteBuffer buffer = wrapperRecord.value(); - DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic()); + InputStream stream = MemoryRecordsBuilder.wrapForInput(buffer, compressionType, wrapperRecord.magic(), bufferSupplier); LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize); long wrapperRecordOffset = wrapperEntry.offset(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index afa85bd..1751e44 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -855,4 +855,28 @@ public class Utils { currentPosition += bytesRead; } while (bytesRead != -1 && destinationBuffer.hasRemaining()); } + + /** + * Read data from the input stream to the given byte buffer until there are no bytes remaining in the buffer or the + * end of the stream has been reached. + * + * @param inputStream Input stream to read from + * @param destinationBuffer The buffer into which bytes are to be transferred (it must be backed by an array) + * @throws IOException If an I/O error occurs + */ + public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException { + if (!destinationBuffer.hasArray()) + throw new IllegalArgumentException("destinationBuffer must be backed by an array"); + int initialOffset = destinationBuffer.arrayOffset() + destinationBuffer.position(); + byte[] array = destinationBuffer.array(); + int length = destinationBuffer.remaining(); + int totalBytesRead = 0; + do { + int bytesRead = inputStream.read(array, initialOffset + totalBytesRead, length - totalBytesRead); + if (bytesRead == -1) + break; + totalBytesRead += bytesRead; + } while (length > totalBytesRead); + destinationBuffer.position(destinationBuffer.position() + totalBytesRead); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index f8bb1e9..5f26c13 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.Record; @@ -104,7 +105,7 @@ public class RecordAccumulatorTest { assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records().deepEntries().iterator(); + Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator(); for (int i = 0; i < appends; i++) { LogEntry entry = iter.next(); assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); @@ -133,7 +134,7 @@ public class RecordAccumulatorTest { assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records().deepEntries().iterator(); + Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator(); LogEntry entry = iter.next(); assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); @@ -185,7 +186,7 
@@ public class RecordAccumulatorTest { List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { - for (LogEntry entry : batch.records().deepEntries()) + for (LogEntry entry : batch.records().deepEntries(BufferSupplier.NO_CACHING)) read++; accum.deallocate(batch); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 274bf9d..dcbadda 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -385,7 +385,7 @@ public class FileRecordsTest { } private static List<LogEntry> deepEntries(Records buffer) { - return TestUtils.toList(buffer.deepEntries()); + return TestUtils.toList(buffer.deepEntries(BufferSupplier.NO_CACHING)); } private FileRecords createFileRecords(Record ... records) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java index 47aebcb..74e9713 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java +++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java @@ -16,63 +16,166 @@ */ package org.apache.kafka.common.record; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import net.jpountz.xxhash.XXHashFactory; + +import org.hamcrest.CoreMatchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Random; -import net.jpountz.xxhash.XXHashFactory; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; @RunWith(value = Parameterized.class) public class KafkaLZ4Test { + private final static Random RANDOM = new Random(0); + private final boolean useBrokenFlagDescriptorChecksum; private final boolean ignoreFlagDescriptorChecksum; private final byte[] payload; private final boolean close; + private final boolean blockChecksum; + + static class Payload { + String name; + byte[] payload; + + Payload(String name, byte[] payload) { + this.name = name; + this.payload = payload; + } + + @Override + public 
String toString() { + return "Payload{" + + "size=" + payload.length + + ", name='" + name + '\'' + + '}'; + } + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); - public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) { + @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}") + public static Collection<Object[]> data() { + List<Payload> payloads = new ArrayList<>(); + + payloads.add(new Payload("empty", new byte[]{})); + payloads.add(new Payload("onebyte", new byte[]{1})); + + for (int size : Arrays.asList(1000, 1 << 16, (1 << 10) * 96)) { + byte[] random = new byte[size]; + RANDOM.nextBytes(random); + payloads.add(new Payload("random", random)); + + byte[] ones = new byte[size]; + Arrays.fill(ones, (byte) 1); + payloads.add(new Payload("ones", ones)); + } + + List<Object[]> values = new ArrayList<>(); + for (Payload payload : payloads) + for (boolean broken : Arrays.asList(false, true)) + for (boolean ignore : Arrays.asList(false, true)) + for (boolean blockChecksum : Arrays.asList(false, true)) + for (boolean close : Arrays.asList(false, true)) + values.add(new Object[]{broken, ignore, blockChecksum, close, payload}); + return values; + } + + public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, + boolean blockChecksum, boolean close, Payload payload) { this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum; this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum; - this.payload = payload; + this.payload = payload.payload; this.close = close; + this.blockChecksum = blockChecksum; } - @Parameters - public static Collection<Object[]> data() { - byte[] payload = new byte[1000]; - Arrays.fill(payload, (byte) 1); - List<Object[]> values = new ArrayList<Object[]>(); - for (boolean broken : Arrays.asList(false, true)) - for (boolean ignore : Arrays.asList(false, true)) - for (boolean close : Arrays.asList(false, true)) - values.add(new Object[] {broken, ignore, payload, close}); - return values; + @Test + public void testHeaderPrematureEnd() throws Exception { + thrown.expect(IOException.class); + thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS); + + final ByteBuffer buffer = ByteBuffer.allocate(2); + makeInputStream(buffer); + } + + private KafkaLZ4BlockInputStream makeInputStream( + ByteBuffer buffer + ) + throws IOException { + return new KafkaLZ4BlockInputStream( + buffer, + BufferSupplier.create(), + ignoreFlagDescriptorChecksum + ); } @Test - public void testKafkaLZ4() throws IOException { - ByteArrayOutputStream output = new ByteArrayOutputStream(); - KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum); - lz4.write(this.payload, 0, this.payload.length); - if (this.close) { - lz4.close(); - } else { - lz4.flush(); + public void testNotSupported() throws Exception { + thrown.expect(IOException.class); + thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED); + + byte[] compressed = compressedBytes(); + compressed[0] = 0x00; + + makeInputStream(ByteBuffer.wrap(compressed)); + } + + @Test + public void testBadFrameChecksum() throws Exception { + if (!ignoreFlagDescriptorChecksum) { + thrown.expect(IOException.class); + thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH); } - byte[] compressed = output.toByteArray(); + + byte[] compressed = 
compressedBytes(); + compressed[6] = (byte) 0xFF; + + makeInputStream(ByteBuffer.wrap(compressed)); + } + + @Test + public void testBadBlockSize() throws Exception { + if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return; + + thrown.expect(IOException.class); + thrown.expectMessage(CoreMatchers.containsString("exceeded max")); + + byte[] compressed = compressedBytes(); + final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN); + + int blockSize = buffer.getInt(7); + blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK); + buffer.putInt(7, blockSize); + + testDecompression(buffer); + } + + + + @Test + public void testCompression() throws Exception { + byte[] compressed = compressedBytes(); // Check magic bytes stored as little-endian int offset = 0; @@ -138,16 +241,125 @@ public class KafkaLZ4Test { assertEquals(0, compressed[offset++]); assertEquals(0, compressed[offset++]); } + } + + @Test + public void testArrayBackedBuffer() throws IOException { + byte[] compressed = compressedBytes(); + testDecompression(ByteBuffer.wrap(compressed)); + } + + @Test + public void testArrayBackedBufferSlice() throws IOException { + byte[] compressed = compressedBytes(); + + int sliceOffset = 12; + + ByteBuffer buffer = ByteBuffer.allocate(compressed.length + sliceOffset + 123); + buffer.position(sliceOffset); + buffer.put(compressed).flip(); + buffer.position(sliceOffset); + + ByteBuffer slice = buffer.slice(); + testDecompression(slice); + + int offset = 42; + buffer = ByteBuffer.allocate(compressed.length + sliceOffset + offset); + buffer.position(sliceOffset + offset); + buffer.put(compressed).flip(); + buffer.position(sliceOffset); - ByteArrayInputStream input = new ByteArrayInputStream(compressed); + slice = buffer.slice(); + slice.position(offset); + testDecompression(slice); + } + + @Test + public void testDirectBuffer() throws IOException { + byte[] compressed = compressedBytes(); + ByteBuffer buffer; + + buffer = ByteBuffer.allocateDirect(compressed.length); + buffer.put(compressed).flip(); + testDecompression(buffer); + + int offset = 42; + buffer = ByteBuffer.allocateDirect(compressed.length + offset + 123); + buffer.position(offset); + buffer.put(compressed).flip(); + buffer.position(offset); + testDecompression(buffer); + } + + @Test + public void testSkip() throws Exception { + if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return; + + final KafkaLZ4BlockInputStream in = makeInputStream(ByteBuffer.wrap(compressedBytes())); + + int n = 100; + int remaining = payload.length; + long skipped = in.skip(n); + assertEquals(Math.min(n, remaining), skipped); + + n = 10000; + remaining -= skipped; + skipped = in.skip(n); + assertEquals(Math.min(n, remaining), skipped); + } + + private void testDecompression(ByteBuffer buffer) throws IOException { + IOException error = null; try { - KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum); - byte[] testPayload = new byte[this.payload.length]; - int ret = decompressed.read(testPayload, 0, this.payload.length); - assertEquals(ret, this.payload.length); + KafkaLZ4BlockInputStream decompressed = makeInputStream(buffer); + + byte[] testPayload = new byte[payload.length]; + + byte[] tmp = new byte[1024]; + int n, pos = 0, i = 0; + while ((n = decompressed.read(tmp, i, tmp.length - i)) != -1) { + i += n; + if (i == tmp.length) { + System.arraycopy(tmp, 0, 
testPayload, pos, i); + pos += i; + i = 0; + } + } + System.arraycopy(tmp, 0, testPayload, pos, i); + pos += i; + + assertEquals(-1, decompressed.read(tmp, 0, tmp.length)); + assertEquals(this.payload.length, pos); assertArrayEquals(this.payload, testPayload); } catch (IOException e) { - assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum); + if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) { + assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage()); + error = e; + } else if (!close) { + assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage()); + error = e; + } else { + throw e; + } + } + if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) assertNotNull(error); + if (!close) assertNotNull(error); + } + + private byte[] compressedBytes() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream( + output, + KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, + blockChecksum, + useBrokenFlagDescriptorChecksum + ); + lz4.write(this.payload, 0, this.payload.length); + if (this.close) { + lz4.close(); + } else { + lz4.flush(); } + return output.toByteArray(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 5f668de..c2ef38b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -71,7 +71,7 @@ public class MemoryRecordsTest { for (int iteration = 0; iteration < 2; iteration++) { for (MemoryRecords recs : asList(recs1, recs2)) { - Iterator<LogEntry> iter = recs.deepEntries().iterator(); + Iterator<LogEntry> iter = recs.deepEntries(BufferSupplier.NO_CACHING).iterator(); for (int i = 0; i < list.size(); i++) { assertTrue(iter.hasNext()); LogEntry entry = iter.next(); @@ -201,7 +201,7 @@ public class MemoryRecordsTest { shallowEntry.record().timestampType()); } - List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries()); + List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries(BufferSupplier.NO_CACHING)); assertEquals(4, deepEntries.size()); LogEntry first = deepEntries.get(0); @@ -263,7 +263,7 @@ public class MemoryRecordsTest { } } - @Parameterized.Parameters + @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}") public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<>(); for (long firstOffset : asList(0L, 57L)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 224a792..47d4bd4 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -164,7 +164,7 @@ private[kafka] object LogValidator { val expectedInnerOffset = new LongRef(0) val validatedRecords = new mutable.ArrayBuffer[Record] - records.deepEntries(true).asScala.foreach { logEntry => + 
records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry => val record = logEntry.record validateKey(record, compactedTopic) http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/message/CompressionFactory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala index e02ed63..592d09e 100644 --- a/core/src/main/scala/kafka/message/CompressionFactory.scala +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -17,12 +17,11 @@ package kafka.message -import java.io.OutputStream -import java.util.zip.GZIPOutputStream -import java.util.zip.GZIPInputStream -import java.io.InputStream +import java.io.{InputStream, OutputStream} +import java.nio.ByteBuffer +import java.util.zip.{GZIPInputStream, GZIPOutputStream} -import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream} +import org.apache.kafka.common.record.{BufferSupplier, KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream} object CompressionFactory { @@ -40,15 +39,15 @@ object CompressionFactory { } } - def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: InputStream): InputStream = { + def apply(compressionCodec: CompressionCodec, messageVersion: Byte, buffer: ByteBuffer): InputStream = { compressionCodec match { - case DefaultCompressionCodec => new GZIPInputStream(stream) - case GZIPCompressionCodec => new GZIPInputStream(stream) + case DefaultCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer)) + case GZIPCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer)) case SnappyCompressionCodec => import org.xerial.snappy.SnappyInputStream - new SnappyInputStream(stream) + new SnappyInputStream(new ByteBufferBackedInputStream(buffer)) case LZ4CompressionCodec => - new KafkaLZ4BlockInputStream(stream, messageVersion == Message.MagicValue_V0) + new KafkaLZ4BlockInputStream(buffer, BufferSupplier.NO_CACHING, messageVersion == Message.MagicValue_V0) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala index 5d2c8fb..9f63604 100644 --- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -18,9 +18,10 @@ package kafka.message import org.apache.kafka.common.record._ - import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer + import scala.collection._ import org.scalatest.junit.JUnitSuite import org.junit._ @@ -33,7 +34,7 @@ class MessageCompressionTest extends JUnitSuite { val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayOutputStream()) assertTrue(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum()) - val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A))) + val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, 
ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A))) assertTrue(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum()) } @@ -42,7 +43,7 @@ class MessageCompressionTest extends JUnitSuite { val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayOutputStream()) assertFalse(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum()) - val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126))) + val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126))) assertFalse(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum()) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala index a82a553..467f66c 100644 --- a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala @@ -60,7 +60,7 @@ class MessageWriterTest extends JUnitSuite { } private def decompress(compressed: Array[Byte], codec: CompressionCodec): Array[Byte] = { - toArray(CompressionFactory(codec, Message.MagicValue_V1, new ByteArrayInputStream(compressed))) + toArray(CompressionFactory(codec, Message.MagicValue_V1, ByteBuffer.wrap(compressed))) } private def toArray(in: InputStream): Array[Byte] = {
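
Below is a minimal sketch of how the buffer-reuse API introduced by this patch is intended to be used. It is not part of the commit: the class name DeepEntriesExample and its methods are hypothetical illustrations, while BufferSupplier.create(), get/release/close, BufferSupplier.NO_CACHING, and Records.deepEntries(BufferSupplier) are the members added above. A long-lived BufferSupplier lets a consumer reuse the same 64 KB LZ4 decompression buffer across record batches instead of allocating a new one per batch.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;

public class DeepEntriesExample {

    // Hypothetical helper: iterate the deep (decompressed) entries of a batch while
    // reusing the supplier's cached decompression buffer, as the consumer Fetcher now does.
    static void printDeepEntries(MemoryRecords records, BufferSupplier decompressionBufferSupplier) {
        for (LogEntry entry : records.deepEntries(decompressionBufferSupplier))
            System.out.println(entry.offset() + ": " + entry.record());
    }

    public static void main(String[] args) {
        // A caching supplier hands back the same buffer on the next get() of equal size;
        // BufferSupplier.NO_CACHING would allocate a fresh buffer every time instead.
        BufferSupplier supplier = BufferSupplier.create();
        ByteBuffer buffer = supplier.get(64 * 1024); // allocate (or reuse) a 64 KB buffer
        supplier.release(buffer);                    // return it so a later get(64 * 1024) can reuse it
        supplier.close();
    }
}

Keeping the supplier alive for the lifetime of the consumer (as Fetcher does with its decompressionBufferSupplier field), rather than creating one per fetch, is what makes the reuse effective, since the default supplier caches released buffers keyed by their capacity.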
