Repository: kafka
Updated Branches:
  refs/heads/trunk da9a171c9 -> 81f0c1e8f
KAFKA-5093; Avoid loading full batch data when possible when iterating FileRecords Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3160 from hachikuji/KAFKA-5093 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f0c1e8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f0c1e8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f0c1e8 Branch: refs/heads/trunk Commit: 81f0c1e8f2ba2d86f061361b5ee33bb8e6f640c5 Parents: da9a171 Author: Jason Gustafson <[email protected]> Authored: Wed May 31 14:11:47 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 31 14:11:47 2017 -0700 ---------------------------------------------------------------------- .../record/AbstractLegacyRecordBatch.java | 73 ++++++ .../kafka/common/record/DefaultRecordBatch.java | 77 +++++++ .../kafka/common/record/FileLogInputStream.java | 216 +++++++----------- .../apache/kafka/common/record/FileRecords.java | 4 +- .../kafka/common/record/LegacyRecord.java | 43 ++-- .../kafka/common/record/MemoryRecords.java | 11 +- .../org/apache/kafka/common/record/Records.java | 2 + .../clients/consumer/internals/FetcherTest.java | 2 +- .../common/record/FileLogInputStreamTest.java | 223 ++++++++++++++++++- .../kafka/common/record/FileRecordsTest.java | 6 +- .../common/record/MemoryRecordsBuilderTest.java | 2 +- .../kafka/message/ByteBufferMessageSet.scala | 4 +- .../scala/kafka/message/MessageAndOffset.scala | 24 +- 13 files changed, 504 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index e4938be..9b74d06 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -29,6 +29,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayDeque; import java.util.NoSuchElementException; @@ -510,4 +511,76 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl } } + static class LegacyFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch { + + LegacyFileChannelRecordBatch(long offset, + byte magic, + FileChannel channel, + int position, + int batchSize) { + super(offset, magic, channel, position, batchSize); + } + + @Override + protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) { + return new ByteBufferLegacyRecordBatch(buffer); + } + + @Override + public long baseOffset() { + return loadFullBatch().baseOffset(); + } + + @Override + public long lastOffset() { + return offset; + } + + @Override + public long producerId() { + return RecordBatch.NO_PRODUCER_ID; + } + + @Override + public short producerEpoch() { + return RecordBatch.NO_PRODUCER_EPOCH; + } + + @Override + public int baseSequence() { + return RecordBatch.NO_SEQUENCE; + } + + @Override + public int lastSequence() { + return RecordBatch.NO_SEQUENCE; + } + + @Override + public Integer 
countOrNull() { + return null; + } + + @Override + public boolean isTransactional() { + return false; + } + + @Override + public boolean isControlBatch() { + return false; + } + + @Override + public int partitionLeaderEpoch() { + return RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + @Override + protected int headerSize() { + return LOG_OVERHEAD + LegacyRecord.headerSize(magic); + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 2bf889f..bdba860 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Crc32C; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -498,4 +499,80 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } + static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch { + + DefaultFileChannelRecordBatch(long offset, + byte magic, + FileChannel channel, + int position, + int batchSize) { + super(offset, magic, channel, position, batchSize); + } + + @Override + protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) { + return new DefaultRecordBatch(buffer); + } + + @Override + public long baseOffset() { + return offset; + } + + @Override + public long lastOffset() { + return loadBatchHeader().lastOffset(); + } + + @Override + public long producerId() { + return loadBatchHeader().producerId(); + } + + @Override + public short producerEpoch() { + return loadBatchHeader().producerEpoch(); + } + + @Override + public int baseSequence() { + return loadBatchHeader().baseSequence(); + } + + @Override + public int lastSequence() { + return loadBatchHeader().lastSequence(); + } + + @Override + public long checksum() { + return loadBatchHeader().checksum(); + } + + @Override + public Integer countOrNull() { + return loadBatchHeader().countOrNull(); + } + + @Override + public boolean isTransactional() { + return loadBatchHeader().isTransactional(); + } + + @Override + public boolean isControlBatch() { + return loadBatchHeader().isControlBatch(); + } + + @Override + public int partitionLeaderEpoch() { + return loadBatchHeader().partitionLeaderEpoch(); + } + + @Override + protected int headerSize() { + return RECORD_BATCH_OVERHEAD; + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 57fec4f..75eb1b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch; +import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; @@ -27,6 +29,10 @@ import java.nio.channels.FileChannel; import java.util.Iterator; import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; +import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC; +import static org.apache.kafka.common.record.Records.MAGIC_OFFSET; +import static org.apache.kafka.common.record.Records.OFFSET_OFFSET; +import static org.apache.kafka.common.record.Records.SIZE_OFFSET; /** * A log input stream which is backed by a {@link FileChannel}. @@ -35,7 +41,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil private int position; private final int end; private final FileChannel channel; - private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD); + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC); /** * Create a new log input stream over the FileChannel @@ -53,15 +59,15 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil @Override public FileChannelRecordBatch nextBatch() throws IOException { - if (position + LOG_OVERHEAD >= end) + if (position + HEADER_SIZE_UP_TO_MAGIC >= end) return null; logHeaderBuffer.rewind(); Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header"); logHeaderBuffer.rewind(); - long offset = logHeaderBuffer.getLong(); - int size = logHeaderBuffer.getInt(); + long offset = logHeaderBuffer.getLong(OFFSET_OFFSET); + int size = logHeaderBuffer.getInt(SIZE_OFFSET); // V0 has the smallest overhead, stricter checking is done later if (size < LegacyRecord.RECORD_OVERHEAD_V0) @@ -70,7 +76,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil if (position + LOG_OVERHEAD + size > end) return null; - FileChannelRecordBatch batch = new FileChannelRecordBatch(offset, channel, position, size); + byte magic = logHeaderBuffer.get(MAGIC_OFFSET); + final FileChannelRecordBatch batch; + + if (magic < RecordBatch.MAGIC_VALUE_V2) + batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size); + else + batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size); + position += batch.sizeInBytes(); return batch; } @@ -80,71 +93,46 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil * without needing to read the record data into memory until it is needed. The downside * is that entries will generally no longer be readable when the underlying channel is closed. 
*/ - public static class FileChannelRecordBatch extends AbstractRecordBatch { - private final long offset; - private final FileChannel channel; - private final int position; - private final int batchSize; - private RecordBatch underlying; - private Byte magic; - - private FileChannelRecordBatch(long offset, - FileChannel channel, - int position, - int batchSize) { + public abstract static class FileChannelRecordBatch extends AbstractRecordBatch { + protected final long offset; + protected final byte magic; + protected final FileChannel channel; + protected final int position; + protected final int batchSize; + + private RecordBatch fullBatch; + private RecordBatch batchHeader; + + FileChannelRecordBatch(long offset, + byte magic, + FileChannel channel, + int position, + int batchSize) { this.offset = offset; + this.magic = magic; this.channel = channel; this.position = position; this.batchSize = batchSize; } @Override - public long baseOffset() { - if (magic() >= RecordBatch.MAGIC_VALUE_V2) - return offset; - - loadUnderlyingRecordBatch(); - return underlying.baseOffset(); - } - - @Override public CompressionType compressionType() { - loadUnderlyingRecordBatch(); - return underlying.compressionType(); + return loadBatchHeader().compressionType(); } @Override public TimestampType timestampType() { - loadUnderlyingRecordBatch(); - return underlying.timestampType(); + return loadBatchHeader().timestampType(); } @Override - public long maxTimestamp() { - loadUnderlyingRecordBatch(); - return underlying.maxTimestamp(); + public long checksum() { + return loadBatchHeader().checksum(); } @Override - public long lastOffset() { - if (magic() < RecordBatch.MAGIC_VALUE_V2) - return offset; - else if (underlying != null) - return underlying.lastOffset(); - - try { - // TODO: this logic probably should be moved into DefaultRecordBatch somehow - // maybe we just need two separate implementations - - byte[] offsetDelta = new byte[4]; - ByteBuffer buf = ByteBuffer.wrap(offsetDelta); - channel.read(buf, position + DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET); - if (buf.hasRemaining()) - throw new KafkaException("Failed to read magic byte from FileChannel " + channel); - return offset + buf.getInt(0); - } catch (IOException e) { - throw new KafkaException(e); - } + public long maxTimestamp() { + return loadBatchHeader().maxTimestamp(); } public int position() { @@ -153,92 +141,27 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil @Override public byte magic() { - if (magic != null) - return magic; - if (underlying != null) - return underlying.magic(); - - try { - ByteBuffer buf = ByteBuffer.wrap(new byte[1]); - Utils.readFullyOrFail(channel, buf, position + Records.MAGIC_OFFSET, "magic byte"); - magic = buf.get(0); - return magic; - } catch (IOException e) { - throw new KafkaException(e); - } - } - - @Override - public long producerId() { - loadUnderlyingRecordBatch(); - return underlying.producerId(); - } - - @Override - public short producerEpoch() { - loadUnderlyingRecordBatch(); - return underlying.producerEpoch(); - } - - @Override - public int baseSequence() { - loadUnderlyingRecordBatch(); - return underlying.baseSequence(); - } - - @Override - public int lastSequence() { - loadUnderlyingRecordBatch(); - return underlying.lastSequence(); - } - - private void loadUnderlyingRecordBatch() { - try { - if (underlying != null) - return; - - ByteBuffer batchBuffer = ByteBuffer.allocate(sizeInBytes()); - Utils.readFullyOrFail(channel, batchBuffer, position, "full record 
batch"); - batchBuffer.rewind(); - - byte magic = batchBuffer.get(Records.MAGIC_OFFSET); - if (magic > RecordBatch.MAGIC_VALUE_V1) - underlying = new DefaultRecordBatch(batchBuffer); - else - underlying = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer); - } catch (IOException e) { - throw new KafkaException("Failed to load record batch at position " + position + " from file channel " + channel); - } + return magic; } @Override public Iterator<Record> iterator() { - loadUnderlyingRecordBatch(); - return underlying.iterator(); + return loadFullBatch().iterator(); } @Override public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) { - loadUnderlyingRecordBatch(); - return underlying.streamingIterator(bufferSupplier); + return loadFullBatch().streamingIterator(bufferSupplier); } @Override public boolean isValid() { - loadUnderlyingRecordBatch(); - return underlying.isValid(); + return loadFullBatch().isValid(); } @Override public void ensureValid() { - loadUnderlyingRecordBatch(); - underlying.ensureValid(); - } - - @Override - public long checksum() { - loadUnderlyingRecordBatch(); - return underlying.checksum(); + loadFullBatch().ensureValid(); } @Override @@ -247,12 +170,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil } @Override - public Integer countOrNull() { - loadUnderlyingRecordBatch(); - return underlying.countOrNull(); - } - - @Override public void writeTo(ByteBuffer buffer) { try { int limit = buffer.limit(); @@ -265,22 +182,37 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil } } - @Override - public boolean isTransactional() { - loadUnderlyingRecordBatch(); - return underlying.isTransactional(); + protected abstract RecordBatch toMemoryRecordBatch(ByteBuffer buffer); + + protected abstract int headerSize(); + + protected RecordBatch loadFullBatch() { + if (fullBatch == null) { + batchHeader = null; + fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch"); + } + return fullBatch; } - @Override - public boolean isControlBatch() { - loadUnderlyingRecordBatch(); - return underlying.isControlBatch(); + protected RecordBatch loadBatchHeader() { + if (fullBatch != null) + return fullBatch; + + if (batchHeader == null) + batchHeader = loadBatchWithSize(headerSize(), "record batch header"); + + return batchHeader; } - @Override - public int partitionLeaderEpoch() { - loadUnderlyingRecordBatch(); - return underlying.partitionLeaderEpoch(); + private RecordBatch loadBatchWithSize(int size, String description) { + try { + ByteBuffer buffer = ByteBuffer.allocate(size); + Utils.readFullyOrFail(channel, buffer, position, description); + buffer.rewind(); + return toMemoryRecordBatch(buffer); + } catch (IOException e) { + throw new KafkaException(e); + } } @Override @@ -309,7 +241,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil @Override public String toString() { - return "FileChannelRecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])"; + return "FileChannelRecordBatch(magic: " + magic + + ", offset: " + offset + + ", size: " + batchSize + ")"; } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index a72ba8b..32ca1a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -202,7 +202,7 @@ public class FileRecords extends AbstractRecords implements Closeable { public void renameTo(File f) throws IOException { try { Utils.atomicMoveWithFallback(file.toPath(), f.toPath()); - } finally { + } finally { this.file = f; } } @@ -391,7 +391,7 @@ public class FileRecords extends AbstractRecords implements Closeable { * @param mutable mutable * @param fileAlreadyExists File already exists or not * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024 - * @param preallocate Pre allocate file or not, gotten from configuration. + * @param preallocate Pre-allocate file or not, gotten from configuration. */ private static FileChannel openChannel(File file, boolean mutable, http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index 25185b0..482c4a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -45,10 +45,10 @@ public final class LegacyRecord { public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH; public static final int MAGIC_LENGTH = 1; public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; - public static final int ATTRIBUTE_LENGTH = 1; - public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; + public static final int ATTRIBUTES_LENGTH = 1; + public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH; public static final int TIMESTAMP_LENGTH = 8; - public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; + public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH; public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH; public static final int KEY_SIZE_LENGTH = 4; public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH; @@ -58,17 +58,18 @@ public final class LegacyRecord { /** * The size for the record header */ - public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; + public static final int HEADER_SIZE_V0 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH; + public static final int HEADER_SIZE_V1 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + TIMESTAMP_LENGTH; /** * The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE_V0 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE_V1 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * Specifies the mask for the compression code. 3 bits to hold the compression codec. 
0 is reserved to indicate no @@ -483,19 +484,11 @@ public final class LegacyRecord { } } - public static int recordSize(byte[] key, byte[] value) { - return recordSize(RecordBatch.CURRENT_MAGIC_VALUE, key, value); - } - - public static int recordSize(byte magic, byte[] key, byte[] value) { - return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length); - } - - public static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) { + static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) { return recordSize(magic, key == null ? 0 : key.limit(), value == null ? 0 : value.limit()); } - private static int recordSize(byte magic, int keySize, int valueSize) { + public static int recordSize(byte magic, int keySize, int valueSize) { return recordOverhead(magic) + keySize + valueSize; } @@ -547,16 +540,28 @@ public final class LegacyRecord { return crc.getValue(); } - public static int recordOverhead(byte magic) { + static int recordOverhead(byte magic) { if (magic == 0) return RECORD_OVERHEAD_V0; - return RECORD_OVERHEAD_V1; + else if (magic == 1) + return RECORD_OVERHEAD_V1; + throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic); + } + + static int headerSize(byte magic) { + if (magic == 0) + return HEADER_SIZE_V0; + else if (magic == 1) + return HEADER_SIZE_V1; + throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic); } private static int keyOffset(byte magic) { if (magic == 0) return KEY_OFFSET_V0; - return KEY_OFFSET_V1; + else if (magic == 1) + return KEY_OFFSET_V1; + throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic); } public static TimestampType timestampType(byte magic, TimestampType wrapperRecordTimestampType, byte attributes) { http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 528095e..1d45635 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -506,11 +506,18 @@ public class MemoryRecords extends AbstractRecords { producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records); } + public static MemoryRecords withTransactionalRecords(byte magic, long initialOffset, CompressionType compressionType, + long producerId, short producerEpoch, int baseSequence, + int partitionLeaderEpoch, SimpleRecord... records) { + return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch, + baseSequence, partitionLeaderEpoch, true, records); + } + public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId, short producerEpoch, int baseSequence, int partitionLeaderEpoch, SimpleRecord... 
records) { - return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, - producerId, producerEpoch, baseSequence, partitionLeaderEpoch, true, records); + return withTransactionalRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, + producerId, producerEpoch, baseSequence, partitionLeaderEpoch, records); } public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/Records.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java index 6a4d1a1..a5a5036 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Records.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java @@ -49,6 +49,8 @@ public interface Records { // the magic offset is at the same offset for all current message formats, but the 4 bytes // between the size and the magic is dependent on the version. int MAGIC_OFFSET = 16; + int MAGIC_LENGTH = 1; + int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH; /** * The size of these records in bytes. http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 7d48623..97a6259 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -303,7 +303,7 @@ public class FetcherTest { long offset = 0; long timestamp = 500L; - int size = LegacyRecord.recordSize(key, value); + int size = LegacyRecord.recordSize(magic, key.length, value.length); byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME); long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value); http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java index 7c37354..d5de4bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java @@ -16,33 +16,56 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; +import static 
org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; +import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; +import static org.apache.kafka.common.record.TimestampType.CREATE_TIME; +import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE; import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@RunWith(value = Parameterized.class) public class FileLogInputStreamTest { + private final byte magic; + private final CompressionType compression; + + public FileLogInputStreamTest(byte magic, CompressionType compression) { + this.magic = magic; + this.compression = compression; + } + @Test public void testWriteTo() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { - fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes()), - new SimpleRecord("bar".getBytes()))); + fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes()))); fileRecords.flush(); FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, fileRecords.sizeInBytes()); - FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch(); + FileChannelRecordBatch batch = logInputStream.nextBatch(); assertNotNull(batch); - assertEquals(RecordBatch.MAGIC_VALUE_V2, batch.magic()); + assertEquals(magic, batch.magic()); ByteBuffer buffer = ByteBuffer.allocate(128); batch.writeTo(buffer); @@ -50,13 +73,195 @@ public class FileLogInputStreamTest { MemoryRecords memRecords = MemoryRecords.readableRecords(buffer); List<Record> records = Utils.toList(memRecords.records().iterator()); - assertEquals(2, records.size()); + assertEquals(1, records.size()); Record record0 = records.get(0); - assertTrue(record0.hasMagic(RecordBatch.MAGIC_VALUE_V2)); + assertTrue(record0.hasMagic(magic)); assertEquals("foo", Utils.utf8(record0.value(), record0.valueSize())); - Record record1 = records.get(1); - assertTrue(record1.hasMagic(RecordBatch.MAGIC_VALUE_V2)); - assertEquals("bar", Utils.utf8(record1.value(), record1.valueSize())); } } + + @Test + public void testSimpleBatchIteration() throws IOException { + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes()); + SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes()); + + fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord)); + fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord)); + fileRecords.flush(); + + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, + fileRecords.sizeInBytes()); + + FileChannelRecordBatch firstBatch = logInputStream.nextBatch(); + assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord); + assertNoProducerData(firstBatch); + + FileChannelRecordBatch secondBatch = logInputStream.nextBatch(); + assertGenericRecordBatchData(secondBatch, 1L, 234280L, secondBatchRecord); + assertNoProducerData(secondBatch); + + assertNull(logInputStream.nextBatch()); + } + } + + @Test + public void 
testBatchIterationWithMultipleRecordsPerBatch() throws IOException { + if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE) + return; + + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + SimpleRecord[] firstBatchRecords = new SimpleRecord[]{ + new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(234280L, "b".getBytes(), "2".getBytes()) + + }; + SimpleRecord[] secondBatchRecords = new SimpleRecord[]{ + new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()), + new SimpleRecord(897839L, null, "4".getBytes()), + new SimpleRecord(8234020L, "e".getBytes(), null) + }; + + fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecords)); + fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords)); + fileRecords.flush(); + + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, + fileRecords.sizeInBytes()); + + FileChannelRecordBatch firstBatch = logInputStream.nextBatch(); + assertNoProducerData(firstBatch); + assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecords); + + FileChannelRecordBatch secondBatch = logInputStream.nextBatch(); + assertNoProducerData(secondBatch); + assertGenericRecordBatchData(secondBatch, 1L, 238423489L, secondBatchRecords); + + assertNull(logInputStream.nextBatch()); + } + } + + @Test + public void testBatchIterationV2() throws IOException { + if (magic != MAGIC_VALUE_V2) + return; + + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + long producerId = 83843L; + short producerEpoch = 15; + int baseSequence = 234; + int partitionLeaderEpoch = 9832; + + SimpleRecord[] firstBatchRecords = new SimpleRecord[]{ + new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(234280L, "b".getBytes(), "2".getBytes()) + + }; + SimpleRecord[] secondBatchRecords = new SimpleRecord[]{ + new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()), + new SimpleRecord(897839L, null, "4".getBytes()), + new SimpleRecord(8234020L, "e".getBytes(), null) + }; + + fileRecords.append(MemoryRecords.withIdempotentRecords(magic, 15L, compression, producerId, + producerEpoch, baseSequence, partitionLeaderEpoch, firstBatchRecords)); + fileRecords.append(MemoryRecords.withTransactionalRecords(magic, 27L, compression, producerId, + producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords)); + fileRecords.flush(); + + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, + fileRecords.sizeInBytes()); + + FileChannelRecordBatch firstBatch = logInputStream.nextBatch(); + assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords); + assertGenericRecordBatchData(firstBatch, 15L, 3241324L, firstBatchRecords); + assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch()); + + FileChannelRecordBatch secondBatch = logInputStream.nextBatch(); + assertProducerData(secondBatch, producerId, producerEpoch, baseSequence + firstBatchRecords.length, + true, secondBatchRecords); + assertGenericRecordBatchData(secondBatch, 27L, 238423489L, secondBatchRecords); + assertEquals(partitionLeaderEpoch, secondBatch.partitionLeaderEpoch()); + + assertNull(logInputStream.nextBatch()); + } + } + + @Test + public void testBatchIterationIncompleteBatch() throws IOException { + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + SimpleRecord firstBatchRecord = new 
SimpleRecord(100L, "foo".getBytes()); + SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes()); + + fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord)); + fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord)); + fileRecords.flush(); + fileRecords.truncateTo(fileRecords.sizeInBytes() - 13); + + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, + fileRecords.sizeInBytes()); + + FileChannelRecordBatch firstBatch = logInputStream.nextBatch(); + assertNoProducerData(firstBatch); + assertGenericRecordBatchData(firstBatch, 0L, 100L, firstBatchRecord); + + assertNull(logInputStream.nextBatch()); + } + } + + private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence, + boolean isTransactional, SimpleRecord ... records) { + assertEquals(producerId, batch.producerId()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(baseSequence + records.length - 1, batch.lastSequence()); + assertEquals(isTransactional, batch.isTransactional()); + } + + private void assertNoProducerData(RecordBatch batch) { + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.lastSequence()); + assertFalse(batch.isTransactional()); + } + + private void assertGenericRecordBatchData(RecordBatch batch, long baseOffset, long maxTimestamp, SimpleRecord ... records) { + assertEquals(magic, batch.magic()); + assertEquals(compression, batch.compressionType()); + + if (magic == MAGIC_VALUE_V0) { + assertEquals(NO_TIMESTAMP_TYPE, batch.timestampType()); + } else { + assertEquals(CREATE_TIME, batch.timestampType()); + assertEquals(maxTimestamp, batch.maxTimestamp()); + } + + assertEquals(baseOffset + records.length - 1, batch.lastOffset()); + if (magic >= MAGIC_VALUE_V2) + assertEquals(Integer.valueOf(records.length), batch.countOrNull()); + + assertEquals(baseOffset, batch.baseOffset()); + assertTrue(batch.isValid()); + + List<Record> batchRecords = TestUtils.toList(batch); + for (int i = 0; i < records.length; i++) { + assertEquals(baseOffset + i, batchRecords.get(i).offset()); + assertEquals(records[i].key(), batchRecords.get(i).key()); + assertEquals(records[i].value(), batchRecords.get(i).value()); + if (magic == MAGIC_VALUE_V0) + assertEquals(NO_TIMESTAMP, batchRecords.get(i).timestamp()); + else + assertEquals(records[i].timestamp(), batchRecords.get(i).timestamp()); + } + } + + @Parameterized.Parameters(name = "magic={0}, compression={1}") + public static Collection<Object[]> data() { + List<Object[]> values = new ArrayList<>(); + for (byte magic : asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) + for (CompressionType type: CompressionType.values()) + values.add(new Object[] {magic, type}); + return values; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 11ee419..8b9c900 100644 --- 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -26,11 +26,11 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import static java.util.Arrays.asList; import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -324,8 +324,8 @@ public class FileRecordsTest { } private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { - List<Long> offsets = Arrays.asList(0L, 2L, 3L, 9L, 11L, 15L); - List<SimpleRecord> records = Arrays.asList( + List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L); + List<SimpleRecord> records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()), http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 0922c48..c621d53 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -554,7 +554,7 @@ public class MemoryRecordsBuilderTest { } } - @Parameterized.Parameters + @Parameterized.Parameters(name = "bufferOffset={0}, compression={1}") public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<>(); for (int bufferOffset : Arrays.asList(0, 15)) http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index e1d2882..c6fa1ce 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -166,9 +166,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. 
**/ private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { if (isShallow) - asRecords.batches.asScala.iterator.map(batch => MessageAndOffset.fromRecordBatch(batch.asInstanceOf[AbstractLegacyRecordBatch])) + asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch) else - asRecords.records.asScala.iterator.map(record => MessageAndOffset.fromRecordBatch(record.asInstanceOf[AbstractLegacyRecordBatch])) + asRecords.records.asScala.iterator.map(MessageAndOffset.fromRecord) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/MessageAndOffset.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala index 349e90b..8de0f81 100644 --- a/core/src/main/scala/kafka/message/MessageAndOffset.scala +++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala @@ -17,11 +17,29 @@ package kafka.message -import org.apache.kafka.common.record.AbstractLegacyRecordBatch +import org.apache.kafka.common.record.{AbstractLegacyRecordBatch, Record, RecordBatch} object MessageAndOffset { - def fromRecordBatch(recordBatch: AbstractLegacyRecordBatch): MessageAndOffset = { - MessageAndOffset(Message.fromRecord(recordBatch.outerRecord), recordBatch.lastOffset) + def fromRecordBatch(batch: RecordBatch): MessageAndOffset = { + batch match { + case legacyBatch: AbstractLegacyRecordBatch => + MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset) + + case _ => + throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. The older message format classes " + + s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1") + } + } + + def fromRecord(record: Record): MessageAndOffset = { + record match { + case legacyBatch: AbstractLegacyRecordBatch => + MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset) + + case _ => + throw new IllegalArgumentException(s"Illegal record type ${record.getClass}. The older message format classes " + + s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1") + } } }
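
The core idea of the patch: FileChannelRecordBatch now reads only the fixed-size batch header from the FileChannel when a caller asks for header-level fields (offsets, producer id, record count), and reads the full batch from disk only when the records themselves are iterated. The sketch below illustrates that lazy-loading pattern in isolation; the class name, header size constant, and field offset are simplified placeholders, not Kafka's actual FileChannelRecordBatch layout.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative sketch only: a batch view over a FileChannel that reads the
// fixed-size header on demand and the full record payload only when needed.
final class LazyFileBatch {
    private static final int HEADER_SIZE = 61; // assumed v2-style header size

    private final FileChannel channel;
    private final int position;   // start of the batch within the file
    private final int batchSize;  // total size of the batch in bytes

    private ByteBuffer header;    // loaded on first header-level access
    private ByteBuffer fullBatch; // loaded only if record data is needed

    LazyFileBatch(FileChannel channel, int position, int batchSize) {
        this.channel = channel;
        this.position = position;
        this.batchSize = batchSize;
    }

    // Header-level field: only HEADER_SIZE bytes are read from disk.
    int lastOffsetDelta() throws IOException {
        return loadHeader().getInt(23); // hypothetical field offset within the header
    }

    // Record-level access: forces the whole batch into memory.
    ByteBuffer records() throws IOException {
        return loadFull().duplicate();
    }

    private ByteBuffer loadHeader() throws IOException {
        if (fullBatch != null)
            return fullBatch;              // reuse the full batch if already loaded
        if (header == null)
            header = readFully(HEADER_SIZE);
        return header;
    }

    private ByteBuffer loadFull() throws IOException {
        if (fullBatch == null) {
            fullBatch = readFully(batchSize);
            header = null;                 // the header-only view is no longer needed
        }
        return fullBatch;
    }

    private ByteBuffer readFully(int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        int pos = position;
        while (buffer.hasRemaining()) {
            int read = channel.read(buffer, pos);
            if (read < 0)
                throw new IOException("Unexpected end of file while reading batch");
            pos += read;
        }
        buffer.flip();
        return buffer;
    }
}

Keeping the header and full-batch buffers as separate fields mirrors the patch's loadBatchHeader()/loadFullBatch() split: header-only callers such as lastOffset() or producerId() never trigger the larger read, while iterator() and isValid() fall back to loading the complete batch.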

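At the call-site level, the benefit shows up when scanning a segment for batch metadata. A rough usage sketch against the public FileRecords API (the segment path below is a made-up example, and exact behaviour depends on the segment contents) could look like:

import java.io.File;
import java.io.IOException;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;

public class SegmentScan {
    public static void main(String[] args) throws IOException {
        // Hypothetical segment file path; any Kafka log segment file would do.
        try (FileRecords segment = FileRecords.open(new File("/tmp/00000000000000000000.log"))) {
            for (RecordBatch batch : segment.batches()) {
                // After this patch, these calls read only the batch header from the
                // channel; the record data itself is not pulled into memory.
                System.out.println("lastOffset=" + batch.lastOffset()
                        + " producerId=" + batch.producerId()
                        + " count=" + batch.countOrNull());
            }
        }
    }
}

For magic v0/v1 batches, lastOffset() comes straight from the log entry header and countOrNull() returns null, as the LegacyFileChannelRecordBatch in the diff shows; for v2 batches the same calls read only the fixed-size batch header rather than the full (possibly compressed) payload.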