This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
     new 1fcf67f  KAFKA-6492; Fix log truncation to empty segment
1fcf67f is described below

commit 1fcf67f5fb21999ae345ed0f6c72416a0a38ba89
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 1 11:57:52 2018 -0800

    KAFKA-6492; Fix log truncation to empty segment
    
    This patch ensures that truncation to an empty segment forces resizing of
    the index file in order to prevent premature rolling. I have added unit
    tests which verify that appends are permitted following truncation to an
    empty segment. Without the fix, this test case reproduces the failure in
    which the rolled segment matches the current active segment.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Jiangjie (Becket) Qin <becket....@gmail.com>
    
    Closes #4498 from hachikuji/KAFKA-6492
---
 core/src/main/scala/kafka/log/LogSegment.scala     | 15 +++--
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 64 ++++++++++++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 22 ++++++++
 3 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820b..5970f42 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+    // Do offset translation before truncating the index to avoid needless scanning
+    // in case we truncate the full index
     val mapping = translateOffset(offset)
-    if (mapping == null)
-      return 0
     offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
-    // after truncation, reset and allocate more space for the (new currently active) index
+
+    // After truncation, reset and allocate more space for the (new currently active) index
     offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0) {
+
+    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
     }
+
     bytesSinceLastIndexEntry = 0
     if (maxTimestampSoFar >= 0)
       loadLargestTimestamp()
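For readers skimming the patch: an empty segment that is closed and reopened has its index files trimmed to zero valid entries, so the indexes report themselves full and the broker tries to roll a new segment whose base offset matches the current active segment. The resize calls above restore the indexes to full capacity before any append. The sketch below is illustrative only (TruncateRollSketch, IndexState and wouldRoll are invented names, and the predicate is a simplification of the real roll check):

    // Illustrative sketch, not the actual kafka.log code. Shows why an
    // index trimmed to zero capacity makes an empty segment look "full".
    object TruncateRollSketch {
      // Stand-in for OffsetIndex/TimeIndex state: an index is full when it
      // has no room for another entry.
      case class IndexState(entries: Int, maxEntries: Int) {
        def isFull: Boolean = entries >= maxEntries
      }

      // Simplified roll predicate; the real check also weighs segment bytes,
      // segment age, and whether the next offset fits as a relative offset.
      def wouldRoll(offsetIndex: IndexState, timeIndex: IndexState): Boolean =
        offsetIndex.isFull || timeIndex.isFull

      def main(args: Array[String]): Unit = {
        val trimmed = IndexState(entries = 0, maxEntries = 0)
        println(wouldRoll(trimmed, trimmed))   // true: premature roll before the fix
        val resized = IndexState(entries = 0, maxEntries = 128) // after resize(maxIndexSize)
        println(wouldRoll(resized, resized))   // false: appends can proceed
      }
    }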
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cc..c45ed0d 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+      maxSegmentBytes = Int.MaxValue, time)
     segments += seg
     seg
   }
@@ -158,6 +161,47 @@ class LogSegmentTest {
   }
 
   @Test
+  def testTruncateEmptySegment() {
+    // This tests the scenario in which the follower truncates to an empty segment. In this
+    // case we must ensure that the index is resized so that the log segment is not mistakenly
+    // rolled due to a full index
+
+    val maxSegmentMs = 300000
+    val time = new MockTime
+    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    seg.close()
+
+    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    assertEquals(0, seg.timeIndex.sizeInBytes)
+    assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+    time.sleep(500)
+    reopened.truncateTo(57)
+    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.timeIndex.isFull)
+    assertFalse(reopened.offsetIndex.isFull)
+
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // The segment should not be rolled even if maxSegmentMs has been exceeded
+    time.sleep(maxSegmentMs + 1)
+    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // But we should still roll the segment if we cannot fit the next offset
+    assertTrue(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200,
+      now = time.milliseconds()))
+  }
+
+  @Test
   def testReloadLargestTimestampAndNextOffsetAfterTruncation() {
     val numMessages = 30
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
@@ -183,10 +227,20 @@ class LogSegmentTest {
 
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
-    val seg = createSegment(40)
+    val time = new MockTime
+    val seg = createSegment(40, time = time)
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+
+    // If the segment is empty after truncation, the create time should be reset
+    time.sleep(500)
+    assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+
     seg.truncateTo(0)
+    assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(seg.timeIndex.isFull)
+    assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
+
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
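Two details of the tests above are worth calling out. MockTime (org.apache.kafka.common.utils.MockTime) is a deterministic clock, so time.sleep(...) advances virtual time without blocking, which is what makes the maxSegmentMs assertions reliable. And maxOffsetInMessages = Int.MaxValue.toLong + 200 exercises the one case where rolling must still happen: offsets inside a batch are stored as 4-byte deltas from the segment's base offset, so an offset more than Int.MaxValue past the base offset cannot fit. A minimal sketch of that constraint (fitsInSegment and RelativeOffsetSketch are hypothetical names, not a real kafka.log API):

    // Sketch of the relative-offset limit under the assumption stated above;
    // not the actual LogSegment logic.
    object RelativeOffsetSketch extends App {
      // The offset delta must fit in a 4-byte Int relative to baseOffset.
      def fitsInSegment(baseOffset: Long, maxOffsetInMessages: Long): Boolean =
        maxOffsetInMessages - baseOffset <= Int.MaxValue

      // For the base offset 0 used in testTruncateEmptySegment:
      assert(fitsInSegment(0L, 100L))                       // append is allowed
      assert(!fitsInSegment(0L, Int.MaxValue.toLong + 200)) // segment must roll
    }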
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f78ec3..6753939 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -153,6 +153,28 @@ class LogTest {
   }
 
   @Test
+  def testTruncateToEmptySegment(): Unit = {
+    val log = createLog(logDir, LogConfig())
+
+    // Force a segment roll by using a large offset. The first segment will be empty
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = Int.MaxValue.toLong + 200)
+    appendAsFollower(log, records)
+    assertEquals(0, log.logSegments.head.size)
+    assertEquals(2, log.logSegments.size)
+
+    // Truncate to an offset before the base offset of the latest segment
+    log.truncateTo(0L)
+    assertEquals(1, log.logSegments.size)
+
+    // Now verify that we can still append to the active segment
+    appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = 100L))
+    assertEquals(1, log.logSegments.size)
+    assertEquals(101L, log.logEndOffset)
+  }
+
+  @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, deleting the
     // snapshot files, and then reloading the log

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.