Repository: samza
Updated Branches:
  refs/heads/master 078afb57f -> 003ad1068


http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 51545a0..59a8854 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -19,13 +19,10 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsRegistry
 import java.util.concurrent.ConcurrentHashMap
+
 import kafka.common.TopicAndPartition
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics._
 
class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
@@ -34,67 +31,66 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
   val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
 
-  /*
-   * (String, Int) = (host, port) of BrokerProxy.
-   */
-
-  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), Counter]
-  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+  val clientBytesRead = new ConcurrentHashMap[String, Counter]
+  val clientReads = new ConcurrentHashMap[String, Counter]
+  val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
+  val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]
 
   def registerTopicAndPartition(tp: TopicAndPartition) = {
     if (!offsets.contains(tp)) {
-      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
-      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
-      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition)))
-      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L))
-      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
+      offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format(tp.topic, tp.partition)))
+      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format(tp.topic, tp.partition), -1L))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format(tp.topic, tp.partition), 0L))
     }
   }
 
-  def registerBrokerProxy(host: String, port: Int) {
-    reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
-    brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
-    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port)))
-    brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
-    topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
+  def registerClientProxy(clientName: String) {
+    clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName))
+    clientReads.put(clientName, newCounter("%s-messages-read" format clientName))
+    clientSkippedFetchRequests.put(clientName, newCounter("%s-skipped-fetch-requests" format clientName))
+    topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0))
   }
 
   // java friendlier interfaces
   // Gauges
-  def setTopicPartitionValue(host: String, port: Int, value: Int) {
-    topicPartitions.get((host,port)).set(value)
+  def setNumTopicPartitions(clientName: String, value: Int) {
+    topicPartitions.get(clientName).set(value)
   }
+
   def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
     lag.get((topicAndPartition)).set(value);
   }
+
  def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
     highWatermark.get((topicAndPartition)).set(value);
   }
 
   // Counters
-  def incBrokerReads(host: String, port: Int) {
-    brokerReads.get((host,port)).inc
+  def incClientReads(clientName: String) {
+    clientReads.get(clientName).inc
   }
+
   def incReads(topicAndPartition: TopicAndPartition) {
     reads.get(topicAndPartition).inc;
   }
+
   def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
     bytesRead.get(topicAndPartition).inc(inc);
   }
-  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
-    brokerBytesRead.get((host,port)).inc(incBytes)
+
+  def incClientBytesReads(clientName: String, incBytes: Long) {
+    clientBytesRead.get(clientName).inc(incBytes)
   }
-  def incBrokerSkippedFetchRequests(host: String, port: Int) {
-    brokerSkippedFetchRequests.get((host,port)).inc()
+
+  def incClientSkippedFetchRequests(clientName: String) {
+    clientSkippedFetchRequests.get(clientName).inc()
   }
+
   def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
     offsets.get(topicAndPartition).set(offset)
   }
-  def incReconnects(host: String, port: Int) {
-    reconnects.get((host,port)).inc()
-  }
+
   override def getPrefix = systemName + "-"
 }
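
For orientation, a minimal usage sketch of the reworked metrics API above, which now keys client-level metrics by a client-name String instead of a (host, port) broker tuple. The client-name value below is illustrative; the constructor and method names come straight from the diff:

    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.kafka.KafkaSystemConsumerMetrics

    object MetricsUsageSketch extends App {
      val metrics = new KafkaSystemConsumerMetrics("kafka", new MetricsRegistryMap)

      val clientName = "samza_consumer-myJob-1" // illustrative client id
      metrics.registerClientProxy(clientName)

      // Record one poll that returned messages totalling 4096 bytes
      // across 8 registered topic partitions.
      metrics.incClientReads(clientName)
      metrics.incClientBytesReads(clientName, 4096L)
      metrics.setNumTopicPartitions(clientName, 8)
    }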

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 9f0b5f2..ba5390b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -20,21 +20,19 @@
 package org.apache.samza.system.kafka
 
 import java.util.Properties
+
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
-import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config}
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemAdmin
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.SystemConsumer
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config._
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}
+import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
  def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
@@ -46,49 +44,27 @@ object KafkaSystemFactory extends Logging {
 }
 
 class KafkaSystemFactory extends SystemFactory with Logging {
+
  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
-    val clientId = KafkaUtil.getClientId("samza-consumer", config)
+    val clientId = KafkaConsumerConfig.getConsumerClientId(config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
-    // Kind of goofy to need a producer config for consumers, but we need metadata.
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val bootstrapServers = producerConfig.bootsrapServers
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
+    info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer))
 
-    val timeout = consumerConfig.socketTimeoutMs
-    val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
-    val consumerMinSize = consumerConfig.fetchMinBytes
-    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
-    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
-    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
-    val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
-    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
-
-    new KafkaSystemConsumer(
-      systemName = systemName,
-      systemAdmin = getAdmin(systemName, config),
-      metrics = metrics,
-      metadataStore = metadataStore,
-      clientId = clientId,
-      timeout = timeout,
-      bufferSize = bufferSize,
-      fetchSize = fetchSize,
-      consumerMinSize = consumerMinSize,
-      consumerMaxWait = consumerMaxWait,
-      fetchThreshold = fetchThreshold,
-      fetchThresholdBytes = fetchThresholdBytes,
-      fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
-      offsetGetter = offsetGetter)
+    val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock)
+    info("Created samza system consumer %s" format (kafkaSystemConsumer.toString))
+
+    kafkaSystemConsumer
   }
 
  def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val clientId = KafkaConsumerConfig.getProducerClientId(config)
     val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
-    val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
+    val getProducer = () => {
+      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
     // Unlike consumer, no need to use encoders here, since they come for free
@@ -104,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
-    val clientId = KafkaUtil.getClientId("samza-admin", config)
+    val clientId = KafkaConsumerConfig.getAdminClientId(config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
@@ -119,13 +95,13 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
     val storeToChangelog = config.getKafkaChangelogEnabledStores()
     // Construct the meta information for each topic; if the replication factor is not defined, we use 2 as the number of replicas for the changelog stream.
-    val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
-    {
-       val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
-       val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
-       info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
-       (topicName, changelogInfo)
-    }}
+    val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => {
+      val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
+      val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
+      info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor))
+      (topicName, changelogInfo)
+    }
+    }
 
     val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean)
     val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
@@ -150,7 +126,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
   }
 
-  def getIntermediateStreamProperties(config : Config): Map[String, Properties] = {
+  def getIntermediateStreamProperties(config: Config): Map[String, Properties] = {
     val appConfig = new ApplicationConfig(config)
     if (appConfig.getAppMode == ApplicationMode.BATCH) {
       val streamConfig = new StreamConfig(config)
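
A rough sketch of driving the reworked factory path end to end. The system name, bootstrap address, and config values below are illustrative, not taken from this patch; only getConsumer and its KafkaConsumerConfig/KafkaSystemConsumer wiring are:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.kafka.KafkaSystemFactory
    import scala.collection.JavaConverters._

    object FactoryUsageSketch extends App {
      // job.name feeds the generated client id / group id; the consumer
      // bootstrap servers follow the systems.<name>.consumer.* convention.
      val config = new MapConfig(Map(
        "job.name" -> "myJob",
        "systems.kafka.consumer.bootstrap.servers" -> "localhost:9092"
      ).asJava)

      val factory = new KafkaSystemFactory
      // The consumer client id and the wrapped KafkaConsumer are now derived
      // inside getConsumer via KafkaConsumerConfig and getKafkaConsumerImpl.
      val consumer = factory.getConsumer("kafka", config, new MetricsRegistryMap)
    }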

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..de5d093
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKafkaConsumerConfig {
+
+  public final static String SYSTEM_NAME = "testSystem";
+  public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+  public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+  private final static String CLIENT_ID = "clientId";
+
+  @Test
+  public void testDefaults() {
+    Map<String, String> props = new HashMap<>();
+
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        "Ignore"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+        "100"); // should NOT be ignored
+
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignoreThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals("false", 
kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+    
Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+        kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+
+    Assert.assertEquals(RangeAssignor.class.getName(),
+        
kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+
+    Assert.assertEquals("useThis:9092", 
kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    Assert.assertEquals("100", 
kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+        
kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(CLIENT_ID, 
kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+    Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
+        kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+
+    
Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", 
"_") + "-jobName-1",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-1", 
KafkaConsumerConfig.getConsumerGroupId(config));
+
+    props.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(props);
+
+    
Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", 
"_") + "-jobName-jobId",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-jobId", 
KafkaConsumerConfig.getConsumerGroupId(config));
+  }
+
+  // test stuff that should not be overridden
+  @Test
+  public void testNotOverride() {
+    Map<String, String> props = new HashMap<>();
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
+
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+  }
+
+  @Test
+  public void testGetConsumerClientId() {
+    Map<String, String> map = new HashMap<>();
+
+    map.put(JobConfig.JOB_NAME(), "jobName");
+    map.put(JobConfig.JOB_ID(), "jobId");
+    String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+    Assert.assertEquals("consumer_-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+    Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
+
+    map.put(JobConfig.JOB_NAME(), " very important!job");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-jobId", result);
+
+    map.put(JobConfig.JOB_ID(), "number-#3");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-number__3", result);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testNoBootstrapServers() {
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME,
+            "clientId");
+
+    Assert.fail("didn't get exception for the missing config: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+  }
+}
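
The client-id assertions above imply a sanitization rule: the prefix, job name, and job id each have non-alphanumeric characters replaced with underscores before being joined with dashes. A hypothetical reconstruction of that rule (the regex and helper names are assumptions, not KafkaConsumerConfig's actual code):

    object ClientIdSketch {
      // Assumed sanitizer: every non-alphanumeric character becomes '_'.
      private def sanitize(s: String): String = s.replaceAll("[^A-Za-z0-9]", "_")

      def consumerClientId(prefix: String, jobName: String, jobId: String): String =
        "%s-%s-%s".format(sanitize(prefix), sanitize(jobName), sanitize(jobId))
    }

    // ClientIdSketch.consumerClientId("consumer", " very important!job", "number-#3")
    //   == "consumer-_very_important_job-number__3", matching the test above.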

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 77f47f9..7e968bf 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,17 +19,14 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.*;
 import java.util.HashMap;
 import java.util.Map;
-
+import java.util.Properties;
 import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
     admin.createStream(spec);
     admin.validateStream(spec);
-
   }
 
   @Test
@@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testCreateStream() {
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     systemAdmin().validateStream(spec);
 
     assertFalse("createStream should return false if the stream already 
exists.", systemAdmin().createStream(spec));
@@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends 
TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", 
"testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", 
"testSystem", 4);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testClearStream() {
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     assertTrue(systemAdmin().clearStream(spec));
 
     scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
deleted file mode 100644
index d510076..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ /dev/null
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.system.kafka
-
-import java.nio.ByteBuffer
-import java.util.concurrent.CountDownLatch
-
-import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-import org.junit.Assert._
-import org.junit._
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.mockito.{Matchers, Mockito}
-
-import scala.collection.JavaConverters._
-
-class TestBrokerProxy extends Logging {
-  val tp2 = new TopicAndPartition("Redbird", 2013)
-  var fetchTp1 = true // control whether fetching tp1 messages or not
-
-  @Test def brokerProxyRetrievesMessagesCorrectly() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages(0)._2.offset)
-    assertEquals(84, sink.receivedMessages(1)._2.offset)
-  }
-
-  @Test def brokerProxySkipsFetchForEmptyRequests() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    // Only add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
-  }
-
-  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-
-    try {
-      bp.addTopicPartition(tp, Option("1"))
-      fail("Should have thrown an exception")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
-      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
-    }
-  }
-
-  def getMockBrokerProxy() = {
-    val sink = new MessageSink {
-      val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
-
-      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-
-      def refreshDropped() {}
-
-      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
-      }
-
-      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      }
-
-      // Never need messages for tp2.
-      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1
-    }
-
-    val system = "daSystem"
-    val host = "host"
-    val port = 2222
-    val tp = new TopicAndPartition("Redbird", 2012)
-    val metrics = new KafkaSystemConsumerMetrics(system)
-
-    metrics.registerBrokerProxy(host, port)
-    metrics.registerTopicAndPartition(tp)
-    metrics.topicPartitions.get((host, port)).set(1)
-
-    val bp = new BrokerProxy(
-      host,
-      port,
-      system,
-      "daClientId",
-      metrics,
-      sink,
-      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
-
-      override val sleepMSWhileNoTopicPartitions = 100
-      // Speed up for test
-      var alreadyCreatedConsumer = false
-
-      // Scala traits and Mockito mocks don't mix, unfortunately.
-      override def createSimpleConsumer() = {
-        if (alreadyCreatedConsumer) {
-          System.err.println("Should only be creating one consumer in this test!")
-          throw new InterruptedException("Should only be creating one consumer in this test!")
-        }
-        alreadyCreatedConsumer = true
-
-        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
-          val sc = Mockito.mock(classOf[SimpleConsumer])
-          val mockOffsetResponse = {
-            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
-            val partitionOffsetResponse = {
-              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
-              when(por.offsets).thenReturn(List(1l).toSeq)
-              por
-            }
-
-            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
-            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
-            offsetResponse
-          }
-
-          when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
-
-          val fetchResponse = {
-            val fetchResponse = Mockito.mock(classOf[FetchResponse])
-
-            val messageSet = {
-              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
-
-              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
-              val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))
-
-              when(messageSet.sizeInBytes).thenReturn(43)
-              when(messageSet.size).thenReturn(44)
-              when(messageSet.iterator).thenReturn(messages.iterator)
-              when(messageSet.head).thenReturn(messages.head)
-              messageSet
-            }
-
-            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
-            val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
-
-            when(fetchResponse.data).thenReturn(map.toSeq)
-            when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
-            fetchResponse
-          }
-          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
-
-          override def close() = sc.close()
-
-          override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
-
-          override def fetch(request: FetchRequest): FetchResponse = {
-            // Verify that we only get fetch requests for one tp, even though
-            // two were registered. This is to verify that
-            // sink.needsMoreMessages works.
-            assertEquals(1, request.requestInfo.size)
-            sc.fetch(request)
-          }
-
-          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
-
-          override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
-
-          override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request)
-
-          override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request)
-
-          override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
-        }
-      }
-
-    }
-
-    (bp, tp, sink)
-  }
-
-  @Test def brokerProxyUpdateLatencyMetrics() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    Thread.sleep(1000)
-    // update when fetching messages
-    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(415, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = false
-    Thread.sleep(1000)
-    // update when not fetching messages
-    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(15, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = true
-  }
-
- @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
-    // Need to wait for the thread to do some work before ending the test
-    val countdownLatch = new CountDownLatch(1)
-    var failString: String = null
-
-    val mockMessageSink = mock(classOf[MessageSink])
-    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-
-    val tp = new TopicAndPartition("topic", 42)
-
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    // This will be used by the simple consumer below, and this is the response that simple consumer needs
-    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
-
-    var callsToCreateSimpleConsumer = 0
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    // Create an answer that first indicates offset out of range on first invocation and on second
-    // verifies that the parameters have been updated to what we expect them to be
-    val answer = new Answer[FetchResponse]() {
-      var invocationCount = 0
-
-      def answer(invocation: InvocationOnMock): FetchResponse = {
-        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
-
-        if (invocationCount == 0) {
-          if (arguments !=(tp, 0)) {
-            failString = "First invocation did not have the right arguments: " + arguments
-            countdownLatch.countDown()
-          }
-          val mfr = mock(classOf[FetchResponse])
-          when(mfr.hasError).thenReturn(true)
-          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-
-          val messageSet = mock(classOf[MessageSet])
-          when(messageSet.iterator).thenReturn(Iterator.empty)
-          val response = mock(classOf[FetchResponsePartitionData])
-          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-          val responseMap = Map(tp -> response)
-          when(mfr.data).thenReturn(responseMap.toSeq)
-          invocationCount += 1
-          mfr
-        } else {
-          if (arguments !=(tp, 1492)) {
-            failString = "On second invocation, arguments were not correct: " + arguments
-          }
-          countdownLatch.countDown()
-          Thread.currentThread().interrupt()
-          null
-        }
-      }
-    }
-
-    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
-
-    // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
-
-    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-
-      override def createSimpleConsumer() = {
-        if (callsToCreateSimpleConsumer > 1) {
-          failString = "Tried to create more than one simple consumer"
-          countdownLatch.countDown()
-        }
-        callsToCreateSimpleConsumer += 1
-        mockSimpleConsumer
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    countdownLatch.await()
-    bp.stop
-    if (failString != null) {
-      fail(failString)
-    }
-  }
-
-  /**
-   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
-   * that it owns when a consumer failure occurs.
-   */
-  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
-    val countdownLatch = new CountDownLatch(1)
-    var abdicated: Option[TopicAndPartition] = None
-    @volatile var refreshDroppedCount = 0
-    val mockMessageSink = new MessageSink {
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      }
-
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-      }
-
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
-        abdicated = Some(tp)
-        countdownLatch.countDown
-      }
-
-      override def refreshDropped() {
-        refreshDroppedCount += 1
-      }
-
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
-        true
-      }
-    }
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-    val tp = new TopicAndPartition("topic", 42)
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
-    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
-
-    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-
-    val waitForRefresh = () => {
-      val currentRefreshDroppedCount = refreshDroppedCount
-      while (refreshDroppedCount == currentRefreshDroppedCount) {
-        Thread.sleep(100)
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    // BP should refresh on startup.
-    waitForRefresh()
-    countdownLatch.await()
-    // BP should continue refreshing after it's abdicated all TopicAndPartitions.
-    waitForRefresh()
-    bp.stop
-    assertEquals(tp, abdicated.getOrElse(null))
-  }
-
-  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
-    val doNothingMetrics = new KafkaSystemConsumerMetrics
-    val mockMessageSink = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
-      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
-    }
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    val bp = new BrokerProxy("host", 658, "system", "clientID", 
doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new 
StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    var caughtError = false
-    try {
-      bp.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.")
-        info("Received OutOfMemoryError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-    val mockMessageSink2 = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
-      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
-    }
-    caughtError = false
-    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    try {
-      bp2.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.")
-        info("Received StackOverflowError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-  }
-
-  @Test
-  def brokerProxyStopCloseConsumer: Unit = {
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    bp.start
-    bp.stop
-    verify(mockSimpleConsumer).close
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
new file mode 100644
index 0000000..5791545
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemConsumer {
+  public final String TEST_SYSTEM = "test-system";
+  public final String TEST_STREAM = "test-stream";
+  public final String TEST_CLIENT_ID = "testClientId";
+  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  public final String FETCH_THRESHOLD_MSGS = "50000";
+  public final String FETCH_THRESHOLD_BYTES = "100000";
+
+  private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
+    final Map<String, String> map = new HashMap<>();
+
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        BOOTSTRAP_SERVER);
+    map.put(JobConfig.JOB_NAME(), "jobName");
+
+    Config config = new MapConfig(map);
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
+
+    MockKafkaSystemConsumer newKafkaSystemConsumer =
+        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
+            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
+
+    return newKafkaSystemConsumer;
+  }
+
+  @Test
+  public void testConfigValidations() {
+
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.start();
+    // should be no failures
+  }
+
+  @Test
+  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final int partitionsNum = 50;
+    for (int i = 0; i < partitionsNum; i++) {
+      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
+    }
+
+    consumer.start();
+
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
+        consumer.perPartitionFetchThresholdBytes);
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp0, "5");
+    consumer.register(ssp1, "2");
+    consumer.register(ssp1, "3");
+    consumer.register(ssp2, "0");
+
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+  }
+
+  @Test
+  public void testFetchThresholdBytes() {
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size
+    int ime11Size = 20;
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // queue for ssp1 should be less than full now, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // queue for ssp1 should be full now, because we added a message of size 20 on top
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testFetchThresholdBytesDisabled() {
+    // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
+    int ime11Size = 20; // even with the second message, still below the size limit
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+
+    // limit by number of messages 4/2 = 2 per partition
+    // limit by number of bytes - disabled
+    KafkaSystemConsumer consumer = createConsumer("4", "0"); // "0" should disable the byte limit
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // would be full by size, but is not full by number of messages (1 of 2)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // full neither by size nor by messages
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // not full by size, but full by number of messages
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  // mock kafkaConsumer and SystemConsumer
+  static class MockKafkaConsumer extends KafkaConsumer {
+    public MockKafkaConsumer(Map<String, Object> configs) {
+      super(configs);
+    }
+  }
+
+  static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
+    public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+        KafkaSystemConsumerMetrics metrics, Clock clock) {
+      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    }
+
+    @Override
+    void startConsumer() {
+    }
+  }
+}
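
The threshold tests above pin down the per-partition arithmetic: the message threshold is divided evenly across registered partitions, while the byte threshold is halved first and then divided. A small restatement of that math (names here are illustrative, not KafkaSystemConsumer internals):

    object ThresholdSketch {
      // 50000 msgs / 50 partitions = 1000 msgs per partition
      def perPartitionFetchThreshold(fetchThresholdMsgs: Long, partitions: Int): Long =
        fetchThresholdMsgs / partitions

      // 100000 bytes / 2 / 50 partitions = 1000 bytes per partition
      def perPartitionFetchThresholdBytes(fetchThresholdBytes: Long, partitions: Int): Long =
        fetchThresholdBytes / 2 / partitions
    }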

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
deleted file mode 100644
index 8656d10..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ /dev/null
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.api.TopicMetadata
-import kafka.api.PartitionMetadata
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.util.TopicMetadataStore
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemAdmin
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class TestKafkaSystemConsumer {
-  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
-  private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
-  private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
-  private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
-  private val clientId = "TestClientId"
-
-  @Test
-  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 50) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(1000, consumer.perPartitionFetchThreshold)
-  }
-
-  @Test
-  def testBrokerCreationShouldTriggerStart {
-    val systemName = "test-system"
-    val streamName = "test-stream"
-    val metrics = new KafkaSystemConsumerMetrics
-    // Lie and tell the store that the partition metadata is empty. We can't
-    // use partition metadata because it has Broker in its constructor, which
-    // is package private to Kafka.
-    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
-    var hosts = List[String]()
-    var getHostPortCount = 0
-    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
-      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
-        // Generate a unique host every time getHostPort is called.
-        getHostPortCount += 1
-        Some("localhost-%s" format getHostPortCount, 0)
-      }
-
-      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
-        new BrokerProxy(host, port, systemName, "", metrics, sink) {
-          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-            // Skip this since we normally do verification of offsets, which
-            // tries to connect to Kafka. Rather than mock that, just forget it.
-            nextOffsets.size
-          }
-
-          override def start {
-            hosts :+= host
-          }
-        }
-      }
-    }
-
-    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
-    assertEquals(0, hosts.size)
-    consumer.start
-    assertEquals(List("localhost-1"), hosts)
-    // Should trigger a refresh with a new host.
-    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
-    assertEquals(List("localhost-1", "localhost-2"), hosts)
-  }
-
-  @Test
-  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
-    when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
-
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
-    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(0))
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(1))
-    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(2))
-
-    consumer.register(ssp0, "0")
-    consumer.register(ssp0, "5")
-    consumer.register(ssp1, "2")
-    consumer.register(ssp1, "3")
-    consumer.register(ssp2, "0")
-
-    assertEquals("0", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
-    assertEquals("2", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
-    assertEquals("0", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
-  }
-
-  @Test
-  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, 
fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", 
"test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
-  }
-
-  @Test
-  def testFetchThresholdBytes {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, 
fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", 
"test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    val msg = Array[Byte](5, 112, 9, 126)
-    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
-    // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead
-    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354)
-
-    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-
-  @Test
-  def testFetchThresholdBytesDisabled {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", 
"test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
-    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-}
-
-class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
-  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
-}
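
The deleted tests above pin down the per-partition threshold arithmetic. A Java sketch of that arithmetic follows; the halving of the byte budget is inferred from the assertion above (fetchThresholdBytes = 60000L across 10 partitions yields 3000 per partition), and the class and method names are illustrative, not Samza API.

  final class FetchThresholds {
    // 50000 messages across 50 partitions -> 1000 per partition
    static int perPartitionFetchThreshold(int fetchThreshold, int numPartitions) {
      return fetchThreshold / numPartitions;
    }

    // 60000 bytes across 10 partitions -> 60000 / 2 / 10 = 3000 per partition;
    // when the byte limit is disabled the tests expect 0.
    static long perPartitionFetchThresholdBytes(long fetchThresholdBytes,
                                                int numPartitions,
                                                boolean fetchLimitByBytesEnabled) {
      if (!fetchLimitByBytesEnabled) {
        return 0L;
      }
      return fetchThresholdBytes / 2 / numPartitions;
    }
  }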

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 864d2e5..8405c63 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -223,6 +223,13 @@ class StreamTaskTestUtil {
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
   def stopJob(job: StreamJob) {
+    // Make sure we don't kill the job before it has started.
+    // eventProcessed guarantees all the consumers have been initialized.
+    val tasks = TestTask.tasks
+    val task = tasks.values.toList.head
+    task.eventProcessed.await(60, TimeUnit.SECONDS)
+    assertEquals(0, task.eventProcessed.getCount)
+
     // Shutdown task.
     job.kill
     val status = job.waitForFinish(60000)
@@ -279,7 +286,10 @@ class StreamTaskTestUtil {
     val taskConfig = new TaskConfig(jobModel.getConfig)
    val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
     checkpointManager match {
-      case Some(checkpointManager) => checkpointManager.createResources
+      case Some(checkpointManager) => {
+        checkpointManager.createResources
+        checkpointManager.stop
+      }
       case _ => assert(checkpointManager != null, "No checkpoint manager 
factory configured")
     }
 
@@ -323,6 +333,7 @@ object TestTask {
 abstract class TestTask extends StreamTask with InitableTask {
   var received = ArrayBuffer[String]()
   val initFinished = new CountDownLatch(1)
+  val eventProcessed = new CountDownLatch(1)
   @volatile var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
@@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask {
  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
     val msg = envelope.getMessage.asInstanceOf[String]
 
+    eventProcessed.countDown()
+
     System.err.println("TestTask.process(): %s" format msg)
 
     received += msg
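
The StreamTaskTestUtil change above gates shutdown on a CountDownLatch so the job is never killed before its consumers have processed anything. A condensed, hypothetical form of that handshake is sketched below; ShutdownGate is not a Samza class, just an illustration of the latch pattern.

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;

  class ShutdownGate {
    private final CountDownLatch eventProcessed = new CountDownLatch(1);

    // Called from the task's process() path, as in TestTask.process above.
    void onEventProcessed() {
      eventProcessed.countDown();
    }

    // Called before killing the job; returns false if no event arrived in time.
    boolean awaitFirstEvent() throws InterruptedException {
      return eventProcessed.await(60, TimeUnit.SECONDS);
    }
  }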

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index e4d47d1..ccb7cd4 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -77,7 +77,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob
 
     // Validate that restored is empty.
-    assertEquals(0, task.initFinished.getCount)
     assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size)
     assertEquals(0, task.received.size)
 
@@ -88,7 +87,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     send(task, "2")
     send(task, "99")
     send(task, "99")
-
     stopJob(job)
 
   }
@@ -120,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask {
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
-    System.err.println("ShutdownStateStoreTask.createStream(): %s" format 
restored)
+    System.out.println("ShutdownStateStoreTask.createStream(): %s" format 
restored)
     iter.close
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 0b405f0..b30b896 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -157,7 +157,7 @@ public class YarnJobValidationTool {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
    ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
     validator.init(config);
     Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index da23b91..1ad4522 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -106,7 +106,9 @@ class TestSamzaYarnAppMasterService {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
    val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }
 
   private def getDummyConfig: Config = new MapConfig(Map[String, String](
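
Both YARN diffs above move JobModelManager construction from taking the CoordinatorStreamManager itself to taking only its config, which lets callers stop the stream manager as soon as the config has been read. A Java-flavored sketch of the resulting call pattern follows; the wrapper method is illustrative, while the individual calls are the ones shown in the diffs.

  JobModelManager buildJobModelManager(CoordinatorStreamManager coordinatorStreamManager) {
    coordinatorStreamManager.start();
    coordinatorStreamManager.bootstrap();
    ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
    JobModelManager jobModelManager = JobModelManager.apply(
        coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
    // The stream manager was only needed to read config; stop it once the model exists.
    coordinatorStreamManager.stop();
    return jobModelManager;
  }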
