Updated Branches: refs/heads/master 49134a47b -> c512da98c
SAMZA-77; fix uuid strings for kafka client ids. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c512da98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c512da98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c512da98 Branch: refs/heads/master Commit: c512da98cb39f2614c4a379612406d66e53838fa Parents: 49134a4 Author: Chris Riccomini <[email protected]> Authored: Thu Nov 21 10:48:07 2013 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Nov 21 10:48:07 2013 -0800 ---------------------------------------------------------------------- .../org/apache/samza/config/KafkaConfig.scala | 6 +- .../system/kafka/KafkaSystemConsumer.scala | 2 +- .../apache/samza/config/TestKafkaConfig.scala | 76 ++++++++++++++++++++ 3 files changed, 80 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 4947b87..978620a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -78,8 +78,8 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // kafka config def getKafkaSystemConsumerConfig( systemName: String, - clientId: String = "undefined-samza-consumer-" format UUID.randomUUID.toString, - groupId: String = "undefined-samza-consumer-group-" format UUID.randomUUID.toString, + clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString, + groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString, injectedProps: Map[String, String] = Map()) = { val subConf = config.subset("systems.%s.consumer." format systemName, true) @@ -93,7 +93,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getKafkaSystemProducerConfig( systemName: String, - clientId: String = "undefined-samza-producer-" format UUID.randomUUID.toString, + clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString, injectedProps: Map[String, String] = Map()) = { val subConf = config.subset("systems.%s.producer." format systemName, true) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 9b83259..7624a8e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -56,7 +56,7 @@ private[kafka] class KafkaSystemConsumer( systemName: String, brokerListString: String, metrics: KafkaSystemConsumerMetrics, - clientId: String = "undefined-client-id-" + UUID.randomUUID.toString, + clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, timeout: Int = Int.MaxValue, bufferSize: Int = 1024000, brokerMetadataFailureRefreshMs: Long = 10000, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala new file mode 100644 index 0000000..93cf5a5 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config + +import org.junit.Assert._ +import org.junit.Test +import java.net.URI +import java.io.File +import java.util.Properties +import scala.collection.JavaConversions._ +import org.apache.samza.config.factories.PropertiesConfigFactory + +class TestKafkaConfig { + + @Test + def testIdGeneration = { + val factory = new PropertiesConfigFactory() + val props = new Properties + props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory") + props.setProperty( "systems.kafka.consumer.zookeeper.connect", "localhost:2181/") + props.setProperty( "systems.kafka.producer.metadata.broker.list", "localhost:9092") + + val mapConfig = new MapConfig(props.toMap[String, String]) + val kafkaConfig = new KafkaConfig(mapConfig) + + val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig("kafka") + val consumerClientId1 = consumerConfig1.clientId + val groupId1 = consumerConfig1.groupId + val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig("kafka") + val consumerClientId2 = consumerConfig2.clientId + val groupId2 = consumerConfig2.groupId + assert( consumerClientId1.startsWith("undefined-samza-consumer-")) + assert(consumerClientId2.startsWith("undefined-samza-consumer-")) + assert(groupId1.startsWith("undefined-samza-consumer-group-")) + assert(groupId2.startsWith("undefined-samza-consumer-group-")) + assert(consumerClientId1 != consumerClientId2) + assert(groupId1 != groupId2) + + val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig("kafka", "TestClientId", "TestGroupId") + val consumerClientId3 = consumerConfig3.clientId + val groupId3 = consumerConfig3.groupId + assert(consumerClientId3 == "TestClientId") + assert(groupId3 == "TestGroupId") + + val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig("kafka") + val producerClientId1 = producerConfig1.clientId + val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig("kafka") + val producerClientId2 = producerConfig2.clientId + + assert( producerClientId1.startsWith("undefined-samza-producer-")) + assert(producerClientId2.startsWith("undefined-samza-producer-")) + assert(producerClientId1 != producerClientId2) + + val producerConfig3 = kafkaConfig.getKafkaSystemProducerConfig("kafka", "TestClientId") + val producerClientId3 = producerConfig3.clientId + assert(producerClientId3 == "TestClientId") + + } +}
