Repository: kafka Updated Branches: refs/heads/trunk 1769642bb -> 5174df537
kafka-1864; Revisit defaults for the internal offsets topic; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede, and Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5174df53 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5174df53 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5174df53 Branch: refs/heads/trunk Commit: 5174df53778cb5cb2d6d86e4cec9f3185a2c85db Parents: 1769642 Author: Jun Rao <jun...@gmail.com> Authored: Fri Jan 16 18:56:32 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Jan 16 18:56:32 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 11 +++++++++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++++- core/src/main/scala/kafka/server/OffsetManager.scala | 4 ++-- .../kafka/api/ProducerFailureHandlingTest.scala | 7 +++++++ .../test/scala/unit/kafka/server/OffsetCommitTest.scala | 7 +++++++ 5 files changed, 30 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c011a1b..ec8d9f7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -351,10 +351,17 @@ class KafkaApis(val requestChannel: RequestChannel, if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { try { if (topic == OffsetManager.OffsetsTopicName) { - AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, + val aliveBrokers = metadataCache.getAliveBrokers + val offsetsTopicReplicationFactor = + if
(aliveBrokers.length > 0) + Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length) + else + config.offsetsTopicReplicationFactor + AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, + offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" - .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor)) + .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor)) } else { AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d3d8ac4..88689df 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -312,7 +312,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size", OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE)) - /** The replication factor for the offset commit topic (set higher to ensure availability). */ + /** The replication factor for the offsets topic (set higher to ensure availability). To + * ensure that the effective replication factor of the offsets topic is the configured value, + * the number of alive brokers has to be at least the replication factor at the time of the + * first request for the offsets topic. 
If not, either the offsets topic creation will fail or + * it will get a replication factor of min(alive brokers, configured replication factor) */ val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor", OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue)) http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 3c79428..0bdd42f 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -75,9 +75,9 @@ object OffsetManagerConfig { val DefaultMaxMetadataSize = 4096 val DefaultLoadBufferSize = 5*1024*1024 val DefaultOffsetsRetentionCheckIntervalMs = 600000L - val DefaultOffsetsTopicNumPartitions = 1 + val DefaultOffsetsTopicNumPartitions = 50 val DefaultOffsetsTopicSegmentBytes = 100*1024*1024 - val DefaultOffsetsTopicReplicationFactor = 1.toShort + val DefaultOffsetsTopicReplicationFactor = 3.toShort val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec val DefaultOffsetCommitTimeoutMs = 5000 val DefaultOffsetCommitRequiredAcks = (-1).toShort http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 5ec613c..420a1dd 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -46,6 +46,13 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness { override val zkConnect = TestZKUtils.zookeeperConnect override val autoCreateTopicsEnable = false override val messageMaxBytes = serverMessageMaxBytes + // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to + // the broker. As a result, the live broker list in metadataCache is empty. If the number of live brokers is 0, we + // try to create the offset topic with the default offsets.topic.replication.factor of 3. The creation will fail + // since there is not enough live brokers. This causes testCannotSendToInternalTopic() to fail. Temporarily fixing + // the issue by overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we need to + // remove the following config override. + override val offsetsTopicReplicationFactor = 1.asInstanceOf[Short] } http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 4a3a5b2..5b93239 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -46,6 +46,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() val config: Properties = createBrokerConfig(1, brokerPort) + // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to + // the broker. As a result, the live broker list in metadataCache is empty. This causes the ConsumerMetadataRequest + // to fail since if the number of live brokers is 0, we try to create the offset topic with the default + // offsets.topic.replication.factor of 3. The creation will fail since there is not enough live brokers. 
In order + // for the unit test to pass, overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we + // need to remove the following config override. + config.put("offsets.topic.replication.factor", "1") val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime()