Repository: kafka Updated Branches: refs/heads/trunk 8f3462552 -> af9fc503d
KAFKA-4099; Fix the potential frequent log rolling Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #1809 from becketqin/KAFKA-4099 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af9fc503 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af9fc503 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af9fc503 Branch: refs/heads/trunk Commit: af9fc503dea5058df890fbd79249abb7634e06bc Parents: 8f34625 Author: Jiangjie Qin <[email protected]> Authored: Fri Sep 2 12:49:34 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Fri Sep 2 12:49:34 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 11 ++++++---- core/src/main/scala/kafka/log/LogSegment.scala | 21 ++++++++++++-------- .../src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++------ docs/upgrade.html | 2 +- 4 files changed, 34 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d343d6f..894beab 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -401,7 +401,8 @@ class Log(val dir: File, } // maybe roll the log if this segment is full - val segment = maybeRoll(validMessages.sizeInBytes) + val segment = maybeRoll(messagesSize = validMessages.sizeInBytes, + maxTimestampInMessages = appendInfo.maxTimestamp) // now append to the log segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp, @@ -736,6 +737,7 @@ class Log(val dir: File, * Roll the log over to a new empty log segment 
if necessary. * * @param messagesSize The messages set size in bytes + * @param maxTimestampInMessages The maximum timestamp in the messages. * logSegment will be rolled if one of the following conditions met * <ol> * <li> The logSegment is full @@ -745,16 +747,17 @@ class Log(val dir: File, * </ol> * @return The currently active segment after (perhaps) rolling to a new segment */ - private def maybeRoll(messagesSize: Int): LogSegment = { + private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long): LogSegment = { val segment = activeSegment - val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs + val now = time.milliseconds + val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs if (segment.size > config.segmentSize - messagesSize || (segment.size > 0 && reachedRollMs) || segment.index.isFull || segment.timeIndex.isFull) { debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " + s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + - s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).") + s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).") roll() } else { segment http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 3d94452..ccc2472 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -339,20 +339,25 @@ class LogSegment(val log: FileMessageSet, } /** - * The time 
this segment has waited to be rolled. If the first message in the segment does not have a timestamp, - * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message. + * The time this segment has waited to be rolled. + * If the first message has a timestamp we use the message timestamp to determine when to roll a segment. A segment + * is rolled if the difference between the new message's timestamp and the first message's timestamp exceeds the + * segment rolling time. + * If the first message does not have a timestamp, we use the wall clock time to determine when to roll a segment. A + * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the + * segment rolling time. */ - def timeWaitedForRoll(now: Long) : Long= { + def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = { // Load the timestamp of the first message into memory - if (!rollingBasedTimestamp.isDefined) { + if (rollingBasedTimestamp.isEmpty) { val iter = log.iterator if (iter.hasNext) rollingBasedTimestamp = Some(iter.next.message.timestamp) - else - // If the log is empty, we return time elapsed since the segment is created. 
- return now - created } - now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created} + rollingBasedTimestamp match { + case Some(t) if t >= 0 => messageTimestamp - t + case _ => now - created + } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7f6ef6e..4935aae 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -75,6 +75,7 @@ class LogTest extends JUnitSuite { scheduler = time.scheduler, time = time) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) + // Test the segment rolling behavior when messages do not have a timestamp. time.sleep(log.config.segmentMs + 1) log.append(set) assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments) @@ -88,19 +89,25 @@ class LogTest extends JUnitSuite { assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) } - time.sleep(log.config.segmentMs + 1) + // Append a message with timestamp to a segment whose first message does not have a timestamp. val setWithTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1) log.append(setWithTimestamp) - assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments) + assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments) + // Test the segment rolling behavior when messages have timestamps. 
time.sleep(log.config.segmentMs + 1) - log.append(set) - assertEquals("Log should not roll because the roll should depend on the index of the first time index entry.", 5, log.numberOfSegments) + log.append(setWithTimestamp) + assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments) + // move the wall clock beyond log rolling time time.sleep(log.config.segmentMs + 1) - log.append(set) - assertEquals("Log should roll because the time since the timestamp of first time index entry has expired.", 6, log.numberOfSegments) + log.append(setWithTimestamp) + assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments) + + val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds) + log.append(setWithExpiredTimestamp) + assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments) val numSegments = log.numberOfSegments time.sleep(log.config.segmentMs + 1) http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index eef21cf..d4ba71a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -26,7 +26,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5> <ul> <li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li> - <li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp of the first message in a log segment. i.e. 
if the timestamp of the first message in the segment is T, the log will be rolled out at T + log.roll.ms </li> + <li> The log rolling time is no longer dependent on log segment create time. Instead it is now based on the timestamp in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled out when a new message has a timestamp greater than or equal to T + log.roll.ms </li> <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li> <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li> <li> Due to the increased number of index files, on some brokers with large amount of log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
