This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 962eef0 KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
962eef0 is described below
commit 962eef014bc56e30a308933ac220afeffd52918b
Author: Rajini Sivaram <[email protected]>
AuthorDate: Thu May 3 21:05:36 2018 +0100
KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
Log cleaner grows buffers when result.messagesRead is zero. This field holds the number of filtered messages read from the source, which can be zero when transactions are used because whole batches may be discarded. The log cleaner incorrectly assumes that messages were not read because the buffer was too small and attempts to double the buffer size unnecessarily, failing with an exception if the buffer is already at max.message.bytes. An additional check for discarded batches has been added to avoid growing [...]
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
.../apache/kafka/common/record/MemoryRecords.java | 4 ++-
core/src/main/scala/kafka/log/LogCleaner.scala | 5 ++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 33 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 932d4b6..e37fabe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -145,7 +145,7 @@ public class MemoryRecords extends AbstractRecords {
long maxOffset = -1L;
long shallowOffsetOfMaxTimestamp = -1L;
int messagesRead = 0;
- int bytesRead = 0;
+ int bytesRead = 0; // bytes processed from `batches`
int messagesRetained = 0;
int bytesRetained = 0;
@@ -359,6 +359,8 @@ public class MemoryRecords extends AbstractRecords {
public final long maxTimestamp;
public final long shallowOffsetOfMaxTimestamp;
+ // Note that `bytesRead` should contain only bytes from batches that have been processed,
+ // i.e. bytes from `messagesRead` and any discarded batches.
public FilterResult(ByteBuffer output,
int messagesRead,
int bytesRead,
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 658b853..f5a070d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -622,8 +622,9 @@ private[log] class Cleaner(val id: Int,
throttler.maybeThrottle(outputBuffer.limit())
}
- // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
- if (readBuffer.limit() > 0 && result.messagesRead == 0)
+ // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
+ // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
+ if (readBuffer.limit() > 0 && result.bytesRead == 0)
growBuffers(maxLogMessageSize)
}
restoreBuffers()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c12f617..ae949bf 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -300,6 +300,39 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
+ /**
+ * Tests log cleaning with batches that are deleted where no additional messages
+ * are available to read in the buffer. Cleaning should continue from the next offset.
+ */
+ @Test
+ def testDeletedBatchesWithNoMessagesRead(): Unit = {
+ val tp = new TopicPartition("test", 0)
+ val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
+ val logProps = new Properties()
+ logProps.put(LogConfig.MaxMessageBytesProp, 100: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val producerId = 1L
+ val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+ appendProducer(Seq(1))
+ log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer(Seq(2))
+ appendProducer(Seq(2))
+ log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+
+ cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+ assertEquals(List(2), keysInLog(log))
+ assertEquals(List(1, 3, 4), offsetsInLog(log))
+
+ cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+ assertEquals(List(2), keysInLog(log))
+ assertEquals(List(3, 4), offsetsInLog(log))
+ }
+
@Test
def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
val tp = new TopicPartition("test", 0)
--
To stop receiving notification emails like this one, please contact
[email protected].