This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6dbd9b5  KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
6dbd9b5 is described below
commit 6dbd9b59e60eff821fe677af1c6ed33b96243153
Author: Rajini Sivaram <[email protected]>
AuthorDate: Thu May 3 21:05:36 2018 +0100
KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
Log cleaner grows buffers when result.messagesRead is zero. This contains the number of filtered messages read from the source, which can be zero when transactions are used because whole batches may be discarded. The log cleaner incorrectly assumes that messages were not read because the buffer was too small and attempts to double the buffer size unnecessarily, failing with an exception if the buffer is already at max.message.bytes. An additional check for discarded batches has been added to avoid growing [...]
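
To make the failure mode concrete, here is a minimal Scala sketch of the growth logic described above. It is hedged: the GrowBuffersSketch object, the sizes, and the simplified growBuffers are hypothetical stand-ins; the real logic lives in kafka.log.Cleaner.

    import java.nio.ByteBuffer

    // Hypothetical stand-in for the cleaner's buffer-growth check.
    object GrowBuffersSketch {
      val maxMessageBytes = 100            // assumed max.message.bytes
      var readBuffer: ByteBuffer = ByteBuffer.allocate(50)

      // Simplified growBuffers: double the buffer, failing once it has
      // already reached the maximum message size.
      def growBuffers(maxLogMessageSize: Int): Unit = {
        if (readBuffer.capacity >= maxLogMessageSize)
          throw new IllegalStateException(
            s"Buffer already at maximum size $maxLogMessageSize, cannot grow")
        readBuffer = ByteBuffer.allocate(math.min(readBuffer.capacity * 2, maxLogMessageSize))
      }

      def main(args: Array[String]): Unit = {
        // A read that saw only aborted-transaction batches: bytes were
        // consumed from the buffer, but no filtered messages came out.
        val bufferHasBytes = true   // readBuffer.limit() > 0
        val messagesRead = 0        // every batch was discarded
        val bytesRead = 50          // the fix: discarded batches still count

        // Buggy check: a fully-discarded read looks like "buffer too small",
        // so successive cleans keep doubling until growBuffers throws.
        if (bufferHasBytes && messagesRead == 0)
          growBuffers(maxMessageBytes)

        // Fixed check: grow only when no batch at all could be processed.
        if (bufferHasBytes && bytesRead == 0)
          growBuffers(maxMessageBytes)
      }
    }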
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
.../apache/kafka/common/record/MemoryRecords.java | 4 ++-
core/src/main/scala/kafka/log/LogCleaner.scala | 5 ++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 33 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index da6b68c..ea6aa4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -145,7 +145,7 @@ public class MemoryRecords extends AbstractRecords {
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
-        int bytesRead = 0;
+        int bytesRead = 0; // bytes processed from `batches`
         int messagesRetained = 0;
         int bytesRetained = 0;
@@ -359,6 +359,8 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
+        // Note that `bytesRead` should contain only bytes from batches that have been processed,
+        // i.e. bytes from `messagesRead` and any discarded batches.
        public FilterResult(ByteBuffer output,
                            int messagesRead,
                            int bytesRead,
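
The accounting the new FilterResult comment describes, shown as a hedged sketch: the Batch case class, the sizes, and the aborted flag are invented for illustration; the real loop is MemoryRecords.filterTo.

    // Per-batch accounting: bytesRead advances for every processed batch,
    // retained or discarded, while messagesRead counts only messages that
    // survive the filter.
    object FilterAccountingSketch extends App {
      final case class Batch(sizeInBytes: Int, recordCount: Int, aborted: Boolean)

      val batches = Seq(
        Batch(sizeInBytes = 60, recordCount = 2, aborted = true),  // discarded
        Batch(sizeInBytes = 40, recordCount = 1, aborted = false)  // retained
      )

      var bytesRead = 0
      var messagesRead = 0
      for (batch <- batches) {
        bytesRead += batch.sizeInBytes        // counted even when discarded
        if (!batch.aborted)
          messagesRead += batch.recordCount   // only retained messages
      }
      assert(bytesRead == 100 && messagesRead == 1)
    }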
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2e32250..ee31274 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -620,8 +620,9 @@ private[log] class Cleaner(val id: Int,
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
-      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit() > 0 && result.messagesRead == 0)
+      // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
+      // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
+      if (readBuffer.limit() > 0 && result.bytesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
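
One subtlety: the new condition still covers the case the old one was written for. A tiny sketch under assumed sizes:

    // Assumed sizes: a 150-byte batch cannot be read completely into a
    // 100-byte buffer, so nothing is processed, bytesRead stays 0, and
    // growBuffers is triggered exactly as before the fix.
    object OversizedBatchSketch extends App {
      val bufferCapacity = 100
      val batchSize = 150
      val bytesRead = if (batchSize <= bufferCapacity) batchSize else 0
      assert(bytesRead == 0) // oversized batch -> grow the buffer and retry
    }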
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 906c26d..edc1744 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -300,6 +300,39 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
+  /**
+   * Tests log cleaning with batches that are deleted where no additional messages
+   * are available to read in the buffer. Cleaning should continue from the next offset.
+   */
+  @Test
+  def testDeletedBatchesWithNoMessagesRead(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MaxMessageBytesProp, 100: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(1))
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    appendProducer(Seq(2))
+    appendProducer(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(1, 3, 4), offsetsInLog(log))
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
+  }
+
   @Test
   def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)
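
For reference, the offset layout the new test's assertions rely on, derived by counting the appends above (marker retention simplified):

    // offset 0: key 1          aborted transaction data -> removed on pass 1
    // offset 1: abort marker   kept on pass 1, while its transaction's data
    //                          is being removed
    // offset 2: key 2          shadowed by the later key 2 at offset 3
    // offset 3: key 2          latest value for key 2 -> retained
    // offset 4: commit marker  retained
    //
    // The first doClean keeps offsets 1, 3 and 4; with deleteHorizonMs =
    // Long.MaxValue the now-empty abort marker can be dropped on the second
    // pass, leaving offsets 3 and 4.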
--
To stop receiving notification emails like this one, please contact [email protected].