Repository: incubator-samza
Updated Branches:
  refs/heads/master f6d341508 -> 6f595beda


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e06ca90..1a67586 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,25 +19,14 @@
 
 package org.apache.samza.util
 
-import java.io._
+import java.net.URL
+import java.io.BufferedReader
 import java.lang.management.ManagementFactory
-import java.util
+import java.io.File
+import org.apache.samza.system.SystemStream
 import java.util.Random
-import java.net.URL
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.StorageConfig.Config2Storage
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.{SystemStreamPartition, SystemFactory, 
StreamMetadataCache, SystemStream}
-import scala.collection.JavaConversions._
-import scala.collection
-import org.apache.samza.container.TaskName
-import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
-import org.apache.samza.container.grouper.task.GroupByContainerCount
+import org.apache.samza.job.model.JobModel
+import java.io.InputStreamReader
 
 object Util extends Logging {
   val random = new Random
@@ -83,88 +72,14 @@ object Util extends Logging {
   }
 
   /**
-   * For each input stream specified in config, exactly determine its 
partitions, returning a set of SystemStreamPartitions
-   * containing them all
-   *
-   * @param config Source of truth for systems and inputStreams
-   * @return Set of SystemStreamPartitions, one for each unique system, stream 
and partition
-   */
-  def getInputStreamPartitions(config: Config): Set[SystemStreamPartition] = {
-    val inputSystemStreams = config.getInputStreams
-    val systemNames = config.getSystemNames.toSet
-
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is 
missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-
-    // Get the set of partitions for each SystemStream from the stream metadata
-    new StreamMetadataCache(systemAdmins)
-      .getStreamMetadata(inputSystemStreams)
-      .flatMap { case (systemStream, metadata) =>
-        metadata
-          .getSystemStreamPartitionMetadata
-          .keys
-          .map(new SystemStreamPartition(systemStream, _))
-      }.toSet
-  }
-
-  /**
-   * Assign mapping of which TaskNames go to which container
-   *
-   * @param config For factories for Grouper and TaskNameGrouper
-   * @param containerCount How many tasks are we we working with
-   * @return Map of int (taskId) to SSPTaskNameMap that taskID is responsible 
for
-   */
-  def assignContainerToSSPTaskNames(config:Config, containerCount:Int): 
Map[Int, TaskNamesToSystemStreamPartitions] = {
-    import org.apache.samza.config.JobConfig.Config2Job
-
-    val allSystemStreamPartitions: Set[SystemStreamPartition] = 
Util.getInputStreamPartitions(config)
-
-    val sspTaskNamesAsJava: util.Map[TaskName, 
util.Set[SystemStreamPartition]] = {
-      val factoryString = config.getSystemStreamPartitionGrouperFactory
-
-      info("Instantiating type " + factoryString + " to build 
SystemStreamPartition groupings")
-
-      val factory = 
Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
-
-      val grouper = factory.getSystemStreamPartitionGrouper(config)
-
-      val groups = grouper.group(allSystemStreamPartitions)
-
-      info("SystemStreamPartitionGrouper " + grouper + " has grouped the 
SystemStreamPartitions into the following taskNames:")
-      groups.foreach(g => info("TaskName: " + g._1 + " => " + g._2))
-
-      groups
-    }
-
-    val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava)
-
-    info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions 
taskNames to " + containerCount + " containers.")
-
-    // Here is where we should put in a pluggable option for the 
SSPTaskNameGrouper for locality, load-balancing, etc.
-    val sspTaskNameGrouper = new GroupByContainerCount(containerCount)
-
-    val containersToTaskNames = 
sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap
-
-    info("Grouped SystemStreamPartition TaskNames (size = " + 
containersToTaskNames.size + "): ")
-    containersToTaskNames.foreach(t => info("Container number: " + t._1 + " => 
" + t._2))
-
-    containersToTaskNames
-  }
-
-  /**
    * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
    */
   def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
     val idx = systemStreamNames.indexOf('.')
-    if (idx < 0)
+    if (idx < 0) {
       throw new IllegalArgumentException("No '.' in stream name '" + 
systemStreamNames + "'. Stream names should be in the form 'system.stream'")
+    }
     new SystemStream(systemStreamNames.substring(0, idx), 
systemStreamNames.substring(idx + 1, systemStreamNames.length))
   }
 
@@ -177,130 +92,13 @@ object Util extends Logging {
   }
 
   /**
-   * Using previous taskName to partition mapping and current taskNames for 
this job run, create a new mapping that preserves
-   * the previous order and deterministically assigns any new taskNames to 
changelog partitions.  Be chatty about new or
-   * missing taskNames.
-   *
-   * @param currentTaskNames All the taskNames the current job is processing
-   * @param previousTaskNameMapping Previous mapping of taskNames to partition
-   * @return New mapping of taskNames to partitions for the changelog
-   */
-  def 
resolveTaskNameToChangelogPartitionMapping(currentTaskNames:Set[TaskName],
-    previousTaskNameMapping:Map[TaskName, Int]): Map[TaskName, Int] = {
-    info("Previous mapping of taskNames to partition: " + 
previousTaskNameMapping.toList.sorted)
-    info("Current set of taskNames: " + currentTaskNames.toList.sorted)
-
-    val previousTaskNames: Set[TaskName] = previousTaskNameMapping.keySet
-
-    if(previousTaskNames.equals(currentTaskNames)) {
-      info("No change in TaskName sets from previous run. Returning previous 
mapping.")
-      return previousTaskNameMapping
-    }
-
-    if(previousTaskNames.isEmpty) {
-      warn("No previous taskName mapping defined.  This is OK if it's the 
first time the job is being run, otherwise data may have been lost.")
-    }
-
-    val missingTaskNames = previousTaskNames -- currentTaskNames
-
-    if(missingTaskNames.isEmpty) {
-      info("No taskNames are missing between this run and previous")
-    } else {
-      warn("The following taskNames were previously defined and are no longer 
present: " + missingTaskNames)
-    }
-
-    val newTaskNames = currentTaskNames -- previousTaskNames
-
-    if(newTaskNames.isEmpty) {
-      info("No new taskNames have been added between this run and the 
previous")
-      previousTaskNameMapping // Return the old mapping since there are no new 
taskNames for which to account
-
-    } else {
-      warn("The following new taskNames have been added in this job run: " + 
newTaskNames)
-
-      // Sort the new taskNames and assign them partitions (starting at 0 for 
now)
-      val sortedNewTaskNames = newTaskNames.toList.sortWith { (a,b) => 
a.getTaskName < b.getTaskName }.zipWithIndex.toMap
-
-      // Find next largest partition to use based on previous mapping
-      val nextPartitionToUse = if(previousTaskNameMapping.size == 0) 0
-                               else previousTaskNameMapping.foldLeft(0)((a,b) 
=> math.max(a, b._2)) + 1
-
-      // Bump up the partition values
-      val newTaskNamesWithTheirPartitions = sortedNewTaskNames.map(c => c._1 
-> (c._2 + nextPartitionToUse))
-      
-      // Merge old and new
-      val newMapping = previousTaskNameMapping ++ 
newTaskNamesWithTheirPartitions
-      
-      info("New taskName to partition mapping: " + newMapping.toList.sortWith{ 
(a,b) => a._2 < b._2})
-      
-      newMapping
-    }
-  }
-
-  /**
-   * Read the TaskName to changelog partition mapping from the checkpoint 
manager, if one exists.
-   *
-   * @param config To pull out values for instantiating checkpoint manager
-   * @param tasksToSSPTaskNames Current TaskNames for the current job run
-   * @return Current mapping of TaskNames to changelog partitions
-   */
-  def getTaskNameToChangeLogPartitionMapping(config: Config, 
tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions]) = {
-    val taskNameMaps: Set[TaskNamesToSystemStreamPartitions] = 
tasksToSSPTaskNames.map(_._2).toSet
-    val currentTaskNames: Set[TaskName] = 
taskNameMaps.map(_.keys).toSet.flatten
-
-    // We need to oh so quickly instantiate a checkpoint manager and grab the 
partition mapping from the log, then toss the manager aside
-    val checkpointManager = config.getCheckpointManagerFactory match {
-      case Some(checkpointFactoryClassName) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, new MetricsRegistryMap)
-      case _ => null
-    }
-
-    if(checkpointManager == null) {
-      // Check if we have a changelog configured, which requires a checkpoint 
manager
-
-      if(!config.getStoreNames.isEmpty) {
-        throw new SamzaException("Storage factories configured, but no 
checkpoint manager has been specified.  " +
-          "Unable to start job as there would be no place to store changelog 
partition mapping.")
-      }
-      // No need to do any mapping, just use what has been provided
-      Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, 
Map[TaskName, Int]())
-    } else {
-
-      info("Got checkpoint manager: %s" format checkpointManager)
-
-      // Always put in a call to create so the log is available for the tasks 
on startup.
-      // Reasonably lame to hide it in here.  TODO: Pull out to more visible 
location.
-      checkpointManager.start
-
-      val previousMapping: Map[TaskName, Int] = {
-        val fromCM = checkpointManager.readChangeLogPartitionMapping()
-
-        fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala 
interop!!!
-      }
-
-      checkpointManager.stop
-
-      val newMapping = 
Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, 
previousMapping)
-
-      if (newMapping != null) {
-        info("Writing new changelog partition mapping to checkpoint manager.")
-        checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => 
kv._1 -> new java.lang.Integer(kv._2))) //Java to Scala interop!!!
-      }
-
-      newMapping
-    }
-  }
-
-  /**
    * Makes sure that an object is not null, and throws a NullPointerException
    * if it is.
    */
   def notNull[T](obj: T, msg: String) = if (obj == null) {
     throw new NullPointerException(msg)
   }
-  
+
   /**
    * Returns the name representing the JVM. It usually contains the PID of the 
process plus some additional information
    * @return String that contains the name representing this JVM
@@ -313,10 +111,13 @@ object Util extends Logging {
    * Reads a URL and returns its body as a string. Does no error handling.
    *
    * @param url HTTP URL to read from.
+   * @param timeout How long to wait, in milliseconds, before timing out when connecting to or 
reading from the HTTP server.
    * @return String payload of the body of the HTTP response.
    */
-  def read(url: URL): String = {
+  def read(url: URL, timeout: Int = 30000): String = {
     val conn = url.openConnection();
+    conn.setConnectTimeout(timeout)
+    conn.setReadTimeout(timeout)
     val br = new BufferedReader(new InputStreamReader(conn.getInputStream));
     var line: String = null;
     val body = Iterator.continually(br.readLine()).takeWhile(_ != 
null).mkString

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
new file mode 100644
index 0000000..76bc681
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+public class TestSamzaObjectMapper {
+  @Test
+  public void testJsonTaskModel() throws Exception {
+    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+    Map<String, String> configMap = new HashMap<String, String>();
+    configMap.put("a", "b");
+    Config config = new MapConfig(configMap);
+    Set<SystemStreamPartition> inputSystemStreamPartitions = new 
HashSet<SystemStreamPartition>();
+    inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", 
new Partition(1)));
+    TaskName taskName = new TaskName("test");
+    TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, 
new Partition(2));
+    Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
+    tasks.put(taskName, taskModel);
+    ContainerModel containerModel = new ContainerModel(1, tasks);
+    Map<Integer, ContainerModel> containerMap = new HashMap<Integer, 
ContainerModel>();
+    containerMap.put(Integer.valueOf(1), containerModel);
+    JobModel jobModel = new JobModel(config, containerMap);
+    String str = mapper.writeValueAsString(jobModel);
+    JobModel obj = mapper.readValue(str, JobModel.class);
+    assertEquals(jobModel, obj);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 393b09a..a0ea8b6 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,53 +19,59 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.config.Config
-import org.junit.Assert._
-import org.junit.Test
+import scala.collection.JavaConversions._
+
 import org.apache.samza.Partition
+import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.JmxServer
+import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemStream
-import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskContext
-import org.apache.samza.task.ClosableTask
+import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
+import org.junit.Assert._
+import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
-import scala.collection.JavaConversions._
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
-  def testCoordinatorObjects {
-    val server = new HttpServer("/test")
+  def testReadJobModel {
+    val config = new MapConfig(Map("a" -> "b"))
+    val tasks = Map(
+      new TaskName("t1") -> new TaskModel(new TaskName("t1"), 
Set[SystemStreamPartition](), new Partition(0)),
+      new TaskName("t2") -> new TaskModel(new TaskName("t2"), 
Set[SystemStreamPartition](), new Partition(0)))
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, tasks),
+      Integer.valueOf(1) -> new ContainerModel(1, tasks))
+    val jobModel = new JobModel(config, containers)
+    val server = new HttpServer
+    val coordinator = new JobCoordinator(jobModel, server)
+    coordinator.server.addServlet("/*", new JobServlet(jobModel))
     try {
-      val taskName = new TaskName("a")
-      val set = Set(new SystemStreamPartition("a", "b", new Partition(0)))
-      val config = new MapConfig(Map("a" -> "b", "c" -> "d"))
-      val containerToTaskMapping = Map(0 -> new 
TaskNamesToSystemStreamPartitions(Map(taskName -> set)))
-      val taskToChangelogMapping = Map[TaskName, Int](taskName -> 0)
-      server.addServlet("/job", new JobServlet(config, containerToTaskMapping, 
taskToChangelogMapping))
-      server.start
-      val (returnedConfig, returnedSspTaskNames, 
returnedTaskNameToChangeLogPartitionMapping) = 
SamzaContainer.getCoordinatorObjects(server.getUrl.toString + "/job")
-      assertEquals(config, returnedConfig)
-      assertEquals(containerToTaskMapping, returnedSspTaskNames)
-      assertEquals(taskToChangelogMapping, 
returnedTaskNameToChangeLogPartitionMapping)
+      coordinator.start
+      assertEquals(jobModel, 
SamzaContainer.readJobModel(server.getUrl.toString))
     } finally {
-      server.stop
+      coordinator.stop
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
deleted file mode 100644
index 9a3406e..0000000
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.{SamzaException, Partition}
-import org.junit.Test
-import org.junit.Assert._
-
-class TestTaskNamesToSystemStreamPartitions {
-  var sspCounter = 0
-  def makeSSP(stream:String) = new SystemStreamPartition("system", stream, new 
Partition(42))
-
-  @Test
-  def toSetWorksCorrectly() {
-    val map = Map(new TaskName("tn1") -> Set(makeSSP("tn1-1"), 
makeSSP("tn1-2")),
-                  new TaskName("tn2") -> Set(makeSSP("tn2-1"), 
makeSSP("tn2-2")))
-    val tntssp = TaskNamesToSystemStreamPartitions(map)
-
-    val asSet = tntssp.toSet
-    val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), 
makeSSP("tn1-2")),
-                      (new TaskName("tn2") -> Set(makeSSP("tn2-1"), 
makeSSP("tn2-2"))))
-    assertEquals(expected, asSet)
-  }
-
-  @Test
-  def validateMethodCatchesDuplicatedSSPs() {
-    val duplicatedSSP1 = new SystemStreamPartition("sys", "str", new 
Partition(42))
-    val duplicatedSSP2 = new SystemStreamPartition("sys", "str", new 
Partition(42))
-    val notDuplicatedSSP1 = new SystemStreamPartition("sys", "str2", new 
Partition(42))
-    val notDuplicatedSSP2 = new SystemStreamPartition("sys", "str3", new 
Partition(42))
-
-    val badMapping = Map(new TaskName("a") -> Set(notDuplicatedSSP1, 
duplicatedSSP1), new TaskName("b") -> Set(notDuplicatedSSP2, duplicatedSSP2))
-
-    var caughtException = false
-    try {
-      TaskNamesToSystemStreamPartitions(badMapping)
-    } catch {
-      case se: SamzaException => assertEquals("Assigning the same 
SystemStreamPartition to multiple " +
-        "TaskNames is not currently supported.  Out of compliance 
SystemStreamPartitions and counts: " +
-        "Map(SystemStreamPartition [sys, str, 42] -> 2)", se.getMessage)
-        caughtException = true
-      case _: Throwable       =>
-    }
-    assertTrue("TaskNamesToSystemStreamPartitions should have rejected this 
mapping but didn't", caughtException)
-  }
-
-  @Test
-  def validateMethodAllowsUniqueSSPs() {
-    val sspSet1 = (0 to 10).map(p => new SystemStreamPartition("sys", "str", 
new Partition(p))).toSet
-    val sspSet2 = (0 to 10).map(p => new SystemStreamPartition("sys", "str2", 
new Partition(p))).toSet
-
-    TaskNamesToSystemStreamPartitions(Map(new TaskName("set1") -> sspSet1, new 
TaskName("set2") -> sspSet2))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index 20f41a8..1ee5c06 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -18,37 +18,56 @@
  */
 package org.apache.samza.container.grouper.task
 
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.Partition
+import scala.collection.JavaConversions
+import org.scalatest.Assertions.intercept
+import scala.collection.JavaConversions._
 
 class TestGroupByContainerCount {
-  val emptySSPSet = Set[SystemStreamPartition]()
-
   @Test
-  def weGetAsExactlyManyGroupsAsWeAskFor() {
-    // memoize the maps used in the test to avoid an O(n^3) loop
-    val tntsspCache = scala.collection.mutable.Map[Int, 
TaskNamesToSystemStreamPartitions]()
-
-    def tntsspOfSize(size:Int) = {
-      def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until 
size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
-
-      tntsspCache.getOrElseUpdate(size, getMap(size))
-    }
+  def testEmptyTasks {
+    intercept[IllegalArgumentException] { new 
GroupByContainerCount(1).group(Set()) }
+  }
 
-    val maxTNTSSPSize = 1000
-    val maxNumGroups = 140
-    for(numGroups <- 1 to maxNumGroups) {
-      val grouper = new GroupByContainerCount(numGroups)
+  @Test
+  def testFewerTasksThanContainers {
+    intercept[IllegalArgumentException] { new 
GroupByContainerCount(2).group(Set(null)) }
+  }
 
-      for (tntsspSize <- numGroups to maxTNTSSPSize) {
-        val map = tntsspOfSize(tntsspSize)
-        assertEquals(tntsspSize, map.size)
+  @Test
+  def testHappyPath {
+    val taskModels = Set(
+      getTaskModel("1", 1),
+      getTaskModel("2", 2),
+      getTaskModel("3", 3),
+      getTaskModel("4", 4),
+      getTaskModel("5", 5))
+    val containers = new GroupByContainerCount(2)
+      .group(taskModels)
+      .map(containerModel => containerModel.getContainerId -> containerModel)
+      .toMap
+    assertEquals(2, containers.size)
+    val container0 = containers(0)
+    val container1 = containers(1)
+    assertNotNull(container0)
+    assertNotNull(container1)
+    assertEquals(0, container0.getContainerId)
+    assertEquals(1, container1.getContainerId)
+    assertEquals(3, container0.getTasks.size)
+    assertEquals(2, container1.getTasks.size)
+    assertTrue(container0.getTasks.containsKey(new TaskName("1")))
+    assertTrue(container0.getTasks.containsKey(new TaskName("3")))
+    assertTrue(container0.getTasks.containsKey(new TaskName("5")))
+    assertTrue(container1.getTasks.containsKey(new TaskName("2")))
+    assertTrue(container1.getTasks.containsKey(new TaskName("4")))
+  }
 
-        val grouped = grouper.groupTaskNames(map)
-        assertEquals("Asked for " + numGroups + " but got " + grouped.size, 
numGroups, grouped.size)
-      }
-    }
+  private def getTaskModel(name: String, partitionId: Int) = {
+    new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new 
Partition(partitionId))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
new file mode 100644
index 0000000..1eb0eda
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator
+
+import org.junit.Test
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.TaskConfig
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.container.TaskName
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.Config
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.Partition
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.TaskModel
+
+class TestJobCoordinator {
+  /**
+   * Builds a coordinator from config, and then compares it with what was 
+   * expected. We simulate having a checkpoint manager that has 2 task 
+   * changelog entries, and our model adds a third task. The expectation is
+   * that the JobCoordinator will assign the new task a new changelog
+   * partition.
+   */
+  @Test
+  def testJobCoordinator {
+    val containerCount = 2
+    val config = new MapConfig(Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      (SystemConfig.SYSTEM_FACTORY format "test") -> 
classOf[MockSystemFactory].getCanonicalName))
+    val coordinator = JobCoordinator(config, containerCount)
+
+    // Construct the expected JobModel, so we can compare it to 
+    // JobCoordinator's JobModel.
+    val task0Name = new TaskName("Partition 0")
+    val task1Name = new TaskName("Partition 1")
+    val task2Name = new TaskName("Partition 2")
+    val container0Tasks = Map(
+      task0Name -> new TaskModel(task0Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(0))), new Partition(4)),
+      task2Name -> new TaskModel(task2Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5)))
+    val container1Tasks = Map(
+      task1Name -> new TaskModel(task1Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(3)))
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
+      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
+    val jobModel = new JobModel(config, containers)
+    assertEquals(config, coordinator.jobModel.getConfig)
+    assertEquals(jobModel, coordinator.jobModel)
+  }
+}
+
+object MockCheckpointManager {
+  var mapping: java.util.Map[TaskName, java.lang.Integer] = Map[TaskName, 
java.lang.Integer](
+    new TaskName("Partition 0") -> 4,
+    new TaskName("Partition 1") -> 3)
+}
+
+class MockCheckpointManagerFactory extends CheckpointManagerFactory {
+  def getCheckpointManager(config: Config, registry: MetricsRegistry) = new 
MockCheckpointManager
+}
+
+class MockCheckpointManager extends CheckpointManager {
+  def start() {}
+  def register(taskName: TaskName) {}
+  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {}
+  def readLastCheckpoint(taskName: TaskName) = null
+  def readChangeLogPartitionMapping = MockCheckpointManager.mapping
+  def writeChangeLogPartitionMapping(mapping: java.util.Map[TaskName, 
java.lang.Integer]) {
+    MockCheckpointManager.mapping = mapping
+  }
+  def stop() {}
+}
+
+class MockSystemFactory extends SystemFactory {
+  def getConsumer(systemName: String, config: Config, registry: 
MetricsRegistry) = null
+  def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry) = null
+  def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
+}
+
+class MockSystemAdmin extends SystemAdmin {
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = 
null
+  def getSystemStreamMetadata(streamNames: java.util.Set[String]): 
java.util.Map[String, SystemStreamMetadata] = {
+    assertEquals(1, streamNames.size)
+    val partitionMetadata = Map(
+      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, null),
+      new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null),
+      // Create a new Partition(2), which wasn't in the prior changelog 
mapping.
+      new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null))
+    Map(streamNames.toList.head -> new SystemStreamMetadata("foo", 
partitionMetadata))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 93c71b4..b75f440 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -23,7 +23,7 @@ import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.container.TaskName
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
@@ -37,72 +37,4 @@ import scala.collection.JavaConversions._
 import scala.util.Random
 
 class TestUtil {
-  val random = new Random(System.currentTimeMillis())
-
-  @Test
-  def testGetInputStreamPartitions {
-    val expectedPartitionsPerStream = 1
-    val inputSystemStreamNames = List("test.foo", "test.bar")
-    val config = new MapConfig(Map(
-      "task.inputs" -> inputSystemStreamNames.mkString(","),
-      "systems.test.samza.factory" -> classOf[MockSystemFactory].getName,
-      "systems.test.partitions.per.stream" -> 
expectedPartitionsPerStream.toString))
-    val systemStreamPartitions = Util.getInputStreamPartitions(config)
-    assertNotNull(systemStreamPartitions)
-    assertEquals(expectedPartitionsPerStream * inputSystemStreamNames.size, 
systemStreamPartitions.size)
-    inputSystemStreamNames.foreach(systemStreamName => {
-      (0 until expectedPartitionsPerStream).foreach(partitionNumber => {
-        val partition = new Partition(partitionNumber)
-        val systemStreamNameSplit = systemStreamName.split("\\.")
-        systemStreamPartitions.contains(new 
SystemStreamPartition(systemStreamNameSplit(0), systemStreamNameSplit(1), 
partition))
-      })
-    })
-  }
-
-  @Test
-  def testResolveTaskNameToChangelogPartitionMapping {
-    def testRunner(description:String, currentTaskNames:Set[TaskName], 
previousTaskNameMapping:Map[TaskName, Int],
-                   result:Map[TaskName, Int]) {
-      assertEquals("Failed: " + description, result,
-        Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, 
previousTaskNameMapping))
-    }
-
-    testRunner("No change between runs",
-      Set(new TaskName("Partition 0")),
-      Map(new TaskName("Partition 0") -> 0),
-      Map(new TaskName("Partition 0") -> 0))
-
-    testRunner("New TaskName added, none missing this run",
-      Set(new TaskName("Partition 0"), new TaskName("Partition 1")),
-      Map(new TaskName("Partition 0") -> 0),
-      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1))
-
-    testRunner("New TaskName added, one missing this run",
-      Set(new TaskName("Partition 0"), new TaskName("Partition 2")),
-      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1),
-      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, 
new TaskName("Partition 2") -> 2))
-
-    testRunner("New TaskName added, all previous missing this run",
-      Set(new TaskName("Partition 3"), new TaskName("Partition 4")),
-      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, 
new TaskName("Partition 2") -> 2),
-      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, 
new TaskName("Partition 2") -> 2, new TaskName("Partition 3") -> 3, new 
TaskName("Partition 4") -> 4))
-  }
-
-  /**
-   * Generate a random alphanumeric string of the specified length
-   * @param length Specifies length of the string to generate
-   * @return An alphanumeric string
-   */
-  def generateString (length : Int) : String = {
-    Random.alphanumeric.take(length).mkString
-  }
-}
-
-/**
- * Little mock for testing the input stream partition method.
- */
-class MockSystemFactory extends SystemFactory {
-  def getConsumer(systemName: String, config: Config, registry: 
MetricsRegistry) = null
-  def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry) = null
-  def getAdmin(systemName: String, config: Config) = new 
SinglePartitionWithoutOffsetsSystemAdmin
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 50a7426..2b1aa3e 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -128,14 +128,14 @@
             %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((taskId, taskNames) <- state.runningTaskToTaskNames)
-            - for((taskName, ssps) <- taskNames)
+          - for((containerId, container) <- state.runningTasks)
+            - val containerModel = 
state.jobCoordinator.jobModel.getContainers.get(containerId)
+            - for((taskName, taskModel) <- containerModel.getTasks)
               %tr
-                %td= taskId
+                %td= containerId
                 %td= taskName
-                %td= ssps.map(_.toString).toList.sorted.mkString(", ")
+                %td= 
taskModel.getSystemStreamPartitions.map(_.toString).toList.sorted.mkString(", ")
                 %td
-                  - val container = state.runningTasks(taskId)
                   %a(target="_blank" 
href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")=
 container.id.toString
 
     %div.tab-pane#config

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 723b673..91aff3c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.job.yarn
 
 import scala.collection.JavaConversions.asScalaBuffer
-
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, 
NodeReport }
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -28,17 +27,17 @@ import 
org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.config.MapConfig
+import org.apache.samza.config.Config
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.config.YarnConfig
 import org.apache.samza.config.YarnConfig.Config2Yarn
-import org.apache.samza.config.serializers.JsonConfigSerializer
 import 
org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM
 import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CPU_CORES
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.hadoop.HttpFileSystem
-
 import org.apache.samza.util.Logging
+import org.apache.samza.serializers.model.SamzaObjectMapper
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -68,7 +67,7 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
     info("got node manager port: %s" format nodePortString)
     val nodeHttpPortString = 
System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
     info("got node manager http port: %s" format nodeHttpPortString)
-    val config = new 
MapConfig(JsonConfigSerializer.fromJson(System.getenv(ShellCommandConfig.ENV_CONFIG)))
+    val config = new 
MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG),
 classOf[Config]))
     info("got config: %s" format config)
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index b076968..ce88698 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -36,7 +36,6 @@ import org.apache.samza.webapp.ApplicationMasterWebServlet
 class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, 
registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends 
YarnAppMasterListener with Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
-  var coordinatorApp: HttpServer = null
 
   override def onInit() {
     // try starting the samza AM dashboard at a random rpc and tracking port
@@ -50,13 +49,10 @@ class SamzaAppMasterService(config: Config, state: 
SamzaAppMasterState, registry
     webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
     webApp.start
 
-    coordinatorApp = new HttpServer
-    coordinatorApp.addServlet("/*", new JobServlet(config, 
state.tasksToSSPTaskNames, state.taskNameToChangeLogPartitionMapping))
-    coordinatorApp.start
-
+    state.jobCoordinator.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
-    state.coordinatorUrl = coordinatorApp.getUrl
+    state.coordinatorUrl = state.jobCoordinator.server.getUrl
 
     info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format 
(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
   }
@@ -70,8 +66,6 @@ class SamzaAppMasterService(config: Config, state: 
SamzaAppMasterState, registry
       webApp.stop
     }
 
-    if (coordinatorApp != null) {
-      coordinatorApp.stop
-    }
+    state.jobCoordinator.stop
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 423e903..8ba435e 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -24,8 +24,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId
 import java.util
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.container.TaskName
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
 import java.net.URL
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.coordinator.JobCoordinator
 
 /**
  * Samza's application master has state that is usually manipulated based on
@@ -42,9 +43,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: 
ContainerId, val nod
   var unclaimedTasks = Set[Int]()
   var finishedTasks = Set[Int]()
   var runningTasks = Map[Int, YarnContainer]()
-  var runningTaskToTaskNames = Map[Int, util.Map[TaskName, 
util.Set[SystemStreamPartition]]]()
-  var tasksToSSPTaskNames = Map[Int, TaskNamesToSystemStreamPartitions]()
-  var taskNameToChangeLogPartitionMapping = Map[TaskName, Int]()
+  var jobCoordinator: JobCoordinator = null
   var status = FinalApplicationStatus.UNDEFINED
   var jobHealthy = true
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index eb52529..b0b6543 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -21,10 +21,7 @@ package org.apache.samza.job.yarn
 
 import java.nio.ByteBuffer
 import java.util.Collections
-
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
@@ -45,9 +42,8 @@ import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.job.CommandBuilder
 import org.apache.samza.job.ShellCommandBuilder
 import org.apache.samza.util.Util
-
 import org.apache.samza.util.Logging
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+import org.apache.samza.coordinator.JobCoordinator
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 1024
@@ -73,8 +69,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: 
Config, state: SamzaA
       info("No %s specified. Defaulting to one container." format 
YarnConfig.TASK_COUNT)
       1
     })
-  state.tasksToSSPTaskNames = Util.assignContainerToSSPTaskNames(config, 
state.taskCount)
-  state.taskNameToChangeLogPartitionMapping = 
Util.getTaskNameToChangeLogPartitionMapping(config, state.tasksToSSPTaskNames)
+  state.jobCoordinator = JobCoordinator(config, state.taskCount)
 
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
@@ -85,7 +80,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: 
Config, state: SamzaA
 
   override def onInit() {
     state.neededContainers = state.taskCount
-    state.unclaimedTasks = (0 until state.taskCount).toSet
+    state.unclaimedTasks = 
state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet
     containerManager = NMClient.createNMClient()
     containerManager.init(conf)
     containerManager.start
@@ -109,7 +104,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: 
Config, state: SamzaA
     state.unclaimedTasks.headOption match {
       case Some(taskId) => {
         info("Got available task id (%d) for container: %s" format (taskId, 
container))
-        val sspTaskNames: TaskNamesToSystemStreamPartitions = 
state.tasksToSSPTaskNames.getOrElse(taskId, TaskNamesToSystemStreamPartitions())
+        val sspTaskNames = 
state.jobCoordinator.jobModel.getContainers.get(taskId)
         info("Claimed SSP taskNames %s for container ID %s" format 
(sspTaskNames, taskId))
         val cmdBuilderClassName = 
config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = 
Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
@@ -135,7 +130,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: 
Config, state: SamzaA
         }
         state.runningTasks += taskId -> new YarnContainer(container)
         state.unclaimedTasks -= taskId
-        state.runningTaskToTaskNames += taskId -> 
sspTaskNames.getJavaFriendlyType
 
         info("Claimed task ID %s for container %s on node %s 
(http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, 
container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
 
@@ -157,7 +151,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: 
Config, state: SamzaA
     taskId match {
       case Some(taskId) => {
         state.runningTasks -= taskId
-        state.runningTaskToTaskNames -= taskId
       }
       case _ => None
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index d7aa240..24b11da 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -30,12 +30,12 @@ import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.job.StreamJob
 import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
 import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
-import org.apache.samza.config.serializers.JsonConfigSerializer
 import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.YarnConfig
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.SamzaException
+import org.apache.samza.serializers.model.SamzaObjectMapper
 
 object YarnJob {
   val DEFAULT_AM_CONTAINER_MEM = 1024
@@ -60,7 +60,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
           format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, 
ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, 
ApplicationConstants.STDERR)),
       Some({
         val envMap = Map(
-          ShellCommandConfig.ENV_CONFIG -> 
Util.envVarEscape(JsonConfigSerializer.toJson(config)),
+          ShellCommandConfig.ENV_CONFIG -> 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)),
           ShellCommandConfig.ENV_JAVA_OPTS -> 
Util.envVarEscape(config.getAmOpts.getOrElse("")))
         val envMapWithJavaHome = config.getAMJavaHome match {
           case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> 
javaHome)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index d6cf22d..0afee64 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -29,11 +29,12 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.codehaus.jackson.map.ObjectMapper
 import java.util.HashMap
+import org.apache.samza.serializers.model.SamzaObjectMapper
 
 class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, 
registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
   val client = new ClientHelper(yarnConfig)
-  val jsonMapper = new ObjectMapper()
+  val jsonMapper = SamzaObjectMapper.getObjectMapper
 
   before() {
     contentType = "application/json"
@@ -79,20 +80,18 @@ class ApplicationMasterRestServlet(config: Config, state: 
SamzaAppMasterState, r
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningTasks.values.foreach(c => {
-      val containerIdStr = c.id.toString
-      val containerMap = new HashMap[String, Object]
-
-      val taskId = state.runningTasks.filter { case (_, container) => 
container.id.toString.equals(containerIdStr) }.keys.head
-      val taskNames = new 
java.util.ArrayList(state.runningTaskToTaskNames.get(taskId).get.toList)
-
-      containerMap.put("yarn-address", c.nodeHttpAddress)
-      containerMap.put("start-time", c.startTime.toString)
-      containerMap.put("up-time", c.upTime.toString)
-      containerMap.put("task-names", taskNames)
-      containerMap.put("task-id", taskId.toString)
-      containers.put(containerIdStr, containerMap)
-    })
+    state.runningTasks.foreach {
+      case (containerId, container) =>
+        val yarnContainerId = container.id.toString
+        val containerMap = new HashMap[String, Object]
+        val taskModels = 
state.jobCoordinator.jobModel.getContainers.get(containerId).getTasks
+        containerMap.put("yarn-address", container.nodeHttpAddress)
+        containerMap.put("start-time", container.startTime.toString)
+        containerMap.put("up-time", container.upTime.toString)
+        containerMap.put("task-models", taskModels)
+        containerMap.put("container-id", containerId.toString)
+        containers.put(yarnContainerId, containerMap)
+    }
 
     val status = Map[String, Object](
       "app-attempt-id" -> state.appAttemptId.toString,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index a1c090d..81dea9d 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -29,26 +29,25 @@ import org.junit.Test
 import scala.collection.JavaConversions._
 import org.apache.samza.config.Config
 import org.apache.samza.container.TaskName
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.coordinator.JobCoordinator
 
 class TestSamzaAppMasterService {
   @Test
   def testAppMasterDashboardShouldStart {
+    val config = getDummyConfig
     val state = new SamzaAppMasterState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(getDummyConfig, state, null, null)
+    val service = new SamzaAppMasterService(config, state, null, null)
     val taskName = new TaskName("test")
 
-    state.tasksToSSPTaskNames = Map[Int, TaskNamesToSystemStreamPartitions]()
-    state.taskNameToChangeLogPartitionMapping = Map[TaskName, Int]()
-    state.tasksToSSPTaskNames += 0 -> new 
TaskNamesToSystemStreamPartitions(Map(taskName -> Set(new 
SystemStreamPartition("a", "b", new Partition(0)), new 
SystemStreamPartition("a", "b", new Partition(0)))))
-    state.taskNameToChangeLogPartitionMapping += taskName -> 0
+    state.jobCoordinator = JobCoordinator(config, 1)
 
     // start the dashboard
     service.onInit
     assertTrue(state.rpcUrl.getPort > 0)
     assertTrue(state.trackingUrl.getPort > 0)
+    assertTrue(state.coordinatorUrl.getPort > 0)
 
     // check to see if it's running
     val url = new URL(state.rpcUrl.toString + "am")
@@ -70,8 +69,11 @@ class TestSamzaAppMasterService {
   @Test
   def testAppMasterDashboardWebServiceShouldStart {
     // Create some dummy config
+    val config = getDummyConfig
     val state = new SamzaAppMasterState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(getDummyConfig, state, null, null)
+    val service = new SamzaAppMasterService(config, state, null, null)
+
+    state.jobCoordinator = JobCoordinator(config, 1)
 
     // start the dashboard
     service.onInit

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 5f65dbd..cab5101 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -31,16 +31,19 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
+import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.config.MapConfig
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.util.Util
 import org.junit.Test
 import scala.collection.JavaConversions._
 import TestSamzaAppMasterTaskManager._
 import java.net.URL
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemStreamMetadata
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -235,7 +238,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
@@ -244,7 +246,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(1, amClient.getClient.requests.size)
     assertEquals(1, amClient.getClient.getRelease.size)
@@ -260,7 +261,6 @@ class TestSamzaAppMasterTaskManager {
     assertFalse(taskManager.shouldShutdown)
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(0, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
@@ -277,7 +277,6 @@ class TestSamzaAppMasterTaskManager {
     assertEquals(0, state.neededContainers)
     assertTrue(state.jobHealthy)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
   }
 
@@ -307,13 +306,11 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(1, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(1, state.unclaimedTasks.size)
     assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
     assertEquals(2, state.runningTasks.size)
-    assertEquals(2, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(2, containersStarted)
 
@@ -321,7 +318,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container2, 0, ""))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(1, state.completedTasks)
 
@@ -329,7 +325,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container3, 1, 
"expected failure here"))
     assertEquals(1, state.neededContainers)
     assertEquals(0, state.runningTasks.size)
-    assertEquals(0, state.runningTaskToTaskNames.size)
     assertEquals(1, state.unclaimedTasks.size)
     assertEquals(1, state.completedTasks)
     assertFalse(taskManager.shouldShutdown)
@@ -338,7 +333,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(3, containersStarted)
 
@@ -346,7 +340,6 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container3, 0, ""))
     assertEquals(0, state.neededContainers)
     assertEquals(0, state.runningTasks.size)
-    assertEquals(0, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(2, state.completedTasks)
     assertTrue(taskManager.shouldShutdown)
@@ -379,19 +372,16 @@ class TestSamzaAppMasterTaskManager {
     assertEquals(0, amClient.getClient.getRelease.size)
     assertEquals(1, state.neededContainers)
     assertEquals(0, state.runningTasks.size)
-    assertEquals(0, state.runningTaskToTaskNames.size)
     assertEquals(1, state.unclaimedTasks.size)
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
     assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.runningTaskToTaskNames.size)
     assertEquals(0, state.unclaimedTasks.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
@@ -413,6 +403,22 @@ class MockSystemFactory extends SystemFactory {
   }
 
   def getAdmin(systemName: String, config: Config) = {
-    new SinglePartitionWithoutOffsetsSystemAdmin
+    // Size the mock admin by the configured task count, defaulting to 1.
+    new MockSystemAdmin(config.getTaskCount.getOrElse(1))
+  }
+}
+
+/**
+ * Test SystemAdmin that reports numTasks partitions (ids 0 until numTasks)
+ * for every requested stream.
+ *
+ * @param numTasks number of partitions reported per stream
+ */
+class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
+  // Always returns null, regardless of the offsets passed in.
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
+
+  /**
+   * Builds metadata for each requested stream name. Every partition's
+   * SystemStreamPartitionMetadata is constructed with three nulls.
+   */
+  def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = {
+    streamNames.map(streamName => {
+      // Bound once and never reassigned, so a val rather than a var.
+      val partitionMetadata = (0 until numTasks).map(partitionId => {
+        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
+      }).toMap
+      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
+    }).toMap[String, SystemStreamMetadata]
   }
 }

Reply via email to