KAFKA-802 Flush message interval is based on compressed message count; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5462864 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5462864 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5462864 Branch: refs/heads/trunk Commit: c5462864aab05b98158bcbe623123db083b8e136 Parents: dd96761 Author: Neha Narkhede <[email protected]> Authored: Wed Mar 13 11:50:59 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Wed Mar 13 11:50:59 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 15 +++++++++------ core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c5462864/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 34c5376..f6ee475 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -260,7 +260,7 @@ private[kafka] class Log(val dir: File, */ def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = { val messageSetInfo = analyzeAndValidateMessageSet(messages) - + // if we have any valid messages, append them to the log if(messageSetInfo.count == 0) { (-1L, -1L) @@ -270,7 +270,9 @@ private[kafka] class Log(val dir: File, try { // they are valid, insert them in the log - val offsets = lock synchronized { + val offsetsAndNumAppendedMessages = lock synchronized { + val firstOffset = nextOffset.get + // maybe roll the log if this segment is full val segment = maybeRoll(segments.view.last) @@ -312,16 +314,17 @@ private[kafka] class Log(val dir: File, // advance the log end offset nextOffset.set(offsets._2 + 1) - + val numAppendedMessages = (nextOffset.get - firstOffset).toInt + // return the offset at which the messages were appended - offsets + (offsets._1, offsets._2, numAppendedMessages) } // maybe flush the log and index - maybeFlush(messageSetInfo.count) + maybeFlush(offsetsAndNumAppendedMessages._3) // return the first and last offset - offsets + (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2) } catch { case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5462864/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b8970c8..549b4b0 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -110,7 +110,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
