This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 8de3084 KAFKA-8764: LogCleanerManager endless loop while
compacting/cleaning (#7932)
8de3084 is described below
commit 8de308491e0a54792de53e430bc242ce52d2ac6d
Author: Tomislav <[email protected]>
AuthorDate: Tue Jan 14 23:03:44 2020 +0100
KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
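This fix makes the LogCleaner tolerant of gaps in the offset sequence.
Previously, such gaps could cause endless cleaning loops that required manual
intervention: the offset map's latest offset stopped at the last record before
the gap, so the next dirty offset never advanced past it and the same range was
selected for cleaning again and again.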
Reviewers: Jun Rao <[email protected]>, David Arthur <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 15 +++++++-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 43 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 47abae5..beb3a47 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.collection.{Iterable, Set, mutable}
+import scala.collection.mutable.ListBuffer
import scala.util.control.ControlThrowable
/**
@@ -865,6 +866,11 @@ private[log] class Cleaner(val id: Int,
stats: CleanerStats) {
map.clear()
val dirty = log.logSegments(start, end).toBuffer
+ val nextSegmentStartOffsets = new ListBuffer[Long]
+ if (dirty.nonEmpty) {
+ for (nextSegment <- dirty.tail)
nextSegmentStartOffsets.append(nextSegment.baseOffset)
+ nextSegmentStartOffsets.append(end)
+ }
info("Building offset map for log %s for %d segments in offset range [%d,
%d).".format(log.name, dirty.size, start, end))
val transactionMetadata = new CleanedTransactionMetadata
@@ -874,10 +880,10 @@ private[log] class Cleaner(val id: Int,
// Add all the cleanable dirty segments. We must take at least map.slots *
load_factor,
// but we may be able to fit more (if there is lots of duplication in the
dirty section of the log)
var full = false
- for (segment <- dirty if !full) {
+ for ( (segment, nextSegmentStartOffset) <-
dirty.zip(nextSegmentStartOffsets) if !full) {
checkDone(log.topicPartition)
- full = buildOffsetMapForSegment(log.topicPartition, segment, map, start,
log.config.maxMessageSize,
+ full = buildOffsetMapForSegment(log.topicPartition, segment, map, start,
nextSegmentStartOffset, log.config.maxMessageSize,
transactionMetadata, stats)
if (full)
debug("Offset map is full, %d segments fully mapped, segment with base
offset %d is partially mapped".format(dirty.indexOf(segment),
segment.baseOffset))
@@ -898,6 +904,7 @@ private[log] class Cleaner(val id: Int,
segment: LogSegment,
map: OffsetMap,
startOffset: Long,
+ nextSegmentStartOffset: Long,
maxLogMessageSize: Int,
transactionMetadata:
CleanedTransactionMetadata,
stats: CleanerStats): Boolean = {
@@ -951,6 +958,10 @@ private[log] class Cleaner(val id: Int,
if(position == startPosition)
growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
}
+
+ // In case of an offset gap, fast forward to the latest expected offset in
this segment (the next segment's base offset minus one).
+ map.updateLatestOffset(nextSegmentStartOffset - 1L)
+
restoreBuffers()
false
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ed19f34..aa0fb96 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1554,6 +1554,49 @@ class LogCleanerTest {
assertEquals("The tombstone should be retained.", 1,
log.logSegments.head.log.batches.iterator.next().lastOffset)
}
+ /**
+ * Verify that the cleaner is able to move beyond missing offsets (gaps in the
offset sequence) in the dirty log
+ */
+ @Test
+ def testCleaningBeyondMissingOffsets(): Unit = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024*1024: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ val logConfig = LogConfig(logProps)
+ val cleaner = makeCleaner(Int.MaxValue)
+
+ {
+ val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config
= logConfig)
+ writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+ // roll new segment with baseOffset 11, leaving the previous segment with a
hole at offset 10 (range [10, 10])
+ log.roll(Some(11L))
+
+ // append a single record to the active segment
+ log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
+
+ val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition,
log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+ assertEquals("Cleaning point should pass offset gap",
log.activeSegment.baseOffset, nextDirtyOffset)
+ }
+
+
+ {
+ val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config
= logConfig)
+ writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+ // roll new segment with baseOffset 15, leaving the previous segment with
holes in offset range [10, 14]
+ log.roll(Some(15L))
+
+ writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
+ // roll new segment with baseOffset 30, leaving the previous segment with
holes in offset range [25, 29]
+ log.roll(Some(30L))
+
+ // append a single record to the active segment
+ log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
+
+ val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition,
log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+ assertEquals("Cleaning point should pass offset gap in multiple
segments", log.activeSegment.baseOffset, nextDirtyOffset)
+ }
+ }
+
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)],
offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value,
offset)).lastOffset