Repository: kafka Updated Branches: refs/heads/trunk 6f5930d63 -> 6b0349791
KAFKA-5344; set message.timestamp.difference.max.ms back to Long.MaxValue Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3163 from becketqin/KAFKA-5344 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b034979 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b034979 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b034979 Branch: refs/heads/trunk Commit: 6b03497915665bb4823073a5a34b03be709eb287 Parents: 6f5930d Author: Jiangjie Qin <[email protected]> Authored: Tue May 30 15:38:04 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue May 30 15:44:34 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaConfig.scala | 16 ++++------------ .../scala/unit/kafka/server/KafkaConfigTest.scala | 1 - docs/upgrade.html | 2 -- 3 files changed, 4 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index de036a7..6e94043 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -106,7 +106,7 @@ object Defaults { // lazy val as `InterBrokerProtocolVersion` is defined later lazy val LogMessageFormatVersion = InterBrokerProtocolVersion val LogMessageTimestampType = "CreateTime" - val LogMessageTimestampDifferenceMaxMs = LogRetentionHours * 60 * 60 * 1000L + val LogMessageTimestampDifferenceMaxMs = Long.MaxValue val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 @@ -514,8 +514,7 @@ object KafkaConfig { val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " + "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " + "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." + - "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " + - "this reason, the default is the value of log.retention.ms." + "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling." val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server" val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " + @@ -747,7 +746,7 @@ object KafkaConfig { .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc) .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc) - .define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) + .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) /** ********* Replication configuration ***********/ @@ -959,7 +958,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp) val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString) val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp)) - val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs + val logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp) /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) @@ -1086,13 +1085,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra millis } - private def getMessageTimestampDifferenceMaxMs: Long = { - Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match { - case Some(value) => value - case None => getLogRetentionTimeMillis - } - } - private def getMap(propName: String, propValue: String): Map[String, String] = { try { CoreUtils.parseCsvMap(propValue) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index bf89533..df8a6d7 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -712,7 +712,6 @@ class KafkaConfigTest { assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis) assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) - assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs) assertEquals(123L, config.logFlushIntervalMs) assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec) assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index dab5fa7..2b62a2b 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -60,8 +60,6 @@ <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li> - <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of - <code>Long.MAX_VALUE</code>.</li> <li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages. Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice, the change is minor since a message batch may consist of only a single message, so the limitation on the size of
