This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new afdf9a8 KAFKA-6492; Fix log truncation to empty segment
afdf9a8 is described below
commit afdf9a85101539e2afb2b0355d7a2008b3055780
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Feb 1 11:57:52 2018 -0800
KAFKA-6492; Fix log truncation to empty segment
This patch ensures that truncation to an empty segment forces resizing of
the index file in order to prevent premature rolling.
I have added unit tests which verify that appends are permitted following
truncation to an empty segment. Without the fix, this test case reproduces the
failure in which the rolled segment matches the current active segment.
Author: Jason Gustafson <[email protected]>
Reviewers: Ismael Juma <[email protected]>, Jiangjie (Becket) Qin <[email protected]>
Closes #4498 from hachikuji/KAFKA-6492
---
core/src/main/scala/kafka/log/LogSegment.scala | 15 +++--
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 64 ++++++++++++++++++++--
core/src/test/scala/unit/kafka/log/LogTest.scala | 22 ++++++++
3 files changed, 90 insertions(+), 11 deletions(-)
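
Background for the diff below: a log segment is rolled as soon as either of its index files reports isFull. Before this patch, truncateTo returned early when the target offset did not map into the segment (the removed "if (mapping == null) return 0" below), so a full truncation never resized the indexes; a segment whose index files had been trimmed on close then kept reporting isFull, and every subsequent append tried to roll a new segment with the same base offset as the already-empty active one. A simplified sketch of the roll decision, paraphrased rather than quoted from LogSegment (shouldRollSketch and canFitNextOffset are illustrative names):

    // Paraphrased sketch: a full index forces a roll, a time-based roll requires
    // a non-empty segment, and the next offset must fit within the segment's
    // 32-bit relative-offset range.
    def shouldRollSketch(offsetIndexIsFull: Boolean,
                         timeIndexIsFull: Boolean,
                         sizeInBytes: Int,
                         timeWaitedMs: Long,
                         maxSegmentMs: Long,
                         canFitNextOffset: Boolean): Boolean =
      offsetIndexIsFull || timeIndexIsFull ||
        (sizeInBytes > 0 && timeWaitedMs > maxSegmentMs) ||
        !canFitNextOffset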
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820b..5970f42 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
*/
@nonthreadsafe
def truncateTo(offset: Long): Int = {
+ // Do offset translation before truncating the index to avoid needless scanning
+ // in case we truncate the full index
val mapping = translateOffset(offset)
- if (mapping == null)
- return 0
offsetIndex.truncateTo(offset)
timeIndex.truncateTo(offset)
txnIndex.truncateTo(offset)
- // after truncation, reset and allocate more space for the (new currently active) index
+
+ // After truncation, reset and allocate more space for the (new currently active) index
offsetIndex.resize(offsetIndex.maxIndexSize)
timeIndex.resize(timeIndex.maxIndexSize)
- val bytesTruncated = log.truncateTo(mapping.position)
- if(log.sizeInBytes == 0) {
+
+ val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+ if (log.sizeInBytes == 0) {
created = time.milliseconds
rollingBasedTimestamp = None
}
+
bytesSinceLastIndexEntry = 0
if (maxTimestampSoFar >= 0)
loadLargestTimestamp()
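
Pieced together from the hunk above, the patched method ends up reading roughly as follows (the trailing bytesTruncated return sits outside the quoted context and is assumed from the method's Int return type):

    @nonthreadsafe
    def truncateTo(offset: Long): Int = {
      // Do offset translation before truncating the index to avoid needless scanning
      // in case we truncate the full index
      val mapping = translateOffset(offset)
      offsetIndex.truncateTo(offset)
      timeIndex.truncateTo(offset)
      txnIndex.truncateTo(offset)

      // After truncation, reset and allocate more space for the (new currently active) index
      offsetIndex.resize(offsetIndex.maxIndexSize)
      timeIndex.resize(timeIndex.maxIndexSize)

      // A null mapping means the whole segment is being truncated away
      val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
      if (log.sizeInBytes == 0) {
        created = time.milliseconds
        rollingBasedTimestamp = None
      }

      bytesSinceLastIndexEntry = 0
      if (maxTimestampSoFar >= 0)
        loadLargestTimestamp()
      bytesTruncated
    }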
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cc..c45ed0d 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -36,13 +36,16 @@ class LogSegmentTest {
var logDir: File = _
/* create a segment with the given base offset */
- def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+ def createSegment(offset: Long,
+ indexIntervalBytes: Int = 10,
+ maxSegmentMs: Int = Int.MaxValue,
+ time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(Log.logFile(logDir, offset))
val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
- val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
- maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+ val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+ maxSegmentBytes = Int.MaxValue, time)
segments += seg
seg
}
@@ -158,6 +161,47 @@ class LogSegmentTest {
}
@Test
+ def testTruncateEmptySegment() {
+ // This tests the scenario in which the follower truncates to an empty segment. In this
+ // case we must ensure that the index is resized so that the log segment is not mistakenly
+ // rolled due to a full index
+
+ val maxSegmentMs = 300000
+ val time = new MockTime
+ val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ seg.close()
+
+ val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ assertEquals(0, seg.timeIndex.sizeInBytes)
+ assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+ time.sleep(500)
+ reopened.truncateTo(57)
+ assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+ assertFalse(reopened.timeIndex.isFull)
+ assertFalse(reopened.offsetIndex.isFull)
+
+ assertFalse(reopened.shouldRoll(messagesSize = 1024,
+ maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L,
+ now = time.milliseconds()))
+
+ // The segment should not be rolled even if maxSegmentMs has been exceeded
+ time.sleep(maxSegmentMs + 1)
+ assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+ assertFalse(reopened.shouldRoll(messagesSize = 1024,
+ maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L,
+ now = time.milliseconds()))
+
+ // But we should still roll the segment if we cannot fit the next offset
+ assertTrue(reopened.shouldRoll(messagesSize = 1024,
+ maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = Int.MaxValue.toLong + 200,
+ now = time.milliseconds()))
+ }
+
+ @Test
def testReloadLargestTimestampAndNextOffsetAfterTruncation() {
val numMessages = 30
val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
@@ -183,10 +227,20 @@ class LogSegmentTest {
@Test
def testTruncateFull() {
// test the case where we fully truncate the log
- val seg = createSegment(40)
+ val time = new MockTime
+ val seg = createSegment(40, time = time)
seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+
+ // If the segment is empty after truncation, the create time should be reset
+ time.sleep(500)
+ assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+
seg.truncateTo(0)
+ assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+ assertFalse(seg.timeIndex.isFull)
+ assertFalse(seg.offsetIndex.isFull)
assertNull("Segment should be empty.", seg.read(0, None, 1024))
+
seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
}
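
Both tests above step the clock with MockTime so that timeWaitedForRoll and the create-time reset can be asserted deterministically. A minimal standalone usage, assuming the org.apache.kafka.common.utils.MockTime imported in the hunk above:

    import org.apache.kafka.common.utils.MockTime

    val time = new MockTime
    val start = time.milliseconds()
    time.sleep(500) // advances the mock clock instantly, no real waiting
    assert(time.milliseconds() == start + 500)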
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f78ec3..6753939 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -153,6 +153,28 @@ class LogTest {
}
@Test
+ def testTruncateToEmptySegment(): Unit = {
+ val log = createLog(logDir, LogConfig())
+
+ // Force a segment roll by using a large offset. The first segment will be empty
+ val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+ baseOffset = Int.MaxValue.toLong + 200)
+ appendAsFollower(log, records)
+ assertEquals(0, log.logSegments.head.size)
+ assertEquals(2, log.logSegments.size)
+
+ // Truncate to an offset before the base offset of the latest segment
+ log.truncateTo(0L)
+ assertEquals(1, log.logSegments.size)
+
+ // Now verify that we can still append to the active segment
+ appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+ baseOffset = 100L))
+ assertEquals(1, log.logSegments.size)
+ assertEquals(101L, log.logEndOffset)
+ }
+
+ @Test
def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
// simulate the upgrade path by creating a new log with several segments, deleting the
// snapshot files, and then reloading the log
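
The oversized baseOffset in testTruncateToEmptySegment forces the initial roll because a segment stores each offset as a 32-bit delta from its base offset, so an offset more than Int.MaxValue above the base cannot live in the same segment. A sketch of that constraint (canFitInSegment is an illustrative name, not the actual Kafka method):

    // An offset o fits in a segment with base offset b only if (o - b) fits in an Int
    def canFitInSegment(baseOffset: Long, offset: Long): Boolean =
      offset >= baseOffset && offset - baseOffset <= Int.MaxValue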
--
To stop receiving notification emails like this one, please contact
[email protected].