Repository: kafka
Updated Branches:
  refs/heads/trunk b50387eb7 -> dfa3c8a92
KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #3142 from hachikuji/KAFKA-5316


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfa3c8a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfa3c8a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfa3c8a9

Branch: refs/heads/trunk
Commit: dfa3c8a92dddd58cab95e12c72669f250bb99683
Parents: b50387e
Author: Jason Gustafson <[email protected]>
Authored: Sun May 28 09:57:59 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Sun May 28 09:57:59 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   2 +-
 .../producer/internals/ProducerBatch.java       |  57 +++------
 .../record/AbstractLegacyRecordBatch.java       |   6 +
 .../kafka/common/record/DefaultRecordBatch.java |  14 +--
 .../kafka/common/record/FileLogInputStream.java |   7 --
 .../apache/kafka/common/record/FileRecords.java |  21 +---
 .../kafka/common/record/MemoryRecords.java      | 110 ++++++++++------
 .../common/record/MemoryRecordsBuilder.java     | 124 ++++++++++++-------
 .../kafka/common/record/MutableRecordBatch.java |   9 ++
 .../common/utils/ByteBufferOutputStream.java    |  45 ++++++-
 .../org/apache/kafka/clients/MockClient.java    |  15 ++-
 .../internals/RecordAccumulatorTest.java        |   8 +-
 .../clients/producer/internals/SenderTest.java  |  61 ++++++---
 .../common/record/FileLogInputStreamTest.java   |   2 +-
 .../kafka/common/record/MemoryRecordsTest.java  |  70 +++++++++--
 .../utils/ByteBufferOutputStreamTest.java       | 101 +++++++++++++++
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  11 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  14 +--
 .../scala/kafka/tools/DumpLogSegments.scala     |   4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  10 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   9 +-
 22 files changed, 490 insertions(+), 212 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 743c68d..ccab85c 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -105,7 +105,7 @@ </module> <module name="ClassDataAbstractionCoupling"> <!-- default is 7 --> - <property name="max" value="17"/> + <property name="max" value="20"/> </module> <module name="BooleanExpressionComplexity"> <!-- default is 3 --> http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index df79707..974e230 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.Iterator; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -27,23 +24,24 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; -import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.MemoryRecordsBuilder; -import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; @@ -119,9 +117,9 @@ public final class ProducerBatch { } /** - + * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones. - + * @return true if the record has been successfully appended, false otherwise. - + */ + * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones. + * @return true if the record has been successfully appended, false otherwise. + */ private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) { if (!recordsBuilder.hasRoomFor(timestamp, key, value)) { return false; @@ -196,15 +194,13 @@ public final class ProducerBatch { assert thunkIter.hasNext(); Thunk thunk = thunkIter.next(); if (batch == null) { - batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(), - record, splitBatchSize, this.createdMs); + batch = createBatchOffAccumulatorForRecord(record, splitBatchSize); } // A newly created batch can always host the first message. if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) { batches.add(batch); - batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(), - record, splitBatchSize, this.createdMs); + batch = createBatchOffAccumulatorForRecord(record, splitBatchSize); batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk); } } @@ -217,30 +213,13 @@ public final class ProducerBatch { return batches; } - private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp, - CompressionType compressionType, - Record record, - int batchSize, - long createdMs) { - int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(), - record.key(), - record.value(), - record.headers()), - batchSize); - return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs); - } - - // package private for testing purpose. 
- static ProducerBatch createBatchOffAccumulator(TopicPartition tp, - CompressionType compressionType, - int batchSize, - long createdMs) { - ByteBuffer buffer = ByteBuffer.allocate(batchSize); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, - compressionType, - TimestampType.CREATE_TIME, - batchSize); - return new ProducerBatch(tp, builder, createdMs, true); + private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) { + int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(), + record.key(), record.value(), record.headers()), batchSize); + ByteBuffer buffer = ByteBuffer.allocate(initialSize); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(), + TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional()); + return new ProducerBatch(topicPartition, builder, this.createdMs, true); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index e028988..be69686 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; @@ -479,6 +480,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl } @Override + public void writeTo(ByteBufferOutputStream outputStream) { + outputStream.write(buffer.duplicate()); + } + + @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 4e52d61..f01116e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -19,10 +19,10 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Crc32C; -import org.apache.kafka.common.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -207,6 +207,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } @Override + public void writeTo(ByteBufferOutputStream outputStream) { + 
outputStream.write(this.buffer.duplicate()); + } + + @Override public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } @@ -447,13 +452,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe /** * Get an upper bound on the size of a batch with only a single record using a given key and value. */ - static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) { - return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers); - } - - /** - * Get an upper bound on the size of a batch with only a single record using a given key and value. - */ static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 1af5527..5fe1cef 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -35,22 +35,18 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil private int position; private final int end; private final FileChannel channel; - private final int maxRecordSize; private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD); /** * Create a new log input stream over the FileChannel * @param channel Underlying FileChannel - * @param maxRecordSize Maximum size of records * @param start Position in the file channel to start from * @param end Position in the file channel not to read past */ FileLogInputStream(FileChannel channel, - int maxRecordSize, int start, int end) { this.channel = channel; - this.maxRecordSize = maxRecordSize; this.position = start; this.end = end; } @@ -71,9 +67,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil if (size < LegacyRecord.RECORD_OVERHEAD_V0) throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0)); - if (size > maxRecordSize) - throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize)); - if (position + LOG_OVERHEAD + size > end) return null; http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 16d3777..a72ba8b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -339,35 +339,22 @@ public class FileRecords extends AbstractRecords implements Closeable { return batches; } - /** - * Get an iterator over the record batches, enforcing a maximum record size - * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets) - * @return An iterator over the batches - */ - public 
Iterable<FileChannelRecordBatch> batches(int maxRecordSize) { - return batches(maxRecordSize, start); - } - - private Iterable<FileChannelRecordBatch> batchesFrom(int start) { - return batches(Integer.MAX_VALUE, start); - } - - private Iterable<FileChannelRecordBatch> batches(final int maxRecordSize, final int start) { + private Iterable<FileChannelRecordBatch> batchesFrom(final int start) { return new Iterable<FileChannelRecordBatch>() { @Override public Iterator<FileChannelRecordBatch> iterator() { - return batchIterator(maxRecordSize, start); + return batchIterator(start); } }; } - private Iterator<FileChannelRecordBatch> batchIterator(int maxRecordSize, int start) { + private Iterator<FileChannelRecordBatch> batchIterator(int start) { final int end; if (isSlice) end = this.end; else end = this.sizeInBytes(); - FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end); + FileLogInputStream inputStream = new FileLogInputStream(channel, start, end); return new RecordBatchIterator<>(inputStream); } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index d3bdee2..56d7ed1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; @@ -31,8 +36,8 @@ import java.util.Objects; * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants. */ public class MemoryRecords extends AbstractRecords { - - public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0)); + private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class); + public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0)); private final ByteBuffer buffer; @@ -110,16 +115,21 @@ public class MemoryRecords extends AbstractRecords { /** * Filter the records into the provided ByteBuffer. + * @param partition The partition that is filtered (used only for logging) * @param filter The filter function * @param destinationBuffer The byte buffer to write the filtered records to - * @return A FilterResult with a summary of the output (for metrics) + * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch + * exceeds this after filtering, we log a warning, but the batch will still be + * created. 
+ * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer */ - public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer) { - return filterTo(batches(), filter, destinationBuffer); + public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer, + int maxRecordBatchSize) { + return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize); } - private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter, - ByteBuffer destinationBuffer) { + private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches, + RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) { long maxTimestamp = RecordBatch.NO_TIMESTAMP; long maxOffset = -1L; long shallowOffsetOfMaxTimestamp = -1L; @@ -128,6 +138,8 @@ public class MemoryRecords extends AbstractRecords { int messagesRetained = 0; int bytesRetained = 0; + ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer); + for (MutableRecordBatch batch : batches) { bytesRead += batch.sizeInBytes(); @@ -140,7 +152,7 @@ public class MemoryRecords extends AbstractRecords { // recopy the messages to the destination buffer. byte batchMagic = batch.magic(); - boolean writeOriginalEntry = true; + boolean writeOriginalBatch = true; List<Record> retainedRecords = new ArrayList<>(); for (Record record : batch) { @@ -150,20 +162,19 @@ public class MemoryRecords extends AbstractRecords { // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite // the corrupted batch with correct data. if (!record.hasMagic(batchMagic)) - writeOriginalEntry = false; + writeOriginalBatch = false; if (record.offset() > maxOffset) maxOffset = record.offset(); retainedRecords.add(record); } else { - writeOriginalEntry = false; + writeOriginalBatch = false; } } - if (writeOriginalEntry) { - // There are no messages compacted out and no message format conversion, write the original message set back - batch.writeTo(destinationBuffer); + if (writeOriginalBatch) { + batch.writeTo(bufferOutputStream); messagesRetained += retainedRecords.size(); bytesRetained += batch.sizeInBytes(); if (batch.maxTimestamp() > maxTimestamp) { @@ -171,29 +182,18 @@ public class MemoryRecords extends AbstractRecords { shallowOffsetOfMaxTimestamp = batch.lastOffset(); } } else if (!retainedRecords.isEmpty()) { - ByteBuffer slice = destinationBuffer.slice(); - TimestampType timestampType = batch.timestampType(); - long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP; - long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ? - batch.baseOffset() : retainedRecords.get(0).offset(); - - MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType, - baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(), - batch.isTransactional(), batch.partitionLeaderEpoch()); - - for (Record record : retainedRecords) - builder.append(record); - - if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) - // we must preserve the last offset from the initial batch in order to ensure that the - // last sequence number from the batch remains even after compaction. Otherwise, the producer - // could incorrectly see an out of sequence error. 
- builder.overrideLastOffset(batch.lastOffset()); - + MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream); MemoryRecords records = builder.build(); - destinationBuffer.position(destinationBuffer.position() + slice.position()); + int filteredBatchSize = records.sizeInBytes(); + messagesRetained += retainedRecords.size(); - bytesRetained += records.sizeInBytes(); + bytesRetained += filteredBatchSize; + + if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) + log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " + + "(new size is {}). Consumers from version 0.10.1 and earlier may need to " + + "increase their fetch sizes.", + partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize); MemoryRecordsBuilder.RecordsInfo info = builder.info(); if (info.maxTimestamp > maxTimestamp) { @@ -201,9 +201,44 @@ public class MemoryRecords extends AbstractRecords { shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp; } } + + // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to + // avoid the need for additional allocations. + ByteBuffer outputBuffer = bufferOutputStream.buffer(); + if (outputBuffer != destinationBuffer) + return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained, + maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp); } - return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp); + return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained, + maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp); + } + + private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch, + List<Record> retainedRecords, + ByteBufferOutputStream bufferOutputStream) { + byte magic = originalBatch.magic(); + TimestampType timestampType = originalBatch.timestampType(); + long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? + originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP; + long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ? + originalBatch.baseOffset() : retainedRecords.get(0).offset(); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic, + originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), + originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), + originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); + + for (Record record : retainedRecords) + builder.append(record); + + if (magic >= RecordBatch.MAGIC_VALUE_V2) + // we must preserve the last offset from the initial batch in order to ensure that the + // last sequence number from the batch remains even after compaction. Otherwise, the producer + // could incorrectly see an out of sequence error. 
+ builder.overrideLastOffset(originalBatch.lastOffset()); + + return builder; } /** @@ -271,6 +306,7 @@ public class MemoryRecords extends AbstractRecords { } public static class FilterResult { + public final ByteBuffer output; public final int messagesRead; public final int bytesRead; public final int messagesRetained; @@ -279,13 +315,15 @@ public class MemoryRecords extends AbstractRecords { public final long maxTimestamp; public final long shallowOffsetOfMaxTimestamp; - public FilterResult(int messagesRead, + public FilterResult(ByteBuffer output, + int messagesRead, int bytesRead, int messagesRetained, int bytesRetained, long maxOffset, long maxTimestamp, long shallowOffsetOfMaxTimestamp) { + this.output = output; this.messagesRead = messagesRead; this.bytesRead = bytesRead; this.messagesRetained = messagesRetained; http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index e055aa5..49d70c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -49,7 +49,7 @@ public class MemoryRecordsBuilder { // so it's not safe to hold a direct reference to the underlying ByteBuffer. private final ByteBufferOutputStream bufferStream; private final byte magic; - private final int initPos; + private final int initialPosition; private final long baseOffset; private final long logAppendTime; private final boolean isTransactional; @@ -75,25 +75,7 @@ public class MemoryRecordsBuilder { private MemoryRecords builtRecords; private boolean aborted = false; - /** - * Construct a new builder. - * - * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary - * to fit the records appended) - * @param magic The magic value to use - * @param compressionType The compression codec to use - * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. - * @param baseOffset The initial offset to use for - * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. - * @param producerId The producer ID associated with the producer writing this record set - * @param producerEpoch The epoch of the producer - * @param baseSequence The sequence number of the first record in this set - * @param isTransactional Whether or not the records are part of a transaction - * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded - * when compression is used since size estimates are rough, and in the case that the first - * record added exceeds the size). 
- */ - public MemoryRecordsBuilder(ByteBuffer buffer, + public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, @@ -120,7 +102,6 @@ public class MemoryRecordsBuilder { this.compressionType = compressionType; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; - this.initPos = buffer.position(); this.numRecords = 0; this.writtenUncompressed = 0; this.actualCompressionRatio = 1; @@ -132,22 +113,62 @@ public class MemoryRecordsBuilder { this.isControlBatch = isControlBatch; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; - this.initialCapacity = buffer.capacity(); + + this.initialPosition = bufferStream.position(); + this.initialCapacity = bufferStream.capacity(); if (magic > RecordBatch.MAGIC_VALUE_V1) { - buffer.position(initPos + DefaultRecordBatch.RECORDS_OFFSET); + bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET); } else if (compressionType != CompressionType.NONE) { // for compressed records, leave space for the header and the shallow message metadata // and move the starting position to the value payload offset - buffer.position(initPos + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic)); + bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic)); } // create the stream - bufferStream = new ByteBufferOutputStream(buffer); - appendStream = new DataOutputStream(compressionType.wrapForOutput(bufferStream, magic, + this.bufferStream = bufferStream; + this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic, COMPRESSION_DEFAULT_BUFFER_SIZE)); } + /** + * Construct a new builder. + * + * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary + * to fit the records appended) + * @param magic The magic value to use + * @param compressionType The compression codec to use + * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. + * @param baseOffset The initial offset to use for + * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. + * @param producerId The producer ID associated with the producer writing this record set + * @param producerEpoch The epoch of the producer + * @param baseSequence The sequence number of the first record in this set + * @param isTransactional Whether or not the records are part of a transaction + * @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers) + * @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log + * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded + * when compression is used since size estimates are rough, and in the case that the first + * record added exceeds the size). 
+ */ + public MemoryRecordsBuilder(ByteBuffer buffer, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset, + long logAppendTime, + long producerId, + short producerEpoch, + int baseSequence, + boolean isTransactional, + boolean isControlBatch, + int partitionLeaderEpoch, + int writeLimit) { + this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime, + producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, + writeLimit); + } + public ByteBuffer buffer() { return bufferStream.buffer(); } @@ -168,6 +189,10 @@ public class MemoryRecordsBuilder { return isControlBatch; } + public boolean isTransactional() { + return isTransactional; + } + /** * Close this builder and return the resulting buffer. * @return The built log buffer @@ -249,7 +274,7 @@ public class MemoryRecordsBuilder { public void abort() { closeForRecordAppends(); - buffer().position(initPos); + buffer().position(initialPosition); aborted = true; } @@ -260,24 +285,12 @@ public class MemoryRecordsBuilder { if (builtRecords != null) return; - if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID) - throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID"); - - if (producerId != RecordBatch.NO_PRODUCER_ID) { - if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH) - throw new IllegalArgumentException("Invalid negative producer epoch"); - - if (baseSequence < 0 && !isControlBatch) - throw new IllegalArgumentException("Invalid negative sequence number used"); - - if (magic < RecordBatch.MAGIC_VALUE_V2) - throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic); - } + validateProducerState(); closeForRecordAppends(); if (numRecords == 0L) { - buffer().position(initPos); + buffer().position(initialPosition); builtRecords = MemoryRecords.EMPTY; } else { if (magic > RecordBatch.MAGIC_VALUE_V1) @@ -287,11 +300,27 @@ public class MemoryRecordsBuilder { ByteBuffer buffer = buffer().duplicate(); buffer.flip(); - buffer.position(initPos); + buffer.position(initialPosition); builtRecords = MemoryRecords.readableRecords(buffer.slice()); } } + private void validateProducerState() { + if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID) + throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID"); + + if (producerId != RecordBatch.NO_PRODUCER_ID) { + if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH) + throw new IllegalArgumentException("Invalid negative producer epoch"); + + if (baseSequence < 0 && !isControlBatch) + throw new IllegalArgumentException("Invalid negative sequence number used"); + + if (magic < RecordBatch.MAGIC_VALUE_V2) + throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic); + } + } + /** * Write the header to the default batch. * @return the written compressed bytes. 
@@ -300,8 +329,8 @@ public class MemoryRecordsBuilder { ensureOpenForRecordBatchWrite(); ByteBuffer buffer = bufferStream.buffer(); int pos = buffer.position(); - buffer.position(initPos); - int size = pos - initPos; + buffer.position(initialPosition); + int size = pos - initialPosition; int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD; int offsetDelta = (int) (lastOffset - baseOffset); @@ -331,9 +360,9 @@ public class MemoryRecordsBuilder { ensureOpenForRecordBatchWrite(); ByteBuffer buffer = bufferStream.buffer(); int pos = buffer.position(); - buffer.position(initPos); + buffer.position(initialPosition); - int wrapperSize = pos - initPos - Records.LOG_OVERHEAD; + int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD; int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic); AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize); @@ -544,7 +573,7 @@ public class MemoryRecordsBuilder { * @param record the record to add */ public void append(Record record) { - appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers()); + appendWithOffset(record.offset(), isControlBatch, record.timestamp(), record.key(), record.value(), record.headers()); } /** @@ -736,4 +765,5 @@ public class MemoryRecordsBuilder { public short producerEpoch() { return this.producerEpoch; } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index 2f0a96c..728b6eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.ByteBufferOutputStream; + /** * A mutable record batch is one that can be modified in place (without copying). This is used by the broker * to override certain fields in the batch before appending it to the log. @@ -42,4 +44,11 @@ public interface MutableRecordBatch extends RecordBatch { * @param epoch The partition leader epoch to use */ void setPartitionLeaderEpoch(int epoch); + + /** + * Write this record batch into an output stream. 
+ * @param outputStream The buffer to write the batch to + */ + void writeTo(ByteBufferOutputStream outputStream); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java index 9480c6d..79d4d4c 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java @@ -20,16 +20,18 @@ import java.io.OutputStream; import java.nio.ByteBuffer; /** - * A byte buffer backed output outputStream + * A ByteBuffer-backed OutputStream */ public class ByteBufferOutputStream extends OutputStream { private static final float REALLOCATION_FACTOR = 1.1f; private ByteBuffer buffer; + private int initialPosition; public ByteBufferOutputStream(ByteBuffer buffer) { this.buffer = buffer; + this.initialPosition = buffer.position(); } public void write(int b) { @@ -40,18 +42,55 @@ public class ByteBufferOutputStream extends OutputStream { public void write(byte[] bytes, int off, int len) { if (buffer.remaining() < len) - expandBuffer(buffer.capacity() + len); + expandBuffer(buffer.position() + len); buffer.put(bytes, off, len); } + public void write(ByteBuffer buffer) { + if (buffer.hasArray()) + write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + else { + int pos = buffer.position(); + for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++) + write(buffer.get(i)); + } + } + public ByteBuffer buffer() { return buffer; } + public int position() { + return buffer.position(); + } + + public int capacity() { + return buffer.capacity(); + } + + public int limit() { + return buffer.limit(); + } + + public void position(int position) { + if (position > buffer.limit()) + expandBuffer(position); + buffer.position(position); + } + private void expandBuffer(int size) { int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); ByteBuffer temp = ByteBuffer.allocate(expandSize); - temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); + if (buffer.hasArray()) { + temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); + } else { + int limit = buffer.position(); + for (int i = 0; i < limit; i++) + temp.put(buffer.get(i)); + } + + // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed + buffer.position(initialPosition); buffer = temp; } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 9ca95e3..ce3c599 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -153,8 +153,7 @@ public class MockClient implements KafkaClient { short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion()); AbstractRequest abstractRequest = request.requestBuilder().build(version); if (!futureResp.requestMatcher.matches(abstractRequest)) - throw new 
IllegalStateException("Next in line response did not match expected request, request: " - + abstractRequest); + throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest); ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody); responses.add(resp); @@ -195,6 +194,18 @@ public class MockClient implements KafkaClient { respond(response, false); } + public void respond(RequestMatcher matcher, AbstractResponse response) { + ClientRequest nextRequest = requests.peek(); + if (nextRequest == null) + throw new IllegalStateException("No current requests queued"); + + AbstractRequest request = nextRequest.requestBuilder().build(); + if (!matcher.matches(request)) + throw new IllegalStateException("Request matcher did not match next-in-line request " + request); + + respond(response); + } + public void respond(AbstractResponse response, boolean disconnected) { ClientRequest request = requests.remove(); short version = request.requestBuilder().desiredOrLatestVersion(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index f48ab33..e079f2a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -32,8 +32,11 @@ import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -563,7 +566,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time, new ApiVersions(), null); // Create a big batch - ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now); + ByteBuffer buffer = ByteBuffer.allocate(4096); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + ProducerBatch batch = new ProducerBatch(tp1, builder, now, true); + byte[] value = new byte[1024]; final AtomicInteger acked = new AtomicInteger(0); Callback cb = new Callback() { http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index c08ea57..77b1da8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -38,14 +38,14 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; -import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; +import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -554,28 +555,18 @@ public class SenderTest { CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); Metrics m = new Metrics(); TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0); - txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0)); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + txnManager.setProducerIdAndEpoch(producerIdAndEpoch); accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, new ApiVersions(), txnManager); try { - Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 1000L, - txnManager, - new ApiVersions()); + Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); // Send the first message. 
- TopicPartition tp2 = new TopicPartition(topic, 1); + final TopicPartition tp2 = new TopicPartition(topic, 1); Future<RecordMetadata> f1 = accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> f2 = @@ -607,7 +598,8 @@ public class SenderTest { assertTrue("Client ready status should be true", client.isReady(node, 0L)); responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); - client.respond(new ProduceResponse(responseMap)); + client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap)); + sender.run(time.milliseconds()); // receive assertTrue("The future should have been done.", f1.isDone()); assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue()); @@ -621,7 +613,8 @@ public class SenderTest { assertTrue("Client ready status should be true", client.isReady(node, 0L)); responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); - client.respond(new ProduceResponse(responseMap)); + client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap)); + sender.run(time.milliseconds()); // receive assertTrue("The future should have been done.", f2.isDone()); assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue()); @@ -635,6 +628,36 @@ public class SenderTest { } } + private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp, + final ProducerIdAndEpoch producerIdAndEpoch, + final int sequence, + final boolean isTransactional) { + return new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + if (!(body instanceof ProduceRequest)) + return false; + + ProduceRequest request = (ProduceRequest) body; + Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail(); + MemoryRecords records = recordsMap.get(tp); + if (records == null) + return false; + + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + if (batches.isEmpty() || batches.size() > 1) + return false; + + MutableRecordBatch batch = batches.get(0); + return batch.baseOffset() == 0L && + batch.baseSequence() == sequence && + batch.producerId() == producerIdAndEpoch.producerId && + batch.producerEpoch() == producerIdAndEpoch.epoch && + batch.isTransactional() == isTransactional; + } + }; + } + private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { assertTrue("Request should be completed", future.isDone()); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java index 65de01c..7c37354 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java @@ -37,7 +37,7 @@ public class FileLogInputStreamTest { new SimpleRecord("bar".getBytes()))); fileRecords.flush(); - FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0, + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, 
fileRecords.sizeInBytes()); FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index afd0126..a7058a3 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -30,6 +31,7 @@ import java.util.List; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -202,7 +204,8 @@ public class MemoryRecordsTest { builder.append(12L, null, "c".getBytes()); ByteBuffer filtered = ByteBuffer.allocate(2048); - builder.build().filterTo(new RetainNonNullKeysFilter(), filtered); + builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, + Integer.MAX_VALUE); filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); @@ -278,7 +281,7 @@ public class MemoryRecordsTest { buffer.flip(); ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() { + MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() { @Override protected boolean shouldDiscard(RecordBatch batch) { // discard the second and fourth batches @@ -289,7 +292,7 @@ public class MemoryRecordsTest { protected boolean shouldRetain(RecordBatch recordBatch, Record record) { return true; } - }, filtered); + }, filtered, Integer.MAX_VALUE); filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); @@ -316,7 +319,8 @@ public class MemoryRecordsTest { buffer.flip(); ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered); + MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), + filtered, Integer.MAX_VALUE); filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); @@ -383,7 +387,8 @@ public class MemoryRecordsTest { buffer.flip(); ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered); + MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), + filtered, Integer.MAX_VALUE); filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); @@ -440,6 +445,55 @@ public class MemoryRecordsTest { } @Test + public void testFilterToWithUndersizedBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + builder.append(10L, null, "a".getBytes()); + builder.close(); + + builder = 
MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), new byte[128]);
+        builder.append(12L, "2".getBytes(), "c".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
+        builder.append(14L, null, "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
+        builder.append(16L, "6".getBytes(), "g".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
+        builder.append(17L, "7".getBytes(), new byte[128]);
+        builder.close();
+
+        buffer.flip();
+
+        ByteBuffer output = ByteBuffer.allocate(64);
+
+        List<Record> records = new ArrayList<>();
+        while (buffer.hasRemaining()) {
+            output.rewind();
+
+            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+
+            buffer.position(buffer.position() + result.bytesRead);
+            result.output.flip();
+
+            if (output != result.output)
+                assertEquals(0, output.position());
+
+            MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+            records.addAll(TestUtils.toList(filtered.records()));
+        }
+
+        assertEquals(5, records.size());
+        for (Record record : records)
+            assertNotNull(record.key());
+    }
+
+    @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -464,7 +518,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
 
@@ -576,7 +631,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+                filtered, Integer.MAX_VALUE);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
new file mode 100644
index 0000000..2ef5672
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferOutputStreamTest {
+
+    @Test
+    public void testExpandByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.position(32);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testExpandByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.write(new byte[27]);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testWriteByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testWriteDirectByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testWriteByteBuffer(ByteBuffer input) {
+        long value = 234239230L;
+        input.putLong(value);
+        input.flip();
+
+        ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
+        output.write(input);
+        assertEquals(8, output.position());
+        assertEquals(value, output.buffer().getLong(0));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 67b9271..c37ea08 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -318,7 +318,7 @@ class Log(@volatile var dir: File,
             loadProducersFromLog(stateManager, fetchDataInfo.records)
           }
           stateManager.updateMapEndOffset(segment.baseOffset)
-          val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
+          val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
 
           // once we have recovered the segment's data, take a snapshot to ensure that we won't
           // need to reload the same segment again while recovering another segment.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8eda2e1..b05e37f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -516,22 +516,23 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(logCleanerFilter, writeBuffer)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
      stats.readMessages(result.messagesRead, result.bytesRead)
      stats.recopyMessages(result.messagesRetained, result.bytesRetained)

      position += result.bytesRead

      // if any messages are to be retained, write them out
-      if (writeBuffer.position > 0) {
-        writeBuffer.flip()
-        val retained = MemoryRecords.readableRecords(writeBuffer)
+      val outputBuffer = result.output
+      if (outputBuffer.position > 0) {
+        outputBuffer.flip()
+        val retained = MemoryRecords.readableRecords(outputBuffer)
        dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
          largestOffset = result.maxOffset,
          largestTimestamp = result.maxTimestamp,
          shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
          records = retained)
-        throttler.maybeThrottle(writeBuffer.limit)
+        throttler.maybeThrottle(outputBuffer.limit)
      }

      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 70269bb..3e4c47d 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -242,19 +242,16 @@ class LogSegment(val log: FileRecords,
     index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)

   /**
-   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
+   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
+   * from the end of the log and index.
    *
-   * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
-   *                       is corrupt.
    * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
    *                             the transaction index.
    * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
   * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(maxMessageSize: Int,
-              producerStateManager: ProducerStateManager,
-              leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
     timeIndex.truncate()
@@ -264,7 +261,7 @@ class LogSegment(val log: FileRecords,
     var lastIndexEntry = 0
     maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
     try {
-      for (batch <- log.batches(maxMessageSize).asScala) {
+      for (batch <- log.batches.asScala) {
         batch.ensureValid()

         // The max timestamp is exposed at the batch level, so no need to iterate the records
@@ -296,6 +293,9 @@ class LogSegment(val log: FileRecords,
           .format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
+    if (truncated > 0)
+      logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
+
     log.truncateTo(validBytes)
     index.trimToValidSize()
     // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f0c41c7..4de546f 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -337,8 +337,8 @@ object DumpLogSegments {
     val messageSet = FileRecords.open(file, false)
     var validBytes = 0L
     var lastOffset = -1L
-    val batches = messageSet.batches(maxMessageSize).asScala
-    for (batch <- batches) {
+
+    for (batch <- messageSet.batches.asScala) {
       if (isDeepIteration) {
         for (record <- batch.asScala) {
           if (lastOffset == -1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5db1ed6..79fe220 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -264,7 +264,7 @@ class LogSegmentTest {
       seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
-    seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+    seg.recover(new ProducerStateManager(topicPartition, logDir))
     for(i <- 0 until 100)
       assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
   }
@@ -303,7 +303,7 @@ class LogSegmentTest {
       shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))

     var stateManager = new ProducerStateManager(topicPartition, logDir)
-    segment.recover(64 * 1024, stateManager)
+    segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)

@@ -318,7 +318,7 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
-    segment.recover(64 * 1024, stateManager)
+    segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)

     abortedTxns = segment.txnIndex.allAbortedTxns
@@ -352,7 +352,7 @@ class LogSegmentTest {
       seg.append(i, i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
-    seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+    seg.recover(new ProducerStateManager(topicPartition, logDir))
     for(i <- 0 until 100) {
       assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
       if (i < 99)
@@ -376,7 +376,7 @@ class LogSegmentTest {
     val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
     val position = recordPosition.position + TestUtils.random.nextInt(15)
     TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
-    seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+    seg.recover(new ProducerStateManager(topicPartition, logDir))
     assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
       seg.log.batches.asScala.map(_.lastOffset).toList)
     seg.delete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 84ff43b..52e9140 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,6 +30,7 @@ import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -221,9 +222,9 @@ class LogTest {
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))

     val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter {
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered)
+    }, filtered, Int.MaxValue)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)

@@ -265,9 +266,9 @@ class LogTest {
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))

     val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter {
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered)
+    }, filtered, Int.MaxValue)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
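
For illustration, the loop in the new MemoryRecordsTest case above shows how the reworked filterTo is meant to be driven: pass a destination buffer plus a maximum message size, and read the retained batches back from FilterResult.output, which may be a larger buffer than the one handed in when a cleaned batch grows (the situation KAFKA-5316 addresses). Below is a minimal standalone sketch of that usage, not code from this patch; the class name, magic value, buffer sizes, and the keep-keyed-records filter are illustrative assumptions.

```java
// Illustrative sketch only: drives the new
// filterTo(TopicPartition, RecordFilter, ByteBuffer, int) overload with a
// deliberately small output buffer, mirroring the test above.
import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class FilterToExample {
    public static void main(String[] args) {
        // Build one batch where only some records carry a key (sizes are arbitrary).
        ByteBuffer input = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(input, RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, "k1".getBytes(), new byte[128]);
        builder.append(11L, null, "no-key".getBytes());
        builder.append(12L, "k2".getBytes(), "v2".getBytes());
        builder.close();
        input.flip();

        // Undersized output buffer: filterTo may replace it and report the
        // (possibly grown) buffer via FilterResult.output.
        ByteBuffer output = ByteBuffer.allocate(64);

        while (input.hasRemaining()) {
            output.rewind();
            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(input)
                    .filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
                        @Override
                        public boolean shouldRetain(RecordBatch batch, Record record) {
                            return record.hasKey(); // keep only keyed records
                        }
                    }, output, Integer.MAX_VALUE);

            // Advance past whatever filterTo consumed from the input.
            input.position(input.position() + result.bytesRead);

            // Read retained records back from the buffer filterTo actually wrote to.
            ByteBuffer retained = result.output;
            retained.flip();
            for (Record record : MemoryRecords.readableRecords(retained).records())
                System.out.println("retained offset " + record.offset());
        }
    }
}
```

This mirrors the cleaner's new behaviour in LogCleaner.scala, which now appends from result.output rather than the original writeBuffer for the same reason.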
