This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ecb72f KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (#4953)
0ecb72f is described below
commit 0ecb72f59da6491edc96b99b147b4983be794acf
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed May 9 11:46:36 2018 +0100
KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (#4953)

Grow the log cleaner's buffers to hold one message set after the sanity check,
even if the message set is bigger than max.message.bytes.

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>, Jun Rao <[email protected]>
---
.../common/record/ByteBufferLogInputStream.java | 41 ++++++++----
.../apache/kafka/common/record/MemoryRecords.java | 13 ++++
.../kafka/common/record/MemoryRecordsTest.java | 42 ++++++++++++
core/src/main/scala/kafka/log/LogCleaner.scala | 41 ++++++++++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 75 +++++++++++++++++++++-
5 files changed, 194 insertions(+), 18 deletions(-)
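
The core of this change is a sizing decision in the log cleaner: once the read buffer has already grown to max.message.bytes, the cleaner reads the size of the next batch from its header alone and grows the buffer to exactly that size instead of giving up. A minimal Java sketch of that decision (illustrative only; the helper requiredCapacity and its parameters are hypothetical, and only MemoryRecords.firstBatchSize() comes from this patch; the corruption guards are sketched after the LogCleaner.scala hunk below):

    import org.apache.kafka.common.record.MemoryRecords;

    final class BufferSizingSketch {
        // Hypothetical helper: choose the capacity the cleaner's read buffer should grow to.
        static int requiredCapacity(MemoryRecords records, int currentCapacity, int maxLogMessageSize) {
            if (currentCapacity < maxLogMessageSize)
                return maxLogMessageSize;                  // normal path: grow up to max.message.bytes
            Integer batchSize = records.firstBatchSize();  // added by this patch; validates the batch header
            if (batchSize == null || batchSize <= currentCapacity)
                throw new IllegalStateException("Next batch size unavailable or already fits in the buffer");
            return batchSize;                              // oversized batch: grow just enough to hold it
        }
    }
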
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 22f417f..7f91f26 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
@@ -40,9 +41,33 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
public MutableRecordBatch nextBatch() throws IOException {
int remaining = buffer.remaining();
- if (remaining < LOG_OVERHEAD)
+
+ Integer batchSize = nextBatchSize();
+ if (batchSize == null || remaining < batchSize)
return null;
+ byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
+
+ ByteBuffer batchSlice = buffer.slice();
+ batchSlice.limit(batchSize);
+ buffer.position(buffer.position() + batchSize);
+
+ if (magic > RecordBatch.MAGIC_VALUE_V1)
+ return new DefaultRecordBatch(batchSlice);
+ else
+ return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+ }
+
+ /**
+ * Validates the header of the next batch and returns batch size.
+ * @return next batch size including LOG_OVERHEAD if buffer contains header up to
+ *         magic byte, null otherwise
+ * @throws CorruptRecordException if record size or magic is invalid
+ */
+ Integer nextBatchSize() throws CorruptRecordException {
+ int remaining = buffer.remaining();
+ if (remaining < LOG_OVERHEAD)
+ return null;
int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
// V0 has the smallest overhead, stricter checking is done later
if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -52,23 +77,13 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
        recordSize, maxMessageSize));
- int batchSize = recordSize + LOG_OVERHEAD;
- if (remaining < batchSize)
+ if (remaining < HEADER_SIZE_UP_TO_MAGIC)
return null;
byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
-
- ByteBuffer batchSlice = buffer.slice();
- batchSlice.limit(batchSize);
- buffer.position(buffer.position() + batchSize);
-
if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
throw new CorruptRecordException("Invalid magic found in record: "
+ magic);
- if (magic > RecordBatch.MAGIC_VALUE_V1)
- return new DefaultRecordBatch(batchSlice);
- else
- return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+ return recordSize + LOG_OVERHEAD;
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index ea6aa4c..eb4e31b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
@@ -118,6 +119,18 @@ public class MemoryRecords extends AbstractRecords {
}
/**
+ * Validates the header of the first batch and returns batch size.
+ * @return first batch size including LOG_OVERHEAD if buffer contains header up to
+ *         magic byte, null otherwise
+ * @throws CorruptRecordException if record size or magic is invalid
+ */
+ public Integer firstBatchSize() {
+ if (buffer.remaining() < HEADER_SIZE_UP_TO_MAGIC)
+ return null;
+ return new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE).nextBatchSize();
+ }
+
+ /**
* Filter the records into the provided ByteBuffer.
*
* @param partition The partition that is filtered (used only for logging)
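
As a quick illustration of the new public firstBatchSize() API above (a sketch mirroring the unit test added below, not part of the patch), the batch size can be read as soon as the header bytes up to the magic byte are available, before the full batch has been fetched:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Records;
    import org.apache.kafka.common.record.TimestampType;

    public class FirstBatchSizeSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(2048);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE,
                    TimestampType.CREATE_TIME, 0L);
            builder.append(10L, null, "abc".getBytes());
            builder.close();
            buffer.flip();

            int fullBatchSize = buffer.remaining();
            MemoryRecords records = MemoryRecords.readableRecords(buffer);

            // Limit the buffer to the header bytes up to the magic byte: the size is still reported.
            buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC);
            System.out.println(records.firstBatchSize() + " == " + fullBatchSize);
        }
    }
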
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index e1409e0..61d8a00 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.Utils;
@@ -794,6 +795,47 @@ public class MemoryRecordsTest {
}
}
+ @Test
+ public void testNextBatchSize() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+         TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence);
+ builder.append(10L, null, "abc".getBytes());
+ builder.close();
+
+ buffer.flip();
+ int size = buffer.remaining();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ assertEquals(size, records.firstBatchSize().intValue());
+ assertEquals(0, buffer.position());
+
+ buffer.limit(1); // size not in buffer
+ assertEquals(null, records.firstBatchSize());
+ buffer.limit(Records.LOG_OVERHEAD); // magic not in buffer
+ assertEquals(null, records.firstBatchSize());
+ buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC); // payload not in buffer
+ assertEquals(size, records.firstBatchSize().intValue());
+
+ buffer.limit(size);
+ byte magic = buffer.get(Records.MAGIC_OFFSET);
+ buffer.put(Records.MAGIC_OFFSET, (byte) 10);
+ try {
+ records.firstBatchSize();
+ fail("Did not fail with corrupt magic");
+ } catch (CorruptRecordException e) {
+ // Expected exception
+ }
+ buffer.put(Records.MAGIC_OFFSET, magic);
+
+ buffer.put(Records.SIZE_OFFSET + 3, (byte) 0);
+ try {
+ records.firstBatchSize();
+ fail("Did not fail with corrupt size");
+ } catch (CorruptRecordException e) {
+ // Expected exception
+ }
+ }
+
@Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ee31274..aa7cfe2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -621,13 +621,46 @@ private[log] class Cleaner(val id: Int,
}
// if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
- // `result.bytesRead` contains bytes from the `messagesRead` and any discarded batches.
+ // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
if (readBuffer.limit() > 0 && result.bytesRead == 0)
- growBuffers(maxLogMessageSize)
+ growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
}
restoreBuffers()
}
+
+ /**
+ * Grow buffers to process next batch of records from `sourceRecords.` Buffers are doubled in size
+ * up to a maximum of `maxLogMessageSize`. In some scenarios, a record could be bigger than the
+ * current maximum size configured for the log. For example:
+ *   1. A compacted topic using compression may contain a message set slightly larger than max.message.bytes
+ *   2. max.message.bytes of a topic could have been reduced after writing larger messages
+ * In these cases, grow the buffer to hold the next batch.
+ */
+ private def growBuffersOrFail(sourceRecords: FileRecords,
+ position: Int,
+ maxLogMessageSize: Int,
+ memoryRecords: MemoryRecords): Unit = {
+
+ val maxSize = if (readBuffer.capacity >= maxLogMessageSize) {
+ val nextBatchSize = memoryRecords.firstBatchSize
+ val logDesc = s"log segment ${sourceRecords.file} at position $position"
+ if (nextBatchSize == null)
+ throw new IllegalStateException(s"Could not determine next batch size
for $logDesc")
+ if (nextBatchSize <= 0)
+ throw new IllegalStateException(s"Invalid batch size $nextBatchSize
for $logDesc")
+ if (nextBatchSize <= readBuffer.capacity)
+ throw new IllegalStateException(s"Batch size $nextBatchSize < buffer
size ${readBuffer.capacity}, but not processed for $logDesc")
+ val bytesLeft = sourceRecords.channel.size - position
+ if (nextBatchSize > bytesLeft)
+ throw new CorruptRecordException(s"Log segment may be corrupt, batch
size $nextBatchSize > $bytesLeft bytes left in segment for $logDesc")
+ nextBatchSize.intValue
+ } else
+ maxLogMessageSize
+
+ growBuffers(maxSize)
+ }
+
private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata:
CleanedTransactionMetadata,
retainTxnMarkers: Boolean): Boolean = {
@@ -844,7 +877,7 @@ private[log] class Cleaner(val id: Int,
// if we didn't read even one complete message, our read buffer may be too small
if(position == startPosition)
- growBuffers(maxLogMessageSize)
+ growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
}
restoreBuffers()
false
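
The guard conditions in growBuffersOrFail above are what keep a corrupt size field from causing an unbounded allocation: the cleaner only grows past max.message.bytes for a batch size that is positive, larger than the current buffer, and no larger than the bytes actually left in the segment. A rough Java rendition of those checks (illustrative only; validateNextBatchSize and its parameters are hypothetical names, not part of the patch):

    import org.apache.kafka.common.errors.CorruptRecordException;

    final class BatchSizeGuardSketch {
        // Illustrative mirror of the Scala checks performed before growing the read buffer.
        static void validateNextBatchSize(Integer batchSize, int bufferCapacity, long bytesLeftInSegment) {
            if (batchSize == null)
                throw new IllegalStateException("Could not determine next batch size");
            if (batchSize <= 0)
                throw new IllegalStateException("Invalid batch size " + batchSize);
            if (batchSize <= bufferCapacity)
                throw new IllegalStateException("Batch fits in the current buffer but was not processed");
            if (batchSize > bytesLeftInSegment)
                throw new CorruptRecordException("Batch size " + batchSize + " exceeds the "
                        + bytesLeftInSegment + " bytes left in the segment; log may be corrupt");
        }
    }
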
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index edc1744..537c561 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,7 +17,7 @@
package kafka.log
-import java.io.File
+import java.io.{File, RandomAccessFile}
import java.nio._
import java.nio.file.Paths
import java.util.Properties
@@ -26,6 +26,7 @@ import kafka.common._
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
@@ -500,6 +501,78 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(shouldRemain, keysInLog(log))
}
+ /**
+ * Test log cleaning with logs containing messages larger than topic's max message size
+ */
+ @Test
+ def testMessageLargerThanMaxMessageSize() {
+ val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+
+ val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+ val shouldRemain = keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
+ assertEquals(shouldRemain, keysInLog(log))
+ }
+
+ /**
+ * Test log cleaning with logs containing messages larger than topic's max message size
+ * where header is corrupt
+ */
+ @Test
+ def testMessageLargerThanMaxMessageSizeWithCorruptHeader() {
+ val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+ val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+ file.seek(Records.MAGIC_OFFSET)
+ file.write(0xff)
+ file.close()
+
+ val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+ intercept[CorruptRecordException] {
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+ }
+ }
+
+ /**
+ * Test log cleaning with logs containing messages larger than topic's max message size
+ * where message size is corrupt and larger than bytes available in log segment.
+ */
+ @Test
+ def testCorruptMessageSizeLargerThanBytesAvailable() {
+ val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+ val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+ file.setLength(1024)
+ file.close()
+
+ val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+ intercept[CorruptRecordException] {
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+ }
+ }
+
+ def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (Log, FakeOffsetMap) = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
+ logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ while(log.numberOfSegments < 2)
+ log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
+ val keysFound = keysInLog(log)
+ assertEquals(0L until log.logEndOffset, keysFound)
+
+ // Decrease the log's max message size
+ logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize / 2: java.lang.Integer)
+ log.config = LogConfig.fromProps(logConfig.originals, logProps)
+
+ // pretend we have the following keys
+ val keys = immutable.ListSet(1, 3, 5, 7, 9)
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+ (log, map)
+ }
+
@Test
def testCleaningWithDeletes(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
--
To stop receiving notification emails like this one, please contact
[email protected].