This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 962eef0  KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
962eef0 is described below

commit 962eef014bc56e30a308933ac220afeffd52918b
Author: Rajini Sivaram <[email protected]>
AuthorDate: Thu May 3 21:05:36 2018 +0100

    KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
    
    The log cleaner grows its buffers when result.messagesRead is zero. That
    counter holds the number of filtered messages read from the source, which
    can be zero when transactions are used because whole batches may be
    discarded. The log cleaner incorrectly assumes that no messages were read
    because the buffer was too small, and attempts to double the buffer size
    unnecessarily, failing with an exception if the buffer is already at
    max.message.bytes. An additional check for discarded batches has been
    added to avoid growing [...]
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
 .../apache/kafka/common/record/MemoryRecords.java  |  4 ++-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  5 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 33 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 932d4b6..e37fabe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -145,7 +145,7 @@ public class MemoryRecords extends AbstractRecords {
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
-        int bytesRead = 0;
+        int bytesRead = 0; // bytes processed from `batches`
         int messagesRetained = 0;
         int bytesRetained = 0;
 
@@ -359,6 +359,8 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
+        // Note that `bytesRead` should contain only bytes from batches that have been processed,
+        // i.e. bytes from `messagesRead` and any discarded batches.
         public FilterResult(ByteBuffer output,
                             int messagesRead,
                             int bytesRead,
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 658b853..f5a070d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -622,8 +622,9 @@ private[log] class Cleaner(val id: Int,
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
-      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit() > 0 && result.messagesRead == 0)
+      // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
+      // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
+      if (readBuffer.limit() > 0 && result.bytesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
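
For context on why the old check was fatal rather than merely wasteful: as the
commit message notes, growBuffers doubles the I/O buffers up to a cap and fails
with an exception once the cap is reached. A rough sketch of that capped doubling
(an assumed simplification of Cleaner.growBuffers, which really grows a pair of
ByteBuffers):

    // Illustrative only; assumed simplification of kafka.log.Cleaner#growBuffers.
    def grownCapacity(current: Int, maxBufferSize: Int): Int = {
      if (current >= maxBufferSize)
        throw new IllegalStateException(
          s"This log contains a message larger than maximum allowable size of $maxBufferSize.")
      math.min(current * 2, maxBufferSize)
    }

With the old condition, cleaning a fully discarded batch took this path on every
pass until the cap was hit, even though the batch had fit in the buffer all along.
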
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c12f617..ae949bf 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -300,6 +300,39 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
+  /**
+   * Tests log cleaning with batches that are deleted where no additional messages
+   * are available to read in the buffer. Cleaning should continue from the next offset.
+   */
+  @Test
+  def testDeletedBatchesWithNoMessagesRead(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MaxMessageBytesProp, 100: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(1))
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    appendProducer(Seq(2))
+    appendProducer(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(1, 3, 4), offsetsInLog(log))
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
+  }
+
   @Test
   def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)

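Reading the new test's assertions (assuming sequential offset assignment): offset 0
holds the aborted record with key 1, offset 1 the abort marker, offsets 2 and 3 the
two records with key 2, and offset 4 the commit marker. The first doClean pass
removes the aborted record and the superseded key-2 record at offset 2, keeping the
now-empty abort marker around for one more round; the second pass removes that
marker as well, leaving offsets 3 and 4.
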
-- 
To stop receiving notification emails like this one, please contact
[email protected].
