[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349179#comment-16349179 ]
ASF GitHub Bot commented on KAFKA-6492:
---------------------------------------

becketqin closed pull request #4498: KAFKA-6492: Fix log truncation to empty segment
URL: https://github.com/apache/kafka/pull/4498

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820bff8d..5970f42f6d9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+    // Do offset translation before truncating the index to avoid needless scanning
+    // in case we truncate the full index
     val mapping = translateOffset(offset)
-    if (mapping == null)
-      return 0
     offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
-    // after truncation, reset and allocate more space for the (new currently active) index
+
+    // After truncation, reset and allocate more space for the (new currently active) index
     offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0) {
+
+    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
     }
+
     bytesSinceLastIndexEntry = 0
     if (maxTimestampSoFar >= 0)
       loadLargestTimestamp()
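For readability, here is how the whole method reads once the hunk above is applied. It is assembled verbatim from the diff; only the trailing line returning bytesTruncated lies past the end of the hunk and is an assumption implied by the method's Int return type:

    @nonthreadsafe
    def truncateTo(offset: Long): Int = {
      // Do offset translation before truncating the index to avoid needless scanning
      // in case we truncate the full index
      val mapping = translateOffset(offset)

      // The early return that previously sat here is gone: the indexes are now
      // truncated and resized even when the segment holds no message at or above
      // `offset` (mapping == null), which is exactly the truncate-to-empty case.
      offsetIndex.truncateTo(offset)
      timeIndex.truncateTo(offset)
      txnIndex.truncateTo(offset)

      // After truncation, reset and allocate more space for the (new currently active) index
      offsetIndex.resize(offsetIndex.maxIndexSize)
      timeIndex.resize(timeIndex.maxIndexSize)

      // Only the byte count is conditional on the translation result now
      val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
      if (log.sizeInBytes == 0) {
        created = time.milliseconds
        rollingBasedTimestamp = None
      }

      bytesSinceLastIndexEntry = 0
      if (maxTimestampSoFar >= 0)
        loadLargestTimestamp()
      bytesTruncated // assumed: outside the hunk shown above
    }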
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cca40e..c45ed0d2986 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+      maxSegmentBytes = Int.MaxValue, time)
     segments += seg
     seg
   }
@@ -157,6 +160,47 @@ class LogSegmentTest {
     }
   }
 
+  @Test
+  def testTruncateEmptySegment() {
+    // This tests the scenario in which the follower truncates to an empty segment. In this
+    // case we must ensure that the index is resized so that the log segment is not mistakenly
+    // rolled due to a full index
+
+    val maxSegmentMs = 300000
+    val time = new MockTime
+    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    seg.close()
+
+    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    assertEquals(0, seg.timeIndex.sizeInBytes)
+    assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+    time.sleep(500)
+    reopened.truncateTo(57)
+    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.timeIndex.isFull)
+    assertFalse(reopened.offsetIndex.isFull)
+
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // The segment should not be rolled even if maxSegmentMs has been exceeded
+    time.sleep(maxSegmentMs + 1)
+    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // But we should still roll the segment if we cannot fit the next offset
+    assertTrue(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200,
+      now = time.milliseconds()))
+  }
+
   @Test
   def testReloadLargestTimestampAndNextOffsetAfterTruncation() {
     val numMessages = 30
@@ -183,10 +227,20 @@ class LogSegmentTest {
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
-    val seg = createSegment(40)
+    val time = new MockTime
+    val seg = createSegment(40, time = time)
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+
+    // If the segment is empty after truncation, the create time should be reset
+    time.sleep(500)
+    assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+
     seg.truncateTo(0)
+    assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(seg.timeIndex.isFull)
+    assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
+
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
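A note on what the new assertions exercise: shouldRoll and timeWaitedForRoll only behave as asserted above because truncateTo now resizes the indexes. A rough sketch of the roll predicate these tests drive, paraphrased for illustration (this is not the actual kafka.log implementation, and all parameter names here are hypothetical):

    // Paraphrased roll decision: a segment must roll when the next append would
    // exceed the byte limit, the (non-empty) segment has outlived maxSegmentMs,
    // either index is full, or the next offset no longer fits in 4-byte relative form.
    def shouldRollSketch(segmentBytes: Int, messagesSize: Int, maxSegmentBytes: Int,
                         timeWaitedMs: Long, maxSegmentMs: Long,
                         offsetIndexFull: Boolean, timeIndexFull: Boolean,
                         nextOffsetFitsInIndex: Boolean): Boolean =
      segmentBytes > maxSegmentBytes - messagesSize ||
        (segmentBytes > 0 && timeWaitedMs > maxSegmentMs) ||
        offsetIndexFull || timeIndexFull ||
        !nextOffsetFitsInIndex

Under a predicate of this shape, a truncated-to-empty segment whose index was never resized would report a full offset index and roll on every append, which is exactly what testTruncateEmptySegment guards against; the final assertTrue shows that the offset-overflow condition (maxOffsetInMessages = Int.MaxValue.toLong + 200) still forces a roll.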
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f78ec3cae1..6753939f3d8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -152,6 +152,28 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testTruncateToEmptySegment(): Unit = {
+    val log = createLog(logDir, LogConfig())
+
+    // Force a segment roll by using a large offset. The first segment will be empty
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = Int.MaxValue.toLong + 200)
+    appendAsFollower(log, records)
+    assertEquals(0, log.logSegments.head.size)
+    assertEquals(2, log.logSegments.size)
+
+    // Truncate to an offset before the base offset of the latest segment
+    log.truncateTo(0L)
+    assertEquals(1, log.logSegments.size)
+
+    // Now verify that we can still append to the active segment
+    appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = 100L))
+    assertEquals(1, log.logSegments.size)
+    assertEquals(101L, log.logEndOffset)
+  }
+
   @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, deleting the

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogSegment.truncateTo() should always resize the index file
> ------------------------------------------------------------
>
>                 Key: KAFKA-6492
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6492
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>            Reporter: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.2.0
>
> The bug is the following:
> # Initially, on a follower broker there are two segments: segment 0 and segment 10000. Segment 0 is empty (maybe due to log compaction).
> # The log is truncated to 0.
> # LogSegment.truncateTo() will not find a message to truncate in segment 0, so it skips resizing the index/time index files.
> # When a new message is fetched, Log.maybeRoll() tries to roll a new segment because the index file of segment 0 is already full (its max size is 0).
> # After creating the new segment 0, the replica fetcher thread finds that segment 0 already exists, so it throws an exception and dies.
> The fix is to have the broker ensure that the index files of active segments are always resized properly.
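To make steps 3-4 above concrete: an offset index's capacity is derived from its file length, so a truncated-but-never-resized index on an empty segment has room for zero entries and reports itself as full on the very first append. A toy model of that invariant (hypothetical names, not Kafka code; the 8-byte entry size matches OffsetIndex's 4-byte relative offset plus 4-byte position):

    // Toy model: why a truncated-but-unresized index makes maybeRoll fire immediately.
    final case class ToyOffsetIndex(fileLengthBytes: Int, entries: Int) {
      val entrySize = 8                                  // 4-byte relative offset + 4-byte position
      def maxEntries: Int = fileLengthBytes / entrySize  // capacity comes from the file length
      def isFull: Boolean = entries >= maxEntries        // 0 >= 0: a 0-byte index is "full"
    }
    // ToyOffsetIndex(fileLengthBytes = 0, entries = 0).isFull == true, so the broker
    // rolls a "new" segment 0 on the next fetch and the fetcher dies as in step 5.

Resizing the index back to maxIndexSize in truncateTo, as the patch does, restores a non-zero capacity and breaks this failure loop.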