kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c904074 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c904074 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c904074 Branch: refs/heads/trunk Commit: 5c9040745466945a04ea0315de583ccdab0614ac Parents: ba86f0a Author: Gwen Shapira <csh...@gmail.com> Authored: Thu Jun 18 14:07:33 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Thu Jun 18 14:07:33 2015 -0700 ---------------------------------------------------------------------- .../kafka/common/config/AbstractConfig.java | 12 +- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../kafka/controller/KafkaController.scala | 4 +- .../controller/PartitionLeaderSelector.scala | 2 +- core/src/main/scala/kafka/log/LogConfig.scala | 156 ++--- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 4 +- .../main/scala/kafka/server/KafkaConfig.scala | 573 +++++-------------- .../main/scala/kafka/server/KafkaServer.scala | 55 +- .../kafka/server/ReplicaFetcherThread.scala | 4 +- .../scala/kafka/server/TopicConfigManager.scala | 5 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 26 - .../test/scala/other/kafka/StressTestLog.scala | 10 +- .../other/kafka/TestLinearWriteSpeed.scala | 7 +- .../unit/kafka/log/BrokerCompressionTest.scala | 7 +- .../test/scala/unit/kafka/log/CleanerTest.scala | 55 +- .../kafka/log/LogCleanerIntegrationTest.scala | 8 +- .../scala/unit/kafka/log/LogConfigTest.scala | 19 +- .../scala/unit/kafka/log/LogManagerTest.scala | 17 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 121 +++- .../kafka/server/DynamicConfigChangeTest.scala | 17 +- .../kafka/server/KafkaConfigConfigDefTest.scala | 20 +- 22 files changed, 444 insertions(+), 682 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c4fa058..bae528d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -57,15 +57,19 @@ public class AbstractConfig { return values.get(key); } - public int getInt(String key) { + public Short getShort(String key) { + return (Short) get(key); + } + + public Integer getInt(String key) { return (Integer) get(key); } - public long getLong(String key) { + public Long getLong(String key) { return (Long) get(key); } - public double getDouble(String key) { + public Double getDouble(String key) { return (Double) get(key); } @@ -92,7 +96,7 @@ public class AbstractConfig { return keys; } - public Map<String, ?> originals() { + public Map<String, Object> originals() { Map<String, Object> copy = new HashMap<String, Object>(); copy.putAll(originals); return copy; http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 730a232..6cb6477 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -86,7 +86,7 @@ class Partition(val topic: String, case Some(replica) => replica case None => if (isReplicaLocal(replicaId)) { - val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) + val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 69bba24..3635057 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -325,7 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, - 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) + 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } @@ -1013,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can // eventually be restored as the leader. - if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient, + if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient, topicAndPartition.topic)).uncleanLeaderElectionEnable) { info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition)) newIsr = leaderAndIsr.isr http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 3b15ab4..bb6b5c8 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi case true => // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. - if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient, + if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient, topicAndPartition.topic)).uncleanLeaderElectionEnable) { throw new NoReplicaOnlineException(("No broker in ISR for partition " + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index f64fd79..e9af221 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -18,92 +18,52 @@ package kafka.log import java.util.Properties +import kafka.server.KafkaConfig import org.apache.kafka.common.utils.Utils import scala.collection._ -import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import kafka.message.BrokerCompressionCodec import kafka.message.Message object Defaults { - val SegmentSize = 1024 * 1024 - val SegmentMs = Long.MaxValue - val SegmentJitterMs = 0L - val FlushInterval = Long.MaxValue - val FlushMs = Long.MaxValue - val RetentionSize = Long.MaxValue - val RetentionMs = Long.MaxValue - val MaxMessageSize = Int.MaxValue - val MaxIndexSize = 1024 * 1024 - val IndexInterval = 4096 - val FileDeleteDelayMs = 60 * 1000L - val DeleteRetentionMs = 24 * 60 * 60 * 1000L - val MinCleanableDirtyRatio = 0.5 - val Compact = false - val UncleanLeaderElectionEnable = true - val MinInSyncReplicas = 1 - val CompressionType = "producer" + val SegmentSize = kafka.server.Defaults.LogSegmentBytes + val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L + val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L + val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages + val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs + val RetentionSize = kafka.server.Defaults.LogRetentionBytes + val RetentionMs = kafka.server.Defaults.LogRetentionHours * 60 * 60 * 1000L + val MaxMessageSize = kafka.server.Defaults.MessageMaxBytes + val MaxIndexSize = kafka.server.Defaults.LogIndexSizeMaxBytes + val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes + val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs + val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs + val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio + val Compact = kafka.server.Defaults.LogCleanupPolicy + val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable + val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas + val CompressionType = kafka.server.Defaults.CompressionType } -/** - * Configuration settings for a log - * @param segmentSize The hard maximum for the size of a segment file in the log - * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled - * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling - * @param flushInterval The number of messages that can be written to the log before a flush is forced - * @param flushMs The amount of time the log can have dirty data before a flush is forced - * @param retentionSize The approximate total number of bytes this log can use - * @param retentionMs The approximate maximum age of the last segment that is retained - * @param maxIndexSize The maximum size of an index file - * @param indexInterval The approximate number of bytes between index entries - * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem - * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted. - * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned - * @param compact Should old segments in this log be deleted or deduplicated? - * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled - * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks - * @param compressionType compressionType for a given topic - * - */ -case class LogConfig(segmentSize: Int = Defaults.SegmentSize, - segmentMs: Long = Defaults.SegmentMs, - segmentJitterMs: Long = Defaults.SegmentJitterMs, - flushInterval: Long = Defaults.FlushInterval, - flushMs: Long = Defaults.FlushMs, - retentionSize: Long = Defaults.RetentionSize, - retentionMs: Long = Defaults.RetentionMs, - maxMessageSize: Int = Defaults.MaxMessageSize, - maxIndexSize: Int = Defaults.MaxIndexSize, - indexInterval: Int = Defaults.IndexInterval, - fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs, - deleteRetentionMs: Long = Defaults.DeleteRetentionMs, - minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, - compact: Boolean = Defaults.Compact, - uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, - minInSyncReplicas: Int = Defaults.MinInSyncReplicas, - compressionType: String = Defaults.CompressionType) { - - def toProps: Properties = { - val props = new Properties() - import LogConfig._ - props.put(SegmentBytesProp, segmentSize.toString) - props.put(SegmentMsProp, segmentMs.toString) - props.put(SegmentJitterMsProp, segmentJitterMs.toString) - props.put(SegmentIndexBytesProp, maxIndexSize.toString) - props.put(FlushMessagesProp, flushInterval.toString) - props.put(FlushMsProp, flushMs.toString) - props.put(RetentionBytesProp, retentionSize.toString) - props.put(RetentionMsProp, retentionMs.toString) - props.put(MaxMessageBytesProp, maxMessageSize.toString) - props.put(IndexIntervalBytesProp, indexInterval.toString) - props.put(DeleteRetentionMsProp, deleteRetentionMs.toString) - props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString) - props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) - props.put(CleanupPolicyProp, if(compact) "compact" else "delete") - props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) - props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) - props.put(CompressionTypeProp, compressionType) - props - } +case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) { + + val segmentSize = getInt(LogConfig.SegmentBytesProp) + val segmentMs = getLong(LogConfig.SegmentMsProp) + val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp) + val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp) + val flushInterval = getLong(LogConfig.FlushMessagesProp) + val flushMs = getLong(LogConfig.FlushMsProp) + val retentionSize = getLong(LogConfig.RetentionBytesProp) + val retentionMs = getLong(LogConfig.RetentionMsProp) + val maxMessageSize = getInt(LogConfig.MaxMessageBytesProp) + val indexInterval = getInt(LogConfig.IndexIntervalBytesProp) + val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp) + val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp) + val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp) + val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete + val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp) + val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) + val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -111,6 +71,10 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize, object LogConfig { + def main(args: Array[String]) { + System.out.println(configDef.toHtmlTable) + } + val Delete = "delete" val Compact = "compact" @@ -179,7 +143,7 @@ object LogConfig { .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc) .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM, MinCleanableRatioDoc) - .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM, + .define(CleanupPolicyProp, STRING, Defaults.Compact, in(Compact, Delete), MEDIUM, CompactDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, MEDIUM, UncleanLeaderElectionEnableDoc) @@ -187,6 +151,8 @@ object LogConfig { .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) } + def apply(): LogConfig = LogConfig(new Properties()) + def configNames() = { import JavaConversions._ configDef.names().toList.sorted @@ -194,37 +160,13 @@ object LogConfig { /** - * Parse the given properties instance into a LogConfig object - */ - def fromProps(props: Properties): LogConfig = { - import kafka.utils.CoreUtils.evaluateDefaults - val parsed = configDef.parse(evaluateDefaults(props)) - new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int], - segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long], - segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long], - maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int], - flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long], - flushMs = parsed.get(FlushMsProp).asInstanceOf[Long], - retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long], - retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long], - maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int], - indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int], - fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long], - deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long], - minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double], - compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete, - uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], - minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], - compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase()) - } - - /** * Create a log config instance using the given properties and defaults */ - def fromProps(defaults: Properties, overrides: Properties): LogConfig = { - val props = new Properties(defaults) + def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = { + val props = new Properties() + props.putAll(defaults) props.putAll(overrides) - fromProps(props) + LogConfig(props) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index e781eba..538fc83 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -356,7 +356,7 @@ class LogManager(val logDirs: Array[File], .format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath, - {import JavaConversions._; config.toProps.mkString(", ")})) + {import JavaConversions._; config.originals.mkString(", ")})) log } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c7debe4..ad6f058 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -428,9 +428,9 @@ class KafkaApis(val requestChannel: RequestChannel, val aliveBrokers = metadataCache.getAliveBrokers val offsetsTopicReplicationFactor = if (aliveBrokers.length > 0) - Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length) + Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) else - config.offsetsTopicReplicationFactor + config.offsetsTopicReplicationFactor.toInt AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2d75186..e0b2480 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,7 +26,7 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.{mutable, immutable, JavaConversions, Map} @@ -141,6 +141,10 @@ object Defaults { object KafkaConfig { + def main(args: Array[String]) { + System.out.println(configDef.toHtmlTable) + } + /** ********* Zookeeper Configuration ***********/ val ZkConnectProp = "zookeeper.connect" val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms" @@ -482,14 +486,14 @@ object KafkaConfig { .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc) .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc) .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc) - .define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) + .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) - .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) + .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) /** ********* Consumer coordinator configuration ***********/ @@ -520,139 +524,6 @@ object KafkaConfig { } /** - * Parse the given properties instance into a KafkaConfig object - */ - def fromProps(props: Properties): KafkaConfig = { - import kafka.utils.CoreUtils.evaluateDefaults - val parsed = configDef.parse(evaluateDefaults(props)) - new KafkaConfig( - /** ********* Zookeeper Configuration ***********/ - zkConnect = parsed.get(ZkConnectProp).asInstanceOf[String], - zkSessionTimeoutMs = parsed.get(ZkSessionTimeoutMsProp).asInstanceOf[Int], - _zkConnectionTimeoutMs = Option(parsed.get(ZkConnectionTimeoutMsProp)).map(_.asInstanceOf[Int]), - zkSyncTimeMs = parsed.get(ZkSyncTimeMsProp).asInstanceOf[Int], - - /** ********* General Configuration ***********/ - maxReservedBrokerId = parsed.get(MaxReservedBrokerIdProp).asInstanceOf[Int], - brokerId = parsed.get(BrokerIdProp).asInstanceOf[Int], - messageMaxBytes = parsed.get(MessageMaxBytesProp).asInstanceOf[Int], - numNetworkThreads = parsed.get(NumNetworkThreadsProp).asInstanceOf[Int], - numIoThreads = parsed.get(NumIoThreadsProp).asInstanceOf[Int], - backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int], - queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int], - - /** ********* Socket Server Configuration ***********/ - port = parsed.get(PortProp).asInstanceOf[Int], - hostName = parsed.get(HostNameProp).asInstanceOf[String], - _listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]), - _advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]), - _advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]), - _advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]), - socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int], - socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int], - socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int], - maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int], - _maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String], - connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long], - - /** ********* Log Configuration ***********/ - numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int], - _logDir = parsed.get(LogDirProp).asInstanceOf[String], - _logDirs = Option(parsed.get(LogDirsProp)).map(_.asInstanceOf[String]), - - logSegmentBytes = parsed.get(LogSegmentBytesProp).asInstanceOf[Int], - logRollTimeHours = parsed.get(LogRollTimeHoursProp).asInstanceOf[Int], - _logRollTimeMillis = Option(parsed.get(LogRollTimeMillisProp)).map(_.asInstanceOf[Long]), - - logRollTimeJitterHours = parsed.get(LogRollTimeJitterHoursProp).asInstanceOf[Int], - _logRollTimeJitterMillis = Option(parsed.get(LogRollTimeJitterMillisProp)).map(_.asInstanceOf[Long]), - - logRetentionTimeHours = parsed.get(LogRetentionTimeHoursProp).asInstanceOf[Int], - _logRetentionTimeMins = Option(parsed.get(LogRetentionTimeMinutesProp)).map(_.asInstanceOf[Int]), - _logRetentionTimeMillis = Option(parsed.get(LogRetentionTimeMillisProp)).map(_.asInstanceOf[Long]), - - logRetentionBytes = parsed.get(LogRetentionBytesProp).asInstanceOf[Long], - logCleanupIntervalMs = parsed.get(LogCleanupIntervalMsProp).asInstanceOf[Long], - logCleanupPolicy = parsed.get(LogCleanupPolicyProp).asInstanceOf[String], - logCleanerThreads = parsed.get(LogCleanerThreadsProp).asInstanceOf[Int], - logCleanerIoMaxBytesPerSecond = parsed.get(LogCleanerIoMaxBytesPerSecondProp).asInstanceOf[Double], - logCleanerDedupeBufferSize = parsed.get(LogCleanerDedupeBufferSizeProp).asInstanceOf[Long], - logCleanerIoBufferSize = parsed.get(LogCleanerIoBufferSizeProp).asInstanceOf[Int], - logCleanerDedupeBufferLoadFactor = parsed.get(LogCleanerDedupeBufferLoadFactorProp).asInstanceOf[Double], - logCleanerBackoffMs = parsed.get(LogCleanerBackoffMsProp).asInstanceOf[Long], - logCleanerMinCleanRatio = parsed.get(LogCleanerMinCleanRatioProp).asInstanceOf[Double], - logCleanerEnable = parsed.get(LogCleanerEnableProp).asInstanceOf[Boolean], - logCleanerDeleteRetentionMs = parsed.get(LogCleanerDeleteRetentionMsProp).asInstanceOf[Long], - logIndexSizeMaxBytes = parsed.get(LogIndexSizeMaxBytesProp).asInstanceOf[Int], - logIndexIntervalBytes = parsed.get(LogIndexIntervalBytesProp).asInstanceOf[Int], - logFlushIntervalMessages = parsed.get(LogFlushIntervalMessagesProp).asInstanceOf[Long], - logDeleteDelayMs = parsed.get(LogDeleteDelayMsProp).asInstanceOf[Long], - logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long], - _logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]), - logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int], - numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int], - autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean], - minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], - - /** ********* Replication configuration ***********/ - controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int], - defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int], - replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long], - replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int], - replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int], - replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int], - replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int], - replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int], - replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int], - numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int], - replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long], - fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int], - producerPurgatoryPurgeIntervalRequests = parsed.get(ProducerPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int], - autoLeaderRebalanceEnable = parsed.get(AutoLeaderRebalanceEnableProp).asInstanceOf[Boolean], - leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int], - leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int], - uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], - interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]), - interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]), - - /** ********* Controlled shutdown configuration ***********/ - controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int], - controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int], - controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean], - - /** ********* Consumer coordinator configuration ***********/ - consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int], - consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int], - - /** ********* Offset management configuration ***********/ - offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int], - offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int], - offsetsTopicReplicationFactor = parsed.get(OffsetsTopicReplicationFactorProp).asInstanceOf[Short], - offsetsTopicPartitions = parsed.get(OffsetsTopicPartitionsProp).asInstanceOf[Int], - offsetsTopicSegmentBytes = parsed.get(OffsetsTopicSegmentBytesProp).asInstanceOf[Int], - offsetsTopicCompressionCodec = Option(parsed.get(OffsetsTopicCompressionCodecProp)).map(_.asInstanceOf[Int]).map(value => CompressionCodec.getCompressionCodec(value)).orNull, - offsetsRetentionMinutes = parsed.get(OffsetsRetentionMinutesProp).asInstanceOf[Int], - offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long], - offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], - offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], - deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], - compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String], - metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int], - metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long], - _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]] - ) - } - - /** - * Create a log config instance using the given properties and defaults - */ - def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = { - val props = new Properties(defaults) - props.putAll(overrides) - fromProps(props) - } - - /** * Check that property names are valid */ def validateNames(props: Properties) { @@ -662,171 +533,149 @@ object KafkaConfig { require(names.contains(name), "Unknown configuration \"%s\".".format(name)) } - /** - * Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid - */ - def validate(props: Properties) { - validateNames(props) - configDef.parse(props) + def fromProps(props: Properties): KafkaConfig = { + KafkaConfig(props) + } - // to bootstrap KafkaConfig.validateValues() - KafkaConfig.fromProps(props) + def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = { + val props = new Properties() + props.putAll(defaults) + props.putAll(overrides) + fromProps(props) } + } -class KafkaConfig (/** ********* Zookeeper Configuration ***********/ - val zkConnect: String, - val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs, - private val _zkConnectionTimeoutMs: Option[Int] = None, - val zkSyncTimeMs: Int = Defaults.ZkSyncTimeMs, - - /** ********* General Configuration ***********/ - val maxReservedBrokerId: Int = Defaults.MaxReservedBrokerId, - var brokerId: Int = Defaults.BrokerId, - val messageMaxBytes: Int = Defaults.MessageMaxBytes, - val numNetworkThreads: Int = Defaults.NumNetworkThreads, - val numIoThreads: Int = Defaults.NumIoThreads, - val backgroundThreads: Int = Defaults.BackgroundThreads, - val queuedMaxRequests: Int = Defaults.QueuedMaxRequests, - - /** ********* Socket Server Configuration ***********/ - val port: Int = Defaults.Port, - val hostName: String = Defaults.HostName, - private val _listeners: Option[String] = None, - private val _advertisedHostName: Option[String] = None, - private val _advertisedPort: Option[Int] = None, - private val _advertisedListeners: Option[String] = None, - val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes, - val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes, - val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes, - val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp, - private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides, - val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs, - - /** ********* Log Configuration ***********/ - val numPartitions: Int = Defaults.NumPartitions, - private val _logDir: String = Defaults.LogDir, - private val _logDirs: Option[String] = None, - - val logSegmentBytes: Int = Defaults.LogSegmentBytes, - - val logRollTimeHours: Int = Defaults.LogRollHours, - private val _logRollTimeMillis: Option[Long] = None, - - val logRollTimeJitterHours: Int = Defaults.LogRollJitterHours, - private val _logRollTimeJitterMillis: Option[Long] = None, - - val logRetentionTimeHours: Int = Defaults.LogRetentionHours, - private val _logRetentionTimeMins: Option[Int] = None, - private val _logRetentionTimeMillis: Option[Long] = None, - - val logRetentionBytes: Long = Defaults.LogRetentionBytes, - val logCleanupIntervalMs: Long = Defaults.LogCleanupIntervalMs, - val logCleanupPolicy: String = Defaults.LogCleanupPolicy, - val logCleanerThreads: Int = Defaults.LogCleanerThreads, - val logCleanerIoMaxBytesPerSecond: Double = Defaults.LogCleanerIoMaxBytesPerSecond, - val logCleanerDedupeBufferSize: Long = Defaults.LogCleanerDedupeBufferSize, - val logCleanerIoBufferSize: Int = Defaults.LogCleanerIoBufferSize, - val logCleanerDedupeBufferLoadFactor: Double = Defaults.LogCleanerDedupeBufferLoadFactor, - val logCleanerBackoffMs: Long = Defaults.LogCleanerBackoffMs, - val logCleanerMinCleanRatio: Double = Defaults.LogCleanerMinCleanRatio, - val logCleanerEnable: Boolean = Defaults.LogCleanerEnable, - val logCleanerDeleteRetentionMs: Long = Defaults.LogCleanerDeleteRetentionMs, - val logIndexSizeMaxBytes: Int = Defaults.LogIndexSizeMaxBytes, - val logIndexIntervalBytes: Int = Defaults.LogIndexIntervalBytes, - val logFlushIntervalMessages: Long = Defaults.LogFlushIntervalMessages, - val logDeleteDelayMs: Long = Defaults.LogDeleteDelayMs, - val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs, - private val _logFlushIntervalMs: Option[Long] = None, - val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs, - val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir, - val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable, - - val minInSyncReplicas: Int = Defaults.MinInSyncReplicas, - - /** ********* Replication configuration ***********/ - val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs, - val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor, - val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs, - val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs, - val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes, - val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes, - val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs, - val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes, - val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs, - val numReplicaFetchers: Int = Defaults.NumReplicaFetchers, - val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs, - val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests, - val producerPurgatoryPurgeIntervalRequests: Int = Defaults.ProducerPurgatoryPurgeIntervalRequests, - val autoLeaderRebalanceEnable: Boolean = Defaults.AutoLeaderRebalanceEnable, - val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage, - val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, - val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol), - val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion), - - /** ********* Controlled shutdown configuration ***********/ - val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries, - val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs, - val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable, - - /** ********* Consumer coordinator configuration ***********/ - val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs, - val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs, - - /** ********* Offset management configuration ***********/ - val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize, - val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize, - val offsetsTopicReplicationFactor: Short = Defaults.OffsetsTopicReplicationFactor, - val offsetsTopicPartitions: Int = Defaults.OffsetsTopicPartitions, - val offsetsTopicSegmentBytes: Int = Defaults.OffsetsTopicSegmentBytes, - val offsetsTopicCompressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(Defaults.OffsetsTopicCompressionCodec), - val offsetsRetentionMinutes: Int = Defaults.OffsetsRetentionMinutes, - val offsetsRetentionCheckIntervalMs: Long = Defaults.OffsetsRetentionCheckIntervalMs, - val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs, - val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, - - val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, - val compressionType: String = Defaults.CompressionType, - - val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs, - val metricNumSamples: Int = Defaults.MetricNumSamples, - private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses) - ) { - - val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs) - - val listeners = getListeners() - val advertisedHostName: String = _advertisedHostName.getOrElse(hostName) - val advertisedPort: Int = _advertisedPort.getOrElse(port) - val advertisedListeners = getAdvertisedListeners() - val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir)) - - val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours) - val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours) - val logRetentionTimeMillis = getLogRetentionTimeMillis +case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) { + + /** ********* Zookeeper Configuration ***********/ + val zkConnect: String = getString(KafkaConfig.ZkConnectProp) + val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp) + val zkConnectionTimeoutMs: java.lang.Integer = + Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp)) + val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp) - val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs) + /** ********* General Configuration ***********/ + val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) + var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) + val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp) + val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp) + val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp) + val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp) + val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp) + /** ********* Socket Server Configuration ***********/ + val hostName = getString(KafkaConfig.HostNameProp) + val port = getInt(KafkaConfig.PortProp) + val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName) + val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port) + + val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp) + val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp) + val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp) + val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp) val maxConnectionsPerIpOverrides: Map[String, Int] = - getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)} + getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)} + val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp) + - val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses) + /** ********* Log Configuration ***********/ + val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) + val numPartitions = getInt(KafkaConfig.NumPartitionsProp) + val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp))) + val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp) + val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp) + val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp) + val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp) + val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp) + val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong + val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp) + val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp) + val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp) + val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp) + val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp) + val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp) + val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp) + val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp) + val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp) + val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp) + val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp) + val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp) + val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp) + val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp) + val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp) + val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp) + val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)) + val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) + val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) + val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) + + /** ********* Replication configuration ***********/ + val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) + val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) + val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) + val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp) + val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp) + val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp) + val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp) + val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp) + val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp) + val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp) + val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp) + val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp) + val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp) + val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) + val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) + val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) + val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) + val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp)) + + /** ********* Controlled shutdown configuration ***********/ + val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp) + val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp) + val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp) + + /** ********* Consumer coordinator configuration ***********/ + val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp) + val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp) + + /** ********* Offset management configuration ***********/ + val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp) + val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp) + val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp) + val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp) + val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp) + val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp) + val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp) + val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull + + /** ********* Metric Configuration **************/ + val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp) + val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) + val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) + + val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) + val compressionType = getString(KafkaConfig.CompressionTypeProp) + + + val listeners = getListeners + val advertisedListeners = getAdvertisedListeners + val logRetentionTimeMillis = getLogRetentionTimeMillis private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute - val millis = { - _logRetentionTimeMillis.getOrElse( - _logRetentionTimeMins match { - case Some(mins) => millisInMinute * mins - case None => millisInHour * logRetentionTimeHours - } - ) - } - if (millis < 0) return -1 - millis + val millis: java.lang.Long = + Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse( + Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match { + case Some(mins) => millisInMinute * mins + case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour + }) + + if (millis < 0) return -1 + millis } private def getMap(propName: String, propValue: String): Map[String, String] = { @@ -855,9 +704,9 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ // If the user did not define listeners but did define host or port, let's use them in backward compatible way // If none of those are defined, we default to PLAINTEXT://:9092 private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (_listeners.isDefined) { - validateUniquePortAndProtocol(_listeners.get) - CoreUtils.listenerListToEndPoints(_listeners.get) + if (getString(KafkaConfig.ListenersProp) != null) { + validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp)) + CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp)) } else { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) } @@ -867,11 +716,12 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (_advertisedListeners.isDefined) { - validateUniquePortAndProtocol(_advertisedListeners.get) - CoreUtils.listenerListToEndPoints(_advertisedListeners.get) - } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) { - CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) + if (getString(KafkaConfig.AdvertisedListenersProp) != null) { + validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp)) + CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp)) + } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { + CoreUtils.listenerListToEndPoints("PLAINTEXT://" + + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) } else { getListeners() } @@ -886,7 +736,7 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ val reporterName = iterator.next() if (!reporterName.isEmpty) { val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName) - reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]]) + reporter.configure(originals) reporterList.add(reporter) } } @@ -895,19 +745,13 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ } - - validateValues() private def validateValues() { require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id") require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0") - - require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1") - require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1") - require(logDirs.size > 0) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + @@ -920,127 +764,4 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) } - - def toProps: Properties = { - val props = new Properties() - import kafka.server.KafkaConfig._ - /** ********* Zookeeper Configuration ***********/ - props.put(ZkConnectProp, zkConnect) - props.put(ZkSessionTimeoutMsProp, zkSessionTimeoutMs.toString) - _zkConnectionTimeoutMs.foreach(value => props.put(ZkConnectionTimeoutMsProp, value.toString)) - props.put(ZkSyncTimeMsProp, zkSyncTimeMs.toString) - - /** ********* General Configuration ***********/ - props.put(MaxReservedBrokerIdProp, maxReservedBrokerId.toString) - props.put(BrokerIdProp, brokerId.toString) - props.put(MessageMaxBytesProp, messageMaxBytes.toString) - props.put(NumNetworkThreadsProp, numNetworkThreads.toString) - props.put(NumIoThreadsProp, numIoThreads.toString) - props.put(BackgroundThreadsProp, backgroundThreads.toString) - props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString) - - /** ********* Socket Server Configuration ***********/ - props.put(PortProp, port.toString) - props.put(HostNameProp, hostName) - _listeners.foreach(props.put(ListenersProp, _)) - _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _)) - _advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString)) - _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _)) - props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString) - props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString) - props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString) - props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString) - props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides) - props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString) - - /** ********* Log Configuration ***********/ - props.put(NumPartitionsProp, numPartitions.toString) - props.put(LogDirProp, _logDir) - _logDirs.foreach(value => props.put(LogDirsProp, value)) - props.put(LogSegmentBytesProp, logSegmentBytes.toString) - - props.put(LogRollTimeHoursProp, logRollTimeHours.toString) - _logRollTimeMillis.foreach(v => props.put(LogRollTimeMillisProp, v.toString)) - - props.put(LogRollTimeJitterHoursProp, logRollTimeJitterHours.toString) - _logRollTimeJitterMillis.foreach(v => props.put(LogRollTimeJitterMillisProp, v.toString)) - - - props.put(LogRetentionTimeHoursProp, logRetentionTimeHours.toString) - _logRetentionTimeMins.foreach(v => props.put(LogRetentionTimeMinutesProp, v.toString)) - _logRetentionTimeMillis.foreach(v => props.put(LogRetentionTimeMillisProp, v.toString)) - - props.put(LogRetentionBytesProp, logRetentionBytes.toString) - props.put(LogCleanupIntervalMsProp, logCleanupIntervalMs.toString) - props.put(LogCleanupPolicyProp, logCleanupPolicy) - props.put(LogCleanerThreadsProp, logCleanerThreads.toString) - props.put(LogCleanerIoMaxBytesPerSecondProp, logCleanerIoMaxBytesPerSecond.toString) - props.put(LogCleanerDedupeBufferSizeProp, logCleanerDedupeBufferSize.toString) - props.put(LogCleanerIoBufferSizeProp, logCleanerIoBufferSize.toString) - props.put(LogCleanerDedupeBufferLoadFactorProp, logCleanerDedupeBufferLoadFactor.toString) - props.put(LogCleanerBackoffMsProp, logCleanerBackoffMs.toString) - props.put(LogCleanerMinCleanRatioProp, logCleanerMinCleanRatio.toString) - props.put(LogCleanerEnableProp, logCleanerEnable.toString) - props.put(LogCleanerDeleteRetentionMsProp, logCleanerDeleteRetentionMs.toString) - props.put(LogIndexSizeMaxBytesProp, logIndexSizeMaxBytes.toString) - props.put(LogIndexIntervalBytesProp, logIndexIntervalBytes.toString) - props.put(LogFlushIntervalMessagesProp, logFlushIntervalMessages.toString) - props.put(LogDeleteDelayMsProp, logDeleteDelayMs.toString) - props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString) - _logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString)) - props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString) - props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString) - props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString) - props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) - - /** ********* Replication configuration ***********/ - props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString) - props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString) - props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) - props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString) - props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString) - props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString) - props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString) - props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString) - props.put(NumReplicaFetchersProp, numReplicaFetchers.toString) - props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString) - props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString) - props.put(ProducerPurgatoryPurgeIntervalRequestsProp, producerPurgatoryPurgeIntervalRequests.toString) - props.put(AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString) - props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString) - props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString) - props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) - props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString) - props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString) - - - /** ********* Controlled shutdown configuration ***********/ - props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString) - props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString) - props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString) - - /** ********* Consumer coordinator configuration ***********/ - props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString) - props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString) - - /** ********* Offset management configuration ***********/ - props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString) - props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString) - props.put(OffsetsTopicReplicationFactorProp, offsetsTopicReplicationFactor.toString) - props.put(OffsetsTopicPartitionsProp, offsetsTopicPartitions.toString) - props.put(OffsetsTopicSegmentBytesProp, offsetsTopicSegmentBytes.toString) - props.put(OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString) - props.put(OffsetsRetentionMinutesProp, offsetsRetentionMinutes.toString) - props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) - props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) - props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) - props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) - props.put(CompressionTypeProp, compressionType.toString) - props.put(MetricNumSamplesProp, metricNumSamples.toString) - props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString) - props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(",")) - - props - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b320ce9..9de2a6f 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -17,6 +17,9 @@ package kafka.server +import java.util +import java.util.Properties + import kafka.admin._ import kafka.log.LogConfig import kafka.log.CleanerConfig @@ -388,23 +391,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def boundPort(): Int = socketServer.boundPort() private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { - val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, - segmentMs = config.logRollTimeMillis, - segmentJitterMs = config.logRollTimeJitterMillis, - flushInterval = config.logFlushIntervalMessages, - flushMs = config.logFlushIntervalMs.toLong, - retentionSize = config.logRetentionBytes, - retentionMs = config.logRetentionTimeMillis, - maxMessageSize = config.messageMaxBytes, - maxIndexSize = config.logIndexSizeMaxBytes, - indexInterval = config.logIndexIntervalBytes, - deleteRetentionMs = config.logCleanerDeleteRetentionMs, - fileDeleteDelayMs = config.logDeleteDelayMs, - minCleanableRatio = config.logCleanerMinCleanRatio, - compact = config.logCleanupPolicy.trim.toLowerCase == "compact", - minInSyncReplicas = config.minInSyncReplicas, - compressionType = config.compressionType) - val defaultProps = defaultLogConfig.toProps + val defaultProps = copyKafkaConfigToLog(config.originals) + val defaultLogConfig = LogConfig(defaultProps) + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, @@ -428,6 +417,38 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time = time) } + // Copy the subset of properties that are relevant to Logs + // I'm listing out individual properties here since the names are slightly different in each Config class... + private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = { + + val logProps = new util.HashMap[String, Object]() + val entryset = serverProps.entrySet.iterator + while (entryset.hasNext) { + val entry = entryset.next + entry.getKey match { + case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue) + case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue) + case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue) + case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue) + case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue) + case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue) + case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue) + case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue) + case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue) + case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue) + case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue) + case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue) + case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue) + case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue) + case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue) + case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue) + case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue) + case _ => // we just leave those out + } + } + logProps + } + private def createOffsetManager(): OffsetManager = { val offsetManagerConfig = OffsetManagerConfig( maxMetadataSize = config.offsetMetadataMaxSize, http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 181cbc1..c89d00b 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -90,7 +90,7 @@ class ReplicaFetcherThread(name:String, // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient, + if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient, topicAndPartition.topic)).uncleanLeaderElectionEnable) { // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + @@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String, // any logic for partitions whose leader has changed def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs) + delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/TopicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index b675a7e..01b1b0a 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -101,9 +101,10 @@ class TopicConfigManager(private val zkClient: ZkClient, val topic = json.substring(1, json.length - 1) // hacky way to dequote if (logsByTopic.contains(topic)) { /* combine the default properties with the overrides in zk to create the new LogConfig */ - val props = new Properties(logManager.defaultConfig.toProps) + val props = new Properties() + props.putAll(logManager.defaultConfig.originals) props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) - val logConfig = LogConfig.fromProps(props) + val logConfig = LogConfig(props) for (log <- logsByTopic(topic)) log.config = logConfig info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index d0a8fa7..f5d704c 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -254,32 +254,6 @@ object CoreUtils extends Logging { } /** - * Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example - * illustrates difference from the cast - * <pre> - * val defaults = new Properties() - * defaults.put("foo", "bar") - * val props = new Properties(defaults) - * - * props.getProperty("foo") // "bar" - * props.get("foo") // null - * evaluateDefaults(props).get("foo") // "bar" - * </pre> - * - * @param props properties to evaluate - * @return new java.util.Map instance - */ - def evaluateDefaults(props: Properties): java.util.Map[String, String] = { - import java.util._ - import JavaConversions.asScalaSet - val evaluated = new HashMap[String, String]() - for (name <- props.stringPropertyNames()) { - evaluated.put(name, props.getProperty(name)) - } - evaluated - } - - /** * Read a big-endian integer from a byte array */ def readInt(bytes: Array[Byte], offset: Int): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index c0e248d..225d77b 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -17,6 +17,7 @@ package kafka +import java.util.Properties import java.util.concurrent.atomic._ import kafka.common._ import kafka.message._ @@ -33,10 +34,13 @@ object StressTestLog { def main(args: Array[String]) { val dir = TestUtils.tempDir() val time = new MockTime + val logProprties = new Properties() + logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer) + logProprties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer) + logProprties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer) + val log = new Log(dir = dir, - config = LogConfig(segmentSize = 64*1024*1024, - maxMessageSize = Int.MaxValue, - maxIndexSize = 1024*1024), + config = LogConfig(logProprties), recoveryPoint = 0L, scheduler = time.scheduler, time = time) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 3034c4f..236d857 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -20,7 +20,7 @@ package kafka import java.io._ import java.nio._ import java.nio.channels._ -import java.util.Random +import java.util.{Properties, Random} import kafka.log._ import kafka.utils._ import kafka.message._ @@ -110,7 +110,10 @@ object TestLinearWriteSpeed { writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer) } else if(options.has(logOpt)) { val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect - writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet) + val logProperties = new Properties() + logProperties.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long) + writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet) } else { System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") System.exit(1) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 375555f..6180b87 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -26,7 +26,7 @@ import org.junit.Assert._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.{ Collection, ArrayList } +import java.util.{Properties, Collection, ArrayList} import kafka.server.KafkaConfig import org.apache.kafka.common.record.CompressionType import scala.collection.JavaConversions._ @@ -54,9 +54,10 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin @Test def testBrokerSideCompression() { val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression) - + val logProps = new Properties() + logProps.put(LogConfig.CompressionTypeProp,brokerCompression) /*configure broker-side compression */ - val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) /* append two messages */ log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes))) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 8b8249a..0e2a6a1 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -17,6 +17,8 @@ package kafka.log +import java.util.Properties + import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Test} @@ -35,7 +37,11 @@ import org.apache.kafka.common.utils.Utils class CleanerTest extends JUnitSuite { val dir = TestUtils.tempDir() - val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + val logConfig = LogConfig(logProps) val time = new MockTime() val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time) @@ -50,8 +56,11 @@ class CleanerTest extends JUnitSuite { @Test def testCleanSegments() { val cleaner = makeCleaner(Int.MaxValue) - val log = makeLog(config = logConfig.copy(segmentSize = 1024)) - + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + // append messages to the log until we have four segments while(log.numberOfSegments < 4) log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) @@ -72,7 +81,10 @@ class CleanerTest extends JUnitSuite { @Test def testCleaningWithDeletes() { val cleaner = makeCleaner(Int.MaxValue) - val log = makeLog(config = logConfig.copy(segmentSize = 1024)) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // append messages with the keys 0 through N while(log.numberOfSegments < 2) @@ -98,7 +110,11 @@ class CleanerTest extends JUnitSuite { val cleaner = makeCleaner(Int.MaxValue) // create a log with compaction turned off so we can append unkeyed messages - val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false)) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // append unkeyed messages while(log.numberOfSegments < 2) @@ -114,7 +130,9 @@ class CleanerTest extends JUnitSuite { val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages // turn on compaction and compact the log - val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024)) + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + + val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0)) assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) @@ -139,7 +157,10 @@ class CleanerTest extends JUnitSuite { @Test def testCleanSegmentsWithAbort() { val cleaner = makeCleaner(Int.MaxValue, abortCheckDone) - val log = makeLog(config = logConfig.copy(segmentSize = 1024)) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // append messages to the log until we have four segments while(log.numberOfSegments < 4) @@ -159,7 +180,11 @@ class CleanerTest extends JUnitSuite { @Test def testSegmentGrouping() { val cleaner = makeCleaner(Int.MaxValue) - val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1)) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // append some messages to the log var i = 0 @@ -208,7 +233,12 @@ class CleanerTest extends JUnitSuite { @Test def testSegmentGroupingWithSparseOffsets() { val cleaner = makeCleaner(Int.MaxValue) - val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1)) + + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // fill up first segment while (log.numberOfSegments == 1) @@ -288,7 +318,12 @@ class CleanerTest extends JUnitSuite { @Test def testRecoveryAfterCrash() { val cleaner = makeCleaner(Int.MaxValue) - val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer) + + val config = LogConfig.fromProps(logConfig.originals, logProps) def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = { // Recover log file and check that after recovery, keys are as expected http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 471ddff..381e9aa 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io.File +import java.util.Properties import kafka.common.TopicAndPartition import kafka.message._ @@ -127,8 +128,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { for(i <- 0 until parts) { val dir = new File(logDir, "log-" + i) dir.mkdirs() + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer) + logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) val log = new Log(dir = dir, - LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true), + LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)