Repository: samza
Updated Branches:
  refs/heads/master 2e04e1772 -> 75e70e569


http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 264966d..877adc5 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -27,7 +27,6 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.MockSystemFactory;
@@ -72,8 +71,7 @@ public class TestClusterBasedJobCoordinator {
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
-    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
     ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
@@ -91,8 +89,7 @@ public class TestClusterBasedJobCoordinator {
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
-    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
     ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 0a3e9c8..db8ab19 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -34,6 +34,7 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Before;
@@ -233,7 +234,7 @@ public class TestExecutionPlanner {
     systemAdmins = new HashMap<>();
     systemAdmins.put("system1", systemAdmin1);
     systemAdmins.put("system2", systemAdmin2);
-    streamManager = new StreamManager(systemAdmins);
+    streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     runner = mock(ApplicationRunner.class);
     when(runner.getStreamSpec("input1")).thenReturn(input1);

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index b48c82d..3c2ba70 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -36,6 +36,7 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
@@ -113,7 +114,7 @@ public class TestJobGraphJsonGenerator {
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
     systemAdmins.put("system1", systemAdmin1);
     systemAdmins.put("system2", systemAdmin2);
-    StreamManager streamManager = new StreamManager(systemAdmins);
+    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
@@ -186,7 +187,7 @@ public class TestJobGraphJsonGenerator {
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
     systemAdmins.put("hdfs", systemAdmin1);
     systemAdmins.put("kafka", systemAdmin2);
-    StreamManager streamManager = new StreamManager(systemAdmins);
+    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView");

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
index dc36df8..ed28067 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
@@ -28,6 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -58,7 +59,7 @@ public class TestStreamManager {
     sysAdmins.put(SYSTEM1, admin1);
     sysAdmins.put(SYSTEM2, admin2);
 
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     manager.createStreams(specList);
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
@@ -95,7 +96,7 @@ public class TestStreamManager {
     Set<String> streams = new HashSet<>();
     streams.add(STREAM1);
     streams.add(STREAM2);
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams);
 
     assertTrue(counts.get(STREAM1).equals(1));
@@ -131,7 +132,7 @@ public class TestStreamManager {
     config.put("stores.test-store.factory", "dummyfactory");
     config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2);
 
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     manager.clearStreamsFromPreviousRun(new MapConfig(config));
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index eb0ebe9..870d586 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -93,11 +93,13 @@ public class TestApplicationRunnerMain {
 
     @Override
     public void run(StreamApplication streamApp) {
+      super.run(streamApp);
       runCount++;
     }
 
     @Override
     public void kill(StreamApplication streamApp) {
+      super.kill(streamApp);
       killCount++;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a4f2328..64c0088 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -20,15 +20,12 @@
 package org.apache.samza.checkpoint
 
 import java.util
-import java.util.Collections
-import java.util.Collections.EmptyMap
-
 import org.apache.samza.container.TaskName
 import org.apache.samza.Partition
 import org.apache.samza.system._
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.scalatest.Assertions.intercept
@@ -64,7 +61,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -97,7 +94,7 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
+    val systemAdmins = new SystemAdmins(Map("test-system" -> getSystemAdmin).asJava)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
@@ -155,7 +152,7 @@ class TestOffsetManager {
     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val config = new MapConfig
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava))
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
     offsetManager.register(taskName1, Set(systemStreamPartition1))
     offsetManager.register(taskName2, Set(systemStreamPartition2))
@@ -264,7 +261,7 @@ class TestOffsetManager {
       Map()
 
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager,
-                                      systemAdmins, checkpointListeners, new OffsetManagerMetrics)
+                                      new SystemAdmins(systemAdmins.asJava), checkpointListeners, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2))
 
     offsetManager.start
@@ -310,7 +307,7 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
 
@@ -376,7 +373,7 @@ class TestOffsetManager {
     }
   }
 
-  private def getSystemAdmin = {
+  private def getSystemAdmin: SystemAdmin = {
     new SystemAdmin {
       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
         offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 8ff6e88..63d58c9 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.{SamzaContainerStatus, Partition}
+import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
@@ -33,8 +33,8 @@ import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemConsumer, SystemConsumers, SystemProducer, SystemProducers, SystemStream, SystemStreamPartition}
-import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator, TaskInstanceCollector}
+import org.apache.samza.system._
+import org.apache.samza.task._
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
@@ -45,6 +45,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
 import org.mockito.Mockito.when
+
 import scala.collection.JavaConversions._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@@ -128,8 +129,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new SystemStreamPartition("test", "stream1", new Partition(1)),
       new SystemStreamPartition("test", "stream2", new Partition(0)),
       new SystemStreamPartition("test", "stream2", new Partition(1)))
-    val systemAdmins = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
-    val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream).toSet)
+    val systemAdminMap = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
+    val metadata = new StreamMetadataCache(new SystemAdmins(systemAdminMap)).getStreamMetadata(inputStreams.map(_.getSystemStream))
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
     val stream1Metadata = metadata(new SystemStream("test", "stream1"))
@@ -158,6 +159,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -190,6 +192,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -239,6 +242,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -270,6 +274,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -319,6 +324,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -351,6 +357,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -398,6 +405,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -433,6 +441,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -474,6 +483,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -509,6 +519,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -542,6 +553,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -576,6 +588,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = null,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = containerMetrics)

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index c45c5a1..de1647f 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -49,6 +49,7 @@ import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
+import scala.collection.JavaConverters._
 
 class TestTaskInstance {
   @Test
@@ -322,7 +323,8 @@ class TestTaskInstance {
     val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
-    val systemAdmins = Map("system" -> new MockSystemAdmin)
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val systemAdmins = new SystemAdmins(Map("system" -> systemAdmin).asJava)
     var result = new ListBuffer[IncomingMessageEnvelope]
 
     val task = new StreamTask {

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index f092d75..3b65a62 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -240,7 +240,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val systemNames = Set("test")
 
     // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
+    val systemAdminMap = systemNames.map(systemName => {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
@@ -248,7 +248,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       systemName -> systemFactory.getAdmin(systemName, config)
     }).toMap
 
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val streamMetadataCache = new StreamMetadataCache(new SystemAdmins(systemAdminMap.asJava))
     val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
     val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index c7eab3b..b66cd64 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -20,11 +20,11 @@
 package org.apache.samza.coordinator
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-
+import java.util.HashMap
 import org.apache.samza.Partition
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata}
+import org.apache.samza.system._
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Matchers
@@ -234,7 +234,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     }
   }
 
-  class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
+  class MockStreamMetadataCache extends StreamMetadataCache(new SystemAdmins(new HashMap[String, SystemAdmin])) {
     /**
      * Returns metadata about each of the given streams (such as first offset, newest
      * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index b239119..0b951f4 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -21,11 +21,11 @@ package org.apache.samza.processor
 import java.util.Collections
 
 import org.apache.samza.config.MapConfig
-import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName}
+import org.apache.samza.container._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers}
+import org.apache.samza.system._
 import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
 
 
@@ -33,6 +33,7 @@ object StreamProcessorTestUtils {
   def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = {
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val adminMultiplexer = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -56,6 +57,7 @@ object StreamProcessorTestUtils {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunloop,
+      systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 29f6eb7..90a4c01 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -718,7 +718,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       storeBaseDir = storeBaseDir,
       loggedStoreBaseDir = loggedStoreBaseDir,
       partition = partition,
-      systemAdmins = systemAdmins,
+      systemAdmins = new SystemAdmins(systemAdmins.asJava),
       new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs,
       SystemClock.instance
     )

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
index f55e4bf..e48764b 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
@@ -42,7 +42,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
     val systemAdmins = Map("foo" -> mock[SystemAdmin])
     when(systemAdmins("foo").getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava)
     val streams = Set(new SystemStream("foo", "bar"))
-    val cache = new StreamMetadataCache(systemAdmins)
+    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava))
 
     val result = cache.getStreamMetadata(streams)
     streams shouldEqual result.keySet
@@ -56,7 +56,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
     val systemAdmins = Map("system" -> mock[SystemAdmin])
     when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava)
     val streams = Set(new SystemStream("system", "stream"))
-    val cache = new StreamMetadataCache(systemAdmins = systemAdmins, clock = clock)
+    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava), clock = clock)
 
     when(clock.currentTimeMillis).thenReturn(0)
     cache.getStreamMetadata(streams)
@@ -84,7 +84,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"),
       new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b")
     )
-    val result = new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+    val result = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     result.keySet shouldEqual streams
     streams.foreach(stream => {
       val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
@@ -101,7 +101,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       .thenReturn(makeMetadata(Set("stream1")).asJava) // metadata doesn't include stream2
     val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+      new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown streams")
   }
@@ -113,7 +113,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       .thenReturn(Map[String, SystemStreamMetadata]("stream" -> null).asJava)
     val streams = Set(new SystemStream("system", "stream"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+      new StreamMetadataCache(new 
SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown 
streams")
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c07545..e56206a 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -21,13 +21,10 @@ package org.apache.samza.system.chooser
 
 import java.util.Arrays
 
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system._
 import org.apache.samza.Partition
 import org.apache.samza.container.MockSystemAdmin
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.junit.Assert._
 import org.junit.Test
@@ -187,7 +184,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val mock = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val chooser = new BootstrappingChooser(mock, 
Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, 
envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new 
BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val chooser = new BootstrappingChooser(mock, 
Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new 
BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> 
systemAdmin).asJava))
 
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, "1")
@@ -205,7 +204,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val mock = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val chooser = new BootstrappingChooser(mock, 
Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, 
envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new 
BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val chooser = new BootstrappingChooser(mock, 
Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new 
BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> 
systemAdmin).asJava))
 
     // Envelope1 is registered by multiple tasks, each one of them having 
different offsets.
     chooser.register(envelope1.getSystemStreamPartition, "1")
@@ -234,7 +235,13 @@ object TestBootstrappingChooser {
   // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
-  def parameters: java.util.Collection[Array[(MessageChooser, 
Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, 
SystemStreamMetadata]) => new BootstrappingChooser(wrapped, 
bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new 
MockSystemAdmin))),
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, 
SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = 
bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = 
Map("kafka" -> new MockSystemAdmin))))
+  def parameters: java.util.Collection[Array[(MessageChooser, 
Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = {
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val systemAdmins = new SystemAdmins(Map("kafka" -> systemAdmin).asJava)
+    Arrays.asList(
+      Array((wrapped: MessageChooser, bootstrapStreamMetadata: 
Map[SystemStream, SystemStreamMetadata]) =>
+        new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new 
BootstrappingChooserMetrics(), systemAdmins)),
+      Array((wrapped: MessageChooser, bootstrapStreamMetadata: 
Map[SystemStream, SystemStreamMetadata]) =>
+        new DefaultChooser(wrapped, bootstrapStreamMetadata = 
bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = 
systemAdmins)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index b873762..df5282c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -24,7 +24,7 @@ import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
 import org.apache.samza.container.MockSystemAdmin
 import org.apache.samza.metrics.MetricsRegistryMap
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, 
SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system._
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.junit.Assert._
 import org.junit.Test
@@ -56,6 +56,7 @@ class TestDefaultChooser {
       envelope5.getSystemStreamPartition().getPartition() -> 
env5Metadata).asJava)
     val stream3Metadata = new SystemStreamMetadata("stream3", Map(
       envelope8.getSystemStreamPartition().getPartition() -> 
env8Metadata).asJava)
+    val systemAdmin: SystemAdmin = new MockSystemAdmin()
     val chooser = new DefaultChooser(
       mock0,
       Some(2),
@@ -70,7 +71,7 @@ class TestDefaultChooser {
         envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata,
         envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata),
       new MetricsRegistryMap(),
-      Map("kafka" -> new MockSystemAdmin()))
+      new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.register(envelope2.getSystemStreamPartition, null)

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index ca138c7..50d22b1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -77,6 +77,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     Preconditions.checkNotNull(systemConsumer)
     Preconditions.checkNotNull(systemAdmin)
 
+    systemAdmin.start()
+
     info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with 
" +
       s"partition count: ${checkpointSpec.getPartitionCount}")
     systemAdmin.createStream(checkpointSpec)
@@ -171,6 +173,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   }
 
   override def stop = {
+    systemAdmin.stop()
+
     if (systemProducer != null) {
       systemProducer.stop
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index a9a9bd7..1f4672d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -148,6 +148,17 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
+  @volatile var running = false
+
+  override def start() = {
+    running = true
+  }
+
+  override def stop() = {
+    running = false
+  }
+
+
   override def getSystemStreamPartitionCounts(streams: util.Set[String], 
cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
     getSystemStreamPartitionCounts(streams, new 
ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 3530713..3a1ffe9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -144,6 +144,7 @@ private[kafka] class KafkaSystemConsumer(
       }
     }
 
+    systemAdmin.start()
     refreshBrokers
   }
 
@@ -161,6 +162,7 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   def stop() {
+    systemAdmin.stop()
     brokerProxies.values.foreach(_.stop)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index da7b907..4d52877 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -35,7 +35,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
@@ -93,10 +92,9 @@ public class SamzaTaskProxy implements TaskProxy {
    * @return built and initialized CoordinatorStreamSystemConsumer.
    */
   protected CoordinatorStreamSystemConsumer 
initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new 
CoordinatorStreamSystemFactory();
     Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
     LOG.debug("Using config: {} to create coordinatorStream consumer.", 
coordinatorSystemConfig);
-    CoordinatorStreamSystemConsumer consumer = 
coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig,
 METRICS_REGISTRY);
+    CoordinatorStreamSystemConsumer consumer = new 
CoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
     LOG.debug("Registering coordinator system stream consumer.");
     consumer.register();
     LOG.debug("Starting coordinator system stream consumer.");

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 5215f7e..d432be7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -120,6 +120,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
+    super.run(streamApp);
     Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
     appRunner.run(streamApp);
   }
@@ -127,6 +128,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     appRunner.kill(streamApp);
+    super.kill(streamApp);
   }
 
   @Override

Reply via email to