Updated Branches:
  refs/heads/master 49134a47b -> c512da98c

SAMZA-77; fix uuid strings for kafka client ids.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c512da98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c512da98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c512da98

Branch: refs/heads/master
Commit: c512da98cb39f2614c4a379612406d66e53838fa
Parents: 49134a4
Author: Chris Riccomini <[email protected]>
Authored: Thu Nov 21 10:48:07 2013 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Nov 21 10:48:07 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   |  6 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  2 +-
 .../apache/samza/config/TestKafkaConfig.scala   | 76 ++++++++++++++++++++
 3 files changed, 80 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4947b87..978620a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -78,8 +78,8 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
   // kafka config
   def getKafkaSystemConsumerConfig(
     systemName: String,
-    clientId: String = "undefined-samza-consumer-" format 
UUID.randomUUID.toString,
-    groupId: String = "undefined-samza-consumer-group-" format 
UUID.randomUUID.toString,
+    clientId: String = "undefined-samza-consumer-%s" format 
UUID.randomUUID.toString,
+    groupId: String = "undefined-samza-consumer-group-%s" format 
UUID.randomUUID.toString,
     injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.consumer." format systemName, true)
@@ -93,7 +93,7 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
 
   def getKafkaSystemProducerConfig(
     systemName: String,
-    clientId: String = "undefined-samza-producer-" format 
UUID.randomUUID.toString,
+    clientId: String = "undefined-samza-producer-%s" format 
UUID.randomUUID.toString,
     injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 9b83259..7624a8e 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -56,7 +56,7 @@ private[kafka] class KafkaSystemConsumer(
   systemName: String,
   brokerListString: String,
   metrics: KafkaSystemConsumerMetrics,
-  clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
+  clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
   timeout: Int = Int.MaxValue,
   bufferSize: Int = 1024000,
   brokerMetadataFailureRefreshMs: Long = 10000,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c512da98/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 
b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
new file mode 100644
index 0000000..93cf5a5
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.junit.Assert._
+import org.junit.Test
+import java.net.URI
+import java.io.File
+import java.util.Properties
+import scala.collection.JavaConversions._
+import org.apache.samza.config.factories.PropertiesConfigFactory
+
+class TestKafkaConfig {
+
+  @Test
+  def testIdGeneration = {
+    val factory = new PropertiesConfigFactory()
+    val props = new Properties
+    props.setProperty(" systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory")
+    props.setProperty( "systems.kafka.consumer.zookeeper.connect", 
"localhost:2181/")
+    props.setProperty( "systems.kafka.producer.metadata.broker.list", 
"localhost:9092")
+
+    val mapConfig = new MapConfig(props.toMap[String, String])
+    val kafkaConfig = new KafkaConfig(mapConfig)
+  
+    val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
+    val consumerClientId1 = consumerConfig1.clientId
+    val groupId1 = consumerConfig1.groupId
+    val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
+    val consumerClientId2 = consumerConfig2.clientId
+    val groupId2 = consumerConfig2.groupId
+    assert( consumerClientId1.startsWith("undefined-samza-consumer-"))
+    assert(consumerClientId2.startsWith("undefined-samza-consumer-"))
+    assert(groupId1.startsWith("undefined-samza-consumer-group-"))
+    assert(groupId2.startsWith("undefined-samza-consumer-group-"))
+    assert(consumerClientId1 != consumerClientId2)
+    assert(groupId1 != groupId2)
+
+    val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig("kafka", 
"TestClientId", "TestGroupId")
+    val consumerClientId3 = consumerConfig3.clientId
+    val groupId3 = consumerConfig3.groupId
+    assert(consumerClientId3 == "TestClientId")
+    assert(groupId3 == "TestGroupId")
+
+    val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig("kafka")
+    val producerClientId1 = producerConfig1.clientId
+    val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig("kafka")
+    val producerClientId2 = producerConfig2.clientId
+
+    assert( producerClientId1.startsWith("undefined-samza-producer-"))
+    assert(producerClientId2.startsWith("undefined-samza-producer-"))
+    assert(producerClientId1 != producerClientId2)
+
+    val producerConfig3 = kafkaConfig.getKafkaSystemProducerConfig("kafka", 
"TestClientId")
+    val producerClientId3 = producerConfig3.clientId
+    assert(producerClientId3 == "TestClientId")
+
+  }
+}

Reply via email to