Repository: kafka
Updated Branches:
  refs/heads/trunk 8f3462552 -> af9fc503d


KAFKA-4099; Fix the potential frequent log rolling

Author: Jiangjie Qin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #1809 from becketqin/KAFKA-4099


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af9fc503
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af9fc503
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af9fc503

Branch: refs/heads/trunk
Commit: af9fc503dea5058df890fbd79249abb7634e06bc
Parents: 8f34625
Author: Jiangjie Qin <[email protected]>
Authored: Fri Sep 2 12:49:34 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Fri Sep 2 12:49:34 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 11 ++++++----
 core/src/main/scala/kafka/log/LogSegment.scala  | 21 ++++++++++++--------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++------
 docs/upgrade.html                               |  2 +-
 4 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d343d6f..894beab 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -401,7 +401,8 @@ class Log(val dir: File,
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(validMessages.sizeInBytes)
+        val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
+                                maxTimestampInMessages = appendInfo.maxTimestamp)
 
         // now append to the log
        segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
@@ -736,6 +737,7 @@ class Log(val dir: File,
    * Roll the log over to a new empty log segment if necessary.
    *
    * @param messagesSize The messages set size in bytes
+   * @param maxTimestampInMessages The maximum timestamp in the messages.
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
@@ -745,16 +747,17 @@ class Log(val dir: File,
    * </ol>
   * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(messagesSize: Int): LogSegment = {
+  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long): LogSegment = {
     val segment = activeSegment
-    val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs
+    val now = time.milliseconds
+    val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
     if (segment.size > config.segmentSize - messagesSize ||
         (segment.size > 0 && reachedRollMs) ||
         segment.index.isFull || segment.timeIndex.isFull) {
       debug(s"Rolling new log segment in $name (log_size = 
${segment.size}/${config.segmentSize}}, " +
           s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, 
" +
           s"time_index_size = 
${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
-          s"inactive_time_ms = 
${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - 
segment.rollJitterMs}).")
+          s"inactive_time_ms = ${segment.timeWaitedForRoll(now, 
maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
       roll()
     } else {
       segment

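For readers skimming the diff, the roll decision in maybeRoll can be distilled
into a small standalone sketch. This is not code from the patch: SegmentState,
shouldRoll, and every parameter are hypothetical stand-ins for the fields the
real method reads from LogConfig and the active LogSegment, and the index-full
and roll-jitter checks are omitted.

// A minimal standalone sketch of the roll decision above. Hypothetical
// stand-ins only; the index-full conditions and rollJitterMs are left out.
object RollDecisionSketch {
  final case class SegmentState(sizeInBytes: Int,
                                firstTimestamp: Option[Long], // None while the segment is empty
                                createdMs: Long)

  def shouldRoll(segment: SegmentState,
                 messagesSize: Int,            // bytes about to be appended
                 maxTimestampInMessages: Long, // largest timestamp in the batch
                 nowMs: Long,
                 segmentSize: Int,             // stand-in for config.segmentSize
                 segmentMs: Long): Boolean = { // stand-in for config.segmentMs
    // The key change in this commit: when the first message carries a
    // timestamp, "time waited" is measured in message time, not wall-clock time.
    val timeWaited = segment.firstTimestamp match {
      case Some(t) if t >= 0 => maxTimestampInMessages - t
      case _                 => nowMs - segment.createdMs
    }
    segment.sizeInBytes > segmentSize - messagesSize ||
      (segment.sizeInBytes > 0 && timeWaited > segmentMs)
  }
}
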
http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3d94452..ccc2472 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -339,20 +339,25 @@ class LogSegment(val log: FileMessageSet,
   }
 
   /**
-   * The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp,
-   * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message.
+   * The time this segment has waited to be rolled.
+   * If the first message has a timestamp we use the message timestamp to determine when to roll a segment. A segment
+   * is rolled if the difference between the new message's timestamp and the first message's timestamp exceeds the
+   * segment rolling time.
+   * If the first message does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
+   * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
+   * segment rolling time.
    */
-  def timeWaitedForRoll(now: Long) : Long= {
+  def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
-    if (!rollingBasedTimestamp.isDefined) {
+    if (rollingBasedTimestamp.isEmpty) {
       val iter = log.iterator
       if (iter.hasNext)
         rollingBasedTimestamp = Some(iter.next.message.timestamp)
-      else
-        // If the log is empty, we return time elapsed since the segment is created.
-        return now - created
     }
-    now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created}
+    rollingBasedTimestamp match {
+      case Some(t) if t >= 0 => messageTimestamp - t
+      case _ => now - created
+    }
   }
 
   /**

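As a hypothetical walk-through of the new timeWaitedForRoll semantics (the
numbers below are invented for illustration, not taken from the patch): with a
roll time of 1000 ms and a first message stamped at t=5000, the segment now
rolls only once an incoming message's timestamp passes 6000, no matter how much
wall-clock time has elapsed in between.

// Invented numbers illustrating the timestamp-based rule above.
object TimeWaitedWalkthrough extends App {
  val segmentMs = 1000L              // stand-in for log.roll.ms
  val firstMessageTimestamp = 5000L  // timestamp of the segment's first message

  // Mirrors the Some(t) if t >= 0 branch: elapsed time is message time.
  def timeWaited(incomingTimestamp: Long): Long =
    incomingTimestamp - firstMessageTimestamp

  assert(timeWaited(5500L) <= segmentMs)  // within the roll time: no roll
  assert(timeWaited(6001L) > segmentMs)   // past the roll time: segment rolls
  println("timestamp-based rolling behaves as described")
}
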
http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7f6ef6e..4935aae 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, 
log.numberOfSegments)
+    // Test the segment rolling behavior when messages do not have a timestamp.
     time.sleep(log.config.segmentMs + 1)
     log.append(set)
     assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, 
log.numberOfSegments)
@@ -88,19 +89,25 @@ class LogTest extends JUnitSuite {
       assertEquals("Changing time beyond rollMs and appending should create a 
new segment.", numSegments, log.numberOfSegments)
     }
 
-    time.sleep(log.config.segmentMs + 1)
+    // Append a message with a timestamp to a segment whose first message does not have a timestamp.
     val setWithTimestamp =
       TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = 
time.milliseconds + log.config.segmentMs + 1)
     log.append(setWithTimestamp)
-    assertEquals("A new segment should have been rolled out", 5, 
log.numberOfSegments)
+    assertEquals("Segment should not have been rolled out because the log 
rolling should be based on wall clock.", 4, log.numberOfSegments)
 
+    // Test the segment rolling behavior when messages have timestamps.
     time.sleep(log.config.segmentMs + 1)
-    log.append(set)
-    assertEquals("Log should not roll because the roll should depend on the 
index of the first time index entry.", 5, log.numberOfSegments)
+    log.append(setWithTimestamp)
+    assertEquals("A new segment should have been rolled out", 5, 
log.numberOfSegments)
 
+    // move the wall clock beyond log rolling time
     time.sleep(log.config.segmentMs + 1)
-    log.append(set)
-    assertEquals("Log should roll because the time since the timestamp of 
first time index entry has expired.", 6, log.numberOfSegments)
+    log.append(setWithTimestamp)
+    assertEquals("Log should not roll because the roll should depend on 
timestamp of the first message.", 5, log.numberOfSegments)
+
+    val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds)
+    log.append(setWithExpiredTimestamp)
+    assertEquals("Log should roll because the timestamp in the message should 
make the log segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index eef21cf..d4ba71a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -26,7 +26,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c
 <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential 
breaking changes in 0.10.1.0</a></h5>
 <ul>
     <li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
-    <li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp of the first message in a log segment. i.e. if the timestamp of the first message in the segment is T, the log will be rolled out at T + log.roll.ms </li>
+    <li> The log rolling time no longer depends on the log segment create time. Instead, it is now based on the timestamps in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled out when a new message has a timestamp greater than or equal to T + log.roll.ms </li>
     <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
     <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of offset index entry. User may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
     <li> Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>

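The 1.5x figure in the note above follows from the on-disk entry formats: an
offset index entry is 8 bytes (4-byte relative offset + 4-byte position) while
a time index entry is 12 bytes (8-byte timestamp + 4-byte relative offset).
A back-of-the-envelope sketch; the 10 MB value is an assumed setting used only
for illustration.

// Rough arithmetic behind the "increase log.index.size.max.bytes" advice.
object IndexSizingSketch extends App {
  val indexMaxBytes = 10 * 1024 * 1024        // assumed log.index.size.max.bytes
  val offsetIndexEntries = indexMaxBytes / 8  // 1,310,720 entries fit
  val timeIndexEntries   = indexMaxBytes / 12 //   873,813 entries fit

  // Both indexes share the same byte budget, so the time index fills after
  // only ~2/3 as many entries, which can roll segments earlier than in 0.10.0.
  println(s"offset index capacity: $offsetIndexEntries entries")
  println(s"time index capacity:   $timeIndexEntries entries")
}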