[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468693#comment-16468693
 ] 

ASF GitHub Bot commented on KAFKA-6834:
---------------------------------------

rajinisivaram closed pull request #4953: KAFKA-6834: Handle compaction with 
batches bigger than max.message.bytes
URL: https://github.com/apache/kafka/pull/4953
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
 
b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 22f417f8dda..7f91f266158 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
 import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
@@ -40,9 +41,33 @@
 
     public MutableRecordBatch nextBatch() throws IOException {
         int remaining = buffer.remaining();
-        if (remaining < LOG_OVERHEAD)
+
+        Integer batchSize = nextBatchSize();
+        if (batchSize == null || remaining < batchSize)
             return null;
 
+        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
+
+        ByteBuffer batchSlice = buffer.slice();
+        batchSlice.limit(batchSize);
+        buffer.position(buffer.position() + batchSize);
+
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            return new DefaultRecordBatch(batchSlice);
+        else
+            return new 
AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+    }
+
+    /**
+     * Validates the header of the next batch and returns batch size.
+     * @return next batch size including LOG_OVERHEAD if buffer contains 
header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    Integer nextBatchSize() throws CorruptRecordException {
+        int remaining = buffer.remaining();
+        if (remaining < LOG_OVERHEAD)
+            return null;
         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
         // V0 has the smallest overhead, stricter checking is done later
         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -52,23 +77,13 @@ public MutableRecordBatch nextBatch() throws IOException {
             throw new CorruptRecordException(String.format("Record size %d 
exceeds the largest allowable message size (%d).",
                     recordSize, maxMessageSize));
 
-        int batchSize = recordSize + LOG_OVERHEAD;
-        if (remaining < batchSize)
+        if (remaining < HEADER_SIZE_UP_TO_MAGIC)
             return null;
 
         byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
-
-        ByteBuffer batchSlice = buffer.slice();
-        batchSlice.limit(batchSize);
-        buffer.position(buffer.position() + batchSize);
-
         if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
             throw new CorruptRecordException("Invalid magic found in record: " 
+ magic);
 
-        if (magic > RecordBatch.MAGIC_VALUE_V1)
-            return new DefaultRecordBatch(batchSlice);
-        else
-            return new 
AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+        return recordSize + LOG_OVERHEAD;
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index ea6aa4ce3a9..eb4e31b6e58 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import 
org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -117,6 +118,18 @@ public int validBytes() {
         return downConvert(batches(), toMagic, firstOffset, time);
     }
 
+    /**
+     * Validates the header of the first batch and returns batch size.
+     * @return first batch size including LOG_OVERHEAD if buffer contains 
header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    public Integer firstBatchSize() {
+        if (buffer.remaining() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+        return new ByteBufferLogInputStream(buffer, 
Integer.MAX_VALUE).nextBatchSize();
+    }
+
     /**
      * Filter the records into the provided ByteBuffer.
      *
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index e1409e052f6..61d8a00865b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import 
org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.Utils;
@@ -794,6 +795,47 @@ public void testFilterToPreservesLogAppendTime() {
         }
     }
 
+    @Test
+    public void testNextBatchSize() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
compression,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, 
firstSequence);
+        builder.append(10L, null, "abc".getBytes());
+        builder.close();
+
+        buffer.flip();
+        int size = buffer.remaining();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assertEquals(size, records.firstBatchSize().intValue());
+        assertEquals(0, buffer.position());
+
+        buffer.limit(1); // size not in buffer
+        assertEquals(null, records.firstBatchSize());
+        buffer.limit(Records.LOG_OVERHEAD); // magic not in buffer
+        assertEquals(null, records.firstBatchSize());
+        buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC); // payload not in buffer
+        assertEquals(size, records.firstBatchSize().intValue());
+
+        buffer.limit(size);
+        byte magic = buffer.get(Records.MAGIC_OFFSET);
+        buffer.put(Records.MAGIC_OFFSET, (byte) 10);
+        try {
+            records.firstBatchSize();
+            fail("Did not fail with corrupt magic");
+        } catch (CorruptRecordException e) {
+            // Expected exception
+        }
+        buffer.put(Records.MAGIC_OFFSET, magic);
+
+        buffer.put(Records.SIZE_OFFSET + 3, (byte) 0);
+        try {
+            records.firstBatchSize();
+            fail("Did not fail with corrupt size");
+        } catch (CorruptRecordException e) {
+            // Expected exception
+        }
+    }
+
     @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, 
compressionType={2}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index ee31274f6c9..aa7cfe276c4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{CorruptRecordException, 
KafkaStorageException}
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
@@ -621,13 +621,46 @@ private[log] class Cleaner(val id: Int,
       }
 
       // if we read bytes but didn't get even one complete batch, our I/O 
buffer is too small, grow it and try again
-      // `result.bytesRead` contains bytes from the `messagesRead` and any 
discarded batches.
+      // `result.bytesRead` contains bytes from `messagesRead` and any 
discarded batches.
       if (readBuffer.limit() > 0 && result.bytesRead == 0)
-        growBuffers(maxLogMessageSize)
+        growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
     }
     restoreBuffers()
   }
 
+
+  /**
+   * Grow buffers to process next batch of records from `sourceRecords.` 
Buffers are doubled in size
+   * up to a maximum of `maxLogMessageSize`. In some scenarios, a record could 
be bigger than the
+   * current maximum size configured for the log. For example:
+   *   1. A compacted topic using compression may contain a message set 
slightly larger than max.message.bytes
+   *   2. max.message.bytes of a topic could have been reduced after writing 
larger messages
+   * In these cases, grow the buffer to hold the next batch.
+   */
+  private def growBuffersOrFail(sourceRecords: FileRecords,
+                                position: Int,
+                                maxLogMessageSize: Int,
+                                memoryRecords: MemoryRecords): Unit = {
+
+    val maxSize = if (readBuffer.capacity >= maxLogMessageSize) {
+      val nextBatchSize = memoryRecords.firstBatchSize
+      val logDesc = s"log segment ${sourceRecords.file} at position $position"
+      if (nextBatchSize == null)
+        throw new IllegalStateException(s"Could not determine next batch size 
for $logDesc")
+      if (nextBatchSize <= 0)
+        throw new IllegalStateException(s"Invalid batch size $nextBatchSize 
for $logDesc")
+      if (nextBatchSize <= readBuffer.capacity)
+        throw new IllegalStateException(s"Batch size $nextBatchSize < buffer 
size ${readBuffer.capacity}, but not processed for $logDesc")
+      val bytesLeft = sourceRecords.channel.size - position
+      if (nextBatchSize > bytesLeft)
+        throw new CorruptRecordException(s"Log segment may be corrupt, batch 
size $nextBatchSize > $bytesLeft bytes left in segment for $logDesc")
+      nextBatchSize.intValue
+    } else
+      maxLogMessageSize
+
+    growBuffers(maxSize)
+  }
+
   private def shouldDiscardBatch(batch: RecordBatch,
                                  transactionMetadata: 
CleanedTransactionMetadata,
                                  retainTxnMarkers: Boolean): Boolean = {
@@ -844,7 +877,7 @@ private[log] class Cleaner(val id: Int,
 
       // if we didn't read even one complete message, our read buffer may be 
too small
       if(position == startPosition)
-        growBuffers(maxLogMessageSize)
+        growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
     }
     restoreBuffers()
     false
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index edc1744bfca..537c561b387 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.File
+import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
@@ -26,6 +26,7 @@ import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -500,6 +501,78 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(shouldRemain, keysInLog(log))
   }
 
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max 
message size
+   */
+  @Test
+  def testMessageLargerThanMaxMessageSize() {
+    val (log, offsetMap) = 
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new 
CleanerStats)
+    val shouldRemain = keysInLog(log).filter(k => 
!offsetMap.map.containsKey(k.toString))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max 
message size
+   * where header is corrupt
+   */
+  @Test
+  def testMessageLargerThanMaxMessageSizeWithCorruptHeader() {
+    val (log, offsetMap) = 
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+    val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+    file.seek(Records.MAGIC_OFFSET)
+    file.write(0xff)
+    file.close()
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    intercept[CorruptRecordException] {
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new 
CleanerStats)
+    }
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than topic's max 
message size
+   * where message size is corrupt and larger than bytes available in log 
segment.
+   */
+  @Test
+  def testCorruptMessageSizeLargerThanBytesAvailable() {
+    val (log, offsetMap) = 
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
+    val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
+    file.setLength(1024)
+    file.close()
+
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    intercept[CorruptRecordException] {
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new 
CleanerStats)
+    }
+  }
+
+  def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (Log, 
FakeOffsetMap) = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: 
java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: 
java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    while(log.numberOfSegments < 2)
+      log.appendAsLeader(record(log.logEndOffset.toInt, 
Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // Decrease the log's max message size
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize / 2: 
java.lang.Integer)
+    log.config = LogConfig.fromProps(logConfig.originals, logProps)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    (log, map)
+  }
+
   @Test
   def testCleaningWithDeletes(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6834
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jun Rao
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to