[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349179#comment-16349179 ]

ASF GitHub Bot commented on KAFKA-6492:
---------------------------------------

becketqin closed pull request #4498: KAFKA-6492: Fix log truncation to empty segment
URL: https://github.com/apache/kafka/pull/4498

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820bff8d..5970f42f6d9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+    // Do offset translation before truncating the index to avoid needless scanning
+    // in case we truncate the full index
     val mapping = translateOffset(offset)
-    if (mapping == null)
-      return 0
     offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
-    // after truncation, reset and allocate more space for the (new currently active) index
+
+    // After truncation, reset and allocate more space for the (new currently active) index
     offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0) {
+
+    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
     }
+
     bytesSinceLastIndexEntry = 0
     if (maxTimestampSoFar >= 0)
       loadLargestTimestamp()
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cca40e..c45ed0d2986 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+      maxSegmentBytes = Int.MaxValue, time)
     segments += seg
     seg
   }
@@ -157,6 +160,47 @@ class LogSegmentTest {
     }
   }
 
+  @Test
+  def testTruncateEmptySegment() {
+    // This tests the scenario in which the follower truncates to an empty segment. In this
+    // case we must ensure that the index is resized so that the log segment is not mistakenly
+    // rolled due to a full index
+
+    val maxSegmentMs = 300000
+    val time = new MockTime
+    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    seg.close()
+
+    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    assertEquals(0, seg.timeIndex.sizeInBytes)
+    assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+    time.sleep(500)
+    reopened.truncateTo(57)
+    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.timeIndex.isFull)
+    assertFalse(reopened.offsetIndex.isFull)
+
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // The segment should not be rolled even if maxSegmentMs has been exceeded
+    time.sleep(maxSegmentMs + 1)
+    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // But we should still roll the segment if we cannot fit the next offset
+    assertTrue(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200,
+      now = time.milliseconds()))
+  }
+
   @Test
   def testReloadLargestTimestampAndNextOffsetAfterTruncation() {
     val numMessages = 30
@@ -183,10 +227,20 @@ class LogSegmentTest {
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
-    val seg = createSegment(40)
+    val time = new MockTime
+    val seg = createSegment(40, time = time)
    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+
+    // If the segment is empty after truncation, the create time should be reset
+    time.sleep(500)
+    assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+
     seg.truncateTo(0)
+    assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(seg.timeIndex.isFull)
+    assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
+
    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f78ec3cae1..6753939f3d8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -152,6 +152,28 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testTruncateToEmptySegment(): Unit = {
+    val log = createLog(logDir, LogConfig())
+
+    // Force a segment roll by using a large offset. The first segment will be empty
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = Int.MaxValue.toLong + 200)
+    appendAsFollower(log, records)
+    assertEquals(0, log.logSegments.head.size)
+    assertEquals(2, log.logSegments.size)
+
+    // Truncate to an offset before the base offset of the latest segment
+    log.truncateTo(0L)
+    assertEquals(1, log.logSegments.size)
+
+    // Now verify that we can still append to the active segment
+    appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = 100L))
+    assertEquals(1, log.logSegments.size)
+    assertEquals(101L, log.logEndOffset)
+  }
+
   @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
    // simulate the upgrade path by creating a new log with several segments, deleting the

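For reference, this is how LogSegment.truncateTo() reads with the first hunk above applied. It is a reassembly of the diff, not independent code; one caveat is that the closing return of bytesTruncated sits past the hunk's last context line, so that final line is inferred from the method's Int return type.

  @nonthreadsafe
  def truncateTo(offset: Long): Int = {
    // Do offset translation before truncating the index to avoid needless scanning
    // in case we truncate the full index
    val mapping = translateOffset(offset)
    // The indexes are now truncated unconditionally: even when mapping is null
    // (no message at or above the target offset), an empty segment must get its
    // pre-allocated index capacity back, or the next roll check will see a
    // "full" index of max size 0
    offsetIndex.truncateTo(offset)
    timeIndex.truncateTo(offset)
    txnIndex.truncateTo(offset)

    // After truncation, reset and allocate more space for the (new currently active) index
    offsetIndex.resize(offsetIndex.maxIndexSize)
    timeIndex.resize(timeIndex.maxIndexSize)

    // Only the log file itself needs the translated position; when mapping is
    // null there is nothing to cut from the file
    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
    if (log.sizeInBytes == 0) {
      created = time.milliseconds
      rollingBasedTimestamp = None
    }

    bytesSinceLastIndexEntry = 0
    if (maxTimestampSoFar >= 0)
      loadLargestTimestamp()
    bytesTruncated // assumed: the return lies outside the diff's context lines
  }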
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogSegment.truncateTo() should always resize the index file
> -----------------------------------------------------------
>
>                 Key: KAFKA-6492
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6492
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>            Reporter: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.2.0
>
>
> The bug is the following:
> 1. Initially, a follower broker has two segments: segment 0 and segment 10000.
>    Segment 0 is empty (perhaps due to log compaction).
> 2. The log is truncated to offset 0.
> 3. LogSegment.truncateTo() finds no message to truncate in segment 0, so it
>    skips resizing the index/timeindex files.
> 4. When a new message is fetched, Log.maybeRoll() tries to roll a new segment
>    because the index file of segment 0 is already full (its max size is 0).
> 5. After creating the new segment 0, the replica fetcher thread finds that a
>    segment 0 already exists, so it throws an exception and dies.
> The fix is to have the broker make sure the index files of active segments are
> always resized properly; the toy sketch below illustrates the failure mode.
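To make step 4 concrete, here is a minimal, self-contained Scala sketch of the failure mode. ToyIndex and TruncateToEmptySegmentDemo are hypothetical stand-ins, not Kafka classes; in the real code path the check lives in OffsetIndex.isFull and the roll decision in Log.maybeRoll(), as exercised by the tests in the diff above.

// Toy model of the bug (illustrative only, not the real kafka.log.OffsetIndex).
// The buggy truncateTo() returned early for an empty segment, leaving its
// index trimmed to zero capacity, so every "is the index full?" check passed.
final class ToyIndex(private var maxEntries: Int) {
  private var entries = 0
  def isFull: Boolean = entries >= maxEntries      // 0 >= 0, so "full"
  def resize(newMaxEntries: Int): Unit = maxEntries = newMaxEntries
}

object TruncateToEmptySegmentDemo extends App {
  val offsetIndex = new ToyIndex(maxEntries = 0)   // state after the buggy early return
  println(s"full after buggy truncate: ${offsetIndex.isFull}") // true: a duplicate segment 0 would be rolled
  offsetIndex.resize(newMaxEntries = 1000)         // what the fixed truncateTo() always does
  println(s"full after resize: ${offsetIndex.isFull}")         // false: appends proceed normally
}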



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)