SAMZA-123: Move topic partition grouping to the AM and generalize

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/da79b6f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/da79b6f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/da79b6f9

Branch: refs/heads/master
Commit: da79b6f92f44f44ca7649fbca20cc5389b28e29f
Parents: 7cecf0a
Author: Jakob Homan <[email protected]>
Authored: Mon Jul 28 15:11:57 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Mon Jul 28 15:11:57 2014 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 build.gradle                                    |   1 +
 .../0.7.0/container/samza-container.md          |  10 +-
 .../org/apache/samza/checkpoint/Checkpoint.java |  40 +-
 .../samza/checkpoint/CheckpointManager.java     |  34 +-
 .../samza/container/SamzaContainerContext.java  |  14 +-
 .../container/SystemStreamPartitionGrouper.java |  40 ++
 .../SystemStreamPartitionGrouperFactory.java    |  28 ++
 .../org/apache/samza/container/TaskName.java    |  63 ++++
 .../org/apache/samza/job/CommandBuilder.java    |  13 +-
 .../java/org/apache/samza/task/TaskContext.java |   9 +-
 .../samza/checkpoint/CheckpointTool.scala       | 104 +++---
 .../apache/samza/checkpoint/OffsetManager.scala |  68 ++--
 .../file/FileSystemCheckpointManager.scala      |  57 ++-
 .../org/apache/samza/config/JobConfig.scala     |   6 +
 .../samza/config/ShellCommandConfig.scala       |   7 +-
 .../org/apache/samza/container/RunLoop.scala    |  45 ++-
 .../apache/samza/container/SamzaContainer.scala | 112 +++---
 .../SystemStreamPartitionTaskNameGrouper.scala  |  38 ++
 .../apache/samza/container/TaskInstance.scala   |  75 ++--
 .../samza/container/TaskInstanceMetrics.scala   |   8 +-
 .../TaskNamesToSystemStreamPartitions.scala     | 145 ++++++++
 .../groupers/GroupByPartition.scala             |  41 +++
 .../groupers/GroupBySystemStreamPartition.scala |  38 ++
 ...leSystemStreamPartitionTaskNameGrouper.scala |  50 +++
 .../apache/samza/job/ShellCommandBuilder.scala  |  66 +++-
 .../samza/job/local/LocalJobFactory.scala       |  28 +-
 .../samza/serializers/CheckpointSerde.scala     | 121 +++---
 .../samza/storage/TaskStorageManager.scala      |  13 +-
 .../apache/samza/task/ReadableCoordinator.scala |   4 +-
 .../main/scala/org/apache/samza/util/Util.scala | 193 ++++++++--
 .../samza/checkpoint/TestCheckpointTool.scala   |  43 ++-
 .../samza/checkpoint/TestOffsetManager.scala    |  81 ++--
 .../file/TestFileSystemCheckpointManager.scala  |  47 ++-
 .../SystemStreamPartitionGrouperTestBase.scala  |  57 +++
 .../apache/samza/container/TestRunLoop.scala    |  68 ++--
 .../samza/container/TestSamzaContainer.scala    |  11 +-
 .../samza/container/TestTaskInstance.scala      |   6 +-
 .../TestTaskNamesToSystemStreamPartitions.scala |  71 ++++
 .../groupers/TestGroupByPartition.scala         |  37 ++
 .../TestGroupBySystemStreamPartition.scala      |  41 +++
 ...leSystemStreamPartitionTaskNameGrouper.scala |  54 +++
 .../org/apache/samza/job/TestJobRunner.scala    |   1 -
 .../samza/job/TestShellCommandBuilder.scala     |  52 +++
 .../apache/samza/metrics/TestJmxServer.scala    |   1 -
 .../samza/serializers/TestCheckpointSerde.scala |  46 +--
 .../TestFileReaderSystemConsumer.scala          |  14 +-
 .../samza/task/TestReadableCoordinator.scala    |  12 +-
 .../scala/org/apache/samza/util/TestUtil.scala  |  64 ++--
 .../kafka/KafkaCheckpointLogKey.scala           | 186 ++++++++++
 .../kafka/KafkaCheckpointManager.scala          | 366 +++++++++++++------
 .../kafka/KafkaCheckpointManagerFactory.scala   |  44 +--
 .../samza/system/kafka/TopicMetadataCache.scala |   2 -
 .../kafka/TestKafkaCheckpointLogKey.scala       |  71 ++++
 .../kafka/TestKafkaCheckpointManager.scala      |  81 ++--
 .../system/kafka/TestKafkaSystemAdmin.scala     |   4 +-
 .../samza/storage/kv/LevelDbKeyValueStore.scala |   8 +-
 .../samza/test/integration/join/Emitter.java    |   7 +-
 .../performance/TestKeyValuePerformance.scala   |   9 +-
 .../test/integration/TestStatefulTask.scala     | 119 +++---
 .../resources/scalate/WEB-INF/views/index.scaml |  25 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |   8 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  19 +-
 .../webapp/ApplicationMasterRestServlet.scala   |   5 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  59 +--
 65 files changed, 2335 insertions(+), 857 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index db9d3ec..7cbbfd6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@ build
 samza-test/state
 docs/learn/documentation/0.7.0/api/javadocs
 .DS_Store
+out/
+*.patch

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d1ee5e8..c262b5f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -144,6 +144,7 @@ project(":samza-kafka_$scalaVersion") {
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
     compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
+    compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/docs/learn/documentation/0.7.0/container/samza-container.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/samza-container.md 
b/docs/learn/documentation/0.7.0/container/samza-container.md
index a96ab4a..ab4f0e4 100644
--- a/docs/learn/documentation/0.7.0/container/samza-container.md
+++ b/docs/learn/documentation/0.7.0/container/samza-container.md
@@ -45,7 +45,7 @@ public interface InitableTask {
 }
 {% endhighlight %}
 
-How many instances of your task class are created depends on the number of 
partitions in the job's input streams. If your Samza job has ten partitions, 
there will be ten instantiations of your task class: one for each partition. 
The first task instance will receive all messages for partition one, the second 
instance will receive all messages for partition two, and so on.
+By default, how many instances of your task class are created depends on the 
number of partitions in the job's input streams. If your Samza job has ten 
partitions, there will be ten instantiations of your task class: one for each 
partition. The first task instance will receive all messages for partition one, 
the second instance will receive all messages for partition two, and so on.
 
 <img src="/img/0.7.0/learn/documentation/container/tasks-and-partitions.svg" 
alt="Illustration of tasks consuming partitions" class="diagram-large">
 
@@ -53,7 +53,13 @@ The number of partitions in the input streams is determined 
by the systems from
 
 If a Samza job has more than one input stream, the number of task instances 
for the Samza job is the maximum number of partitions across all input streams. 
For example, if a Samza job is reading from PageViewEvent (12 partitions), and 
ServiceMetricEvent (14 partitions), then the Samza job would have 14 task 
instances (numbered 0 through 13). Task instances 12 and 13 only receive events 
from ServiceMetricEvent, because there is no corresponding PageViewEvent 
partition.
 
-There is [work underway](https://issues.apache.org/jira/browse/SAMZA-71) to 
make the assignment of partitions to tasks more flexible in future versions of 
Samza.
+With this default approach to assigning input streams to task instances, Samza 
is effectively performing a group-by operation on the input streams with their 
partitions as the key. Other strategies for grouping input stream partitions 
are possible by implementing a new 
[SystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/SystemStreamPartitionGrouper.html)
 and factory, and configuring the job to use it via the 
job.systemstreampartition.grouper.factory configuration value.
+
+Samza provides the above-discussed per-partition grouper as well as the 
[GroupBySystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.html),
 which provides a separate task class instance for every input stream 
partition, effectively grouping by the input stream itself. This provides 
maximum scalability in terms of how many containers can be used to process 
those input streams and is appropriate for very high volume jobs that need no 
grouping of the input streams.
+
+Considering the above example of a PageViewEvent partitioned 12 ways and a 
ServiceMetricEvent partitioned 14 ways, the GroupBySystemStreamPartitionGrouper 
would create 12 + 14 = 26 task instances, which would then be distributed 
across the number of containers configured, as discussed below.
+
+Note that once a job has been started using a particular 
SystemStreamPartitionGrouper and that job is using state or checkpointing, it 
is not possible to change that grouping in subsequent job starts, as the 
previous checkpoints and state information would likely be incorrect under the 
new grouping approach.
 
 ### Containers and resource allocation
 

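As a concrete illustration of the configuration described in the documentation change above, switching a job to a custom grouper is a single setting; the factory class name below is hypothetical:

    job.systemstreampartition.grouper.factory=com.example.CustomGrouperFactory
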
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 
b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
index 6fad1fa..593d118 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -19,24 +19,24 @@
 
 package org.apache.samza.checkpoint;
 
+import org.apache.samza.system.SystemStreamPartition;
+
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.samza.system.SystemStream;
-
 /**
 * A checkpoint is a mapping of all the streams a job is consuming and the most recent offset for each.
  * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as 
part of a job restart or as part
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStream, String> offsets;
+  private final Map<SystemStreamPartition, String> offsets;
 
   /**
    * Constructs a new checkpoint based off a map of Samza stream offsets.
    * @param offsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStream, String> offsets) {
+  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
     this.offsets = offsets;
   }
 
@@ -44,33 +44,25 @@ public class Checkpoint {
   * Gets an unmodifiable view of the current Samza stream offsets.
   * @return An unmodifiable view of a Map of Samza streams to their recorded offsets.
    */
-  public Map<SystemStream, String> getOffsets() {
+  public Map<SystemStreamPartition, String> getOffsets() {
     return Collections.unmodifiableMap(offsets);
   }
 
   @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((offsets == null) ? 0 : offsets.hashCode());
-    return result;
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Checkpoint)) return false;
+
+    Checkpoint that = (Checkpoint) o;
+
+    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != 
null) return false;
+
+    return true;
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Checkpoint other = (Checkpoint) obj;
-    if (offsets == null) {
-      if (other.offsets != null)
-        return false;
-    } else if (!offsets.equals(other.offsets))
-      return false;
-    return true;
+  public int hashCode() {
+    return offsets != null ? offsets.hashCode() : 0;
   }
 
   @Override

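A minimal sketch of the new per-SSP checkpoint shape, using only the API shown above (the system, stream, and offset values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.Partition;
    import org.apache.samza.checkpoint.Checkpoint;
    import org.apache.samza.system.SystemStreamPartition;

    public class CheckpointSketch {
      public static void main(String[] args) {
        Map<SystemStreamPartition, String> offsets = new HashMap<SystemStreamPartition, String>();
        // Offsets are now keyed by SystemStreamPartition rather than SystemStream
        offsets.put(new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0)), "1234");
        Checkpoint checkpoint = new Checkpoint(offsets);
        System.out.println(checkpoint.getOffsets()); // an unmodifiable view of the map
      }
    }
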
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index a6e1ba6..092cb91 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -19,7 +19,10 @@
 
 package org.apache.samza.checkpoint;
 
-import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+
+import java.util.Map;
 
 /**
  * CheckpointManagers read and write {@link 
org.apache.samza.checkpoint.Checkpoint} to some
@@ -30,23 +33,38 @@ public interface CheckpointManager {
 
   /**
   * Registers this manager to write checkpoints of a specific Samza taskName.
-   * @param partition Specific Samza stream partition of which to write 
checkpoints for.
+   * @param taskName Specific Samza taskName for which to write checkpoints.
    */
-  public void register(Partition partition);
+  public void register(TaskName taskName);
 
   /**
   * Writes a checkpoint based on the current state of a Samza taskName.
-   * @param partition Specific Samza stream partition of which to write a 
checkpoint of.
+   * @param taskName Specific Samza taskName for which to write a checkpoint.
    * @param checkpoint Reference to a Checkpoint object to store offset data 
in.
    */
-  public void writeCheckpoint(Partition partition, Checkpoint checkpoint);
+  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
 
   /**
-   * Returns the last recorded checkpoint for a specified Samza stream 
partition.
-   * @param partition Specific Samza stream partition for which to get the 
last checkpoint of.
+   * Returns the last recorded checkpoint for a specified taskName.
+   * @param taskName Specific Samza taskName for which to get the last checkpoint.
   * @return A Checkpoint object with the recorded offset data of the specified taskName.
    */
-  public Checkpoint readLastCheckpoint(Partition partition);
+  public Checkpoint readLastCheckpoint(TaskName taskName);
+
+  /**
+   * Read the taskName to partition mapping that is being maintained by this 
CheckpointManager
+   *
+   * @return TaskName to changelog partition mapping, or an empty map if no mapping has been written.
+   */
+  public Map<TaskName, Integer> readChangeLogPartitionMapping();
+
+  /**
+   * Write the taskName to partition mapping that is being maintained by this 
CheckpointManager
+   *
+   * @param mapping Each TaskName's partition within the changelog
+   */
+  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);
 
   public void stop();
+
 }

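A hedged sketch of the new changelog-mapping round trip, assuming some already-started CheckpointManager implementation (the TaskName strings are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.checkpoint.CheckpointManager;
    import org.apache.samza.container.TaskName;

    public class ChangelogMappingSketch {
      static void roundTrip(CheckpointManager manager) {
        Map<TaskName, Integer> mapping = new HashMap<TaskName, Integer>();
        mapping.put(new TaskName("Partition 0"), 0);
        mapping.put(new TaskName("Partition 1"), 1);
        manager.writeChangeLogPartitionMapping(mapping);

        // Expect {Partition 0=0, Partition 1=1} back; an empty map if nothing was written
        Map<TaskName, Integer> restored = manager.readChangeLogPartitionMapping();
        System.out.println(restored);
      }
    }
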
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index 78d56a9..c8693c8 100644
--- 
a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ 
b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -19,32 +19,32 @@
 
 package org.apache.samza.container;
 
-import java.util.Collection;
-
-import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
  * A SamzaContainerContext maintains per-container information for the tasks 
it executes.
  */
 public class SamzaContainerContext {
   public final String name;
   public final Config config;
-  public final Collection<Partition> partitions;
+  public final Collection<TaskName> taskNames;
 
   /**
   * An immutable context object that can be passed to tasks to give them information
    * about the container in which they are executing.
    * @param name The name of the container (either a YARN AM or 
SamzaContainer).
    * @param config The job configuration.
-   * @param partitions The set of input partitions assigned to this container.
+   * @param taskNames The set of taskName keys for which this container is 
responsible.
    */
   public SamzaContainerContext(
       String name,
       Config config,
-      Collection<Partition> partitions) {
+      Collection<TaskName> taskNames) {
     this.name = name;
     this.config = config;
-    this.partitions = partitions;
+    this.taskNames = Collections.unmodifiableCollection(taskNames);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
 
b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
new file mode 100644
index 0000000..897d9f5
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Group a set of SystemStreamPartitions into logical taskNames that share a 
common characteristic, defined
+ * by the implementation.  Each taskName has a key that uniquely describes 
what sets may be in it, but does
+ * not generally enumerate the elements of those sets.  For example, a 
SystemStreamPartitionGrouper that
+ * groups SystemStreamPartitions (each with 4 partitions) by their partition would end up generating
+ * four TaskNames: 0, 1, 2, 3.  These TaskNames describe the partitions but do 
not list all of the
+ * SystemStreamPartitions, which allows new SystemStreamPartitions to be added 
later without changing
+ * the definition of the TaskNames, assuming these new SystemStreamPartitions 
do not have more than
+ * four partitions.  On the other hand, a SystemStreamPartitionGrouper that 
wanted each SystemStreamPartition
+ * to be its own, unique group would use the SystemStreamPartition's entire 
description to generate
+ * the TaskNames.
+ */
+public interface SystemStreamPartitionGrouper {
+  public Map<TaskName, Set<SystemStreamPartition>> 
group(Set<SystemStreamPartition> ssps);
+}

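To make the contract concrete, here is a minimal sketch of a grouper that groups by partition id, matching the four-TaskNames example in the javadoc above. The class name is ours; this patch's GroupByPartition (in Scala, below) plays the same role:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.samza.container.SystemStreamPartitionGrouper;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.system.SystemStreamPartition;

    public class PartitionIdGrouper implements SystemStreamPartitionGrouper {
      public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
        Map<TaskName, Set<SystemStreamPartition>> groups = new HashMap<TaskName, Set<SystemStreamPartition>>();
        for (SystemStreamPartition ssp : ssps) {
          // One TaskName per partition id, shared across all input streams
          TaskName key = new TaskName("Partition " + ssp.getPartition().getPartitionId());
          if (!groups.containsKey(key)) {
            groups.put(key, new HashSet<SystemStreamPartition>());
          }
          groups.get(key).add(ssp);
        }
        return groups;
      }
    }
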
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
 
b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
new file mode 100644
index 0000000..10ac6e2
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Returns an instance of a SystemStreamPartitionGrouper, as defined by the particular implementation.
+ */
+public interface SystemStreamPartitionGrouperFactory {
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config);
+}

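A matching factory sketch for the hypothetical PartitionIdGrouper above; a class like this is what job.systemstreampartition.grouper.factory would name:

    import org.apache.samza.config.Config;
    import org.apache.samza.container.SystemStreamPartitionGrouper;
    import org.apache.samza.container.SystemStreamPartitionGrouperFactory;

    public class PartitionIdGrouperFactory implements SystemStreamPartitionGrouperFactory {
      public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
        // config is available for groupers that need tuning parameters
        return new PartitionIdGrouper();
      }
    }
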
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java 
b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
new file mode 100644
index 0000000..13a1206
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+/**
+ * A unique identifier of a set of SystemStreamPartitions that have been grouped by
+ * a {@link org.apache.samza.container.SystemStreamPartitionGrouper}.  The
+ * SystemStreamPartitionGrouper determines the TaskName for each set it 
creates.
+ */
+public class TaskName implements Comparable<TaskName> {
+  private final String taskName;
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public TaskName(String taskName) {
+    this.taskName = taskName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TaskName taskName1 = (TaskName) o;
+
+    if (!taskName.equals(taskName1.taskName)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return taskName.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return taskName;
+  }
+
+  @Override
+  public int compareTo(TaskName that) {
+    return taskName.compareTo(that.taskName);
+  }
+}

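Because TaskName implements equals, hashCode and compareTo, it can be used directly as a map key or in sorted collections; a tiny sketch:

    import java.util.TreeSet;

    import org.apache.samza.container.TaskName;

    public class TaskNameSketch {
      public static void main(String[] args) {
        TreeSet<TaskName> names = new TreeSet<TaskName>();
        names.add(new TaskName("Partition 1"));
        names.add(new TaskName("Partition 0"));
        // Sorted by the underlying string: [Partition 0, Partition 1]
        System.out.println(names);
      }
    }
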
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 
b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index cb40092..f510ce5 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.samza.job;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.Map;
@@ -30,12 +31,13 @@ import java.util.Set;
  * such as YARN or the LocalJobRunner.
  */
 public abstract class CommandBuilder {
-  protected Set<SystemStreamPartition> systemStreamPartitions;
+  protected Map<TaskName, Set<SystemStreamPartition>> 
taskNameToSystemStreamPartitionsMapping;
+  protected Map<TaskName, Integer> taskNameToChangeLogPartitionMapping;
   protected String name;
   protected Config config;
 
-  public CommandBuilder setStreamPartitions(Set<SystemStreamPartition> ssp) {
-    this.systemStreamPartitions = ssp;
+  public CommandBuilder 
setTaskNameToSystemStreamPartitionsMapping(Map<TaskName, 
Set<SystemStreamPartition>> systemStreamPartitionTaskNames) {
+    this.taskNameToSystemStreamPartitionsMapping = 
systemStreamPartitionTaskNames;
     return this;
   }
   
@@ -54,6 +56,11 @@ public abstract class CommandBuilder {
     return this;
   }
 
+  public CommandBuilder setTaskNameToChangeLogPartitionMapping(Map<TaskName, 
Integer> mapping) {
+    this.taskNameToChangeLogPartitionMapping = mapping;
+    return this;
+  }
+
   public abstract String buildCommand();
 
   public abstract Map<String, String> buildEnvironment();

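A hedged sketch of how a job runner might wire the two new maps through a CommandBuilder, assuming setConfig and setName follow the same fluent pattern as the setters shown above (the container name is illustrative):

    import java.util.Map;
    import java.util.Set;

    import org.apache.samza.config.Config;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.job.CommandBuilder;
    import org.apache.samza.system.SystemStreamPartition;

    public class BuilderWiringSketch {
      static CommandBuilder wire(CommandBuilder builder,
                                 Config config,
                                 Map<TaskName, Set<SystemStreamPartition>> ssps,
                                 Map<TaskName, Integer> changelog) {
        return builder
            .setConfig(config)
            .setName("samza-container-0")
            .setTaskNameToSystemStreamPartitionsMapping(ssps)
            .setTaskNameToChangeLogPartitionMapping(changelog);
      }
    }
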
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java 
b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 7c1b085..35de8cc 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -19,8 +19,11 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link 
org.apache.samza.task.StreamTask}, particularly during
@@ -29,7 +32,9 @@ import org.apache.samza.metrics.MetricsRegistry;
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();
 
-  Partition getPartition();
+  Set<SystemStreamPartition> getSystemStreamPartitions();
 
   Object getStore(String name);
+
+  TaskName getTaskName();
 }

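A minimal sketch of a task reading the new accessors during initialization, assuming only the interfaces shown in this patch:

    import org.apache.samza.config.Config;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.TaskContext;

    public class LoggingInitTask implements InitableTask {
      public void init(Config config, TaskContext context) throws Exception {
        // A task may now own several SSPs rather than exactly one partition
        System.out.println("Task " + context.getTaskName()
            + " owns: " + context.getSystemStreamPartitions());
      }
    }
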
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 5735a39..84ea4ca 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -19,17 +19,19 @@
 
 package org.apache.samza.checkpoint
 
+import grizzled.slf4j.Logging
 import java.net.URI
 import java.util.regex.Pattern
 import joptsimple.OptionSet
-import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.container.TaskName
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{CommandLine, Util}
+import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
-import grizzled.slf4j.Logging
 
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -44,7 +46,10 @@ import grizzled.slf4j.Logging
  * containing the offsets you want. It needs to be in the same format as the 
tool
  * prints out the latest checkpoint:
  *
- *   systems.<system>.streams.<topic>.partitions.<partition>=<offset>
+ *   
tasknames.<taskname>.systems.<system>.streams.<topic>.partitions.<partition>=<offset>
+ *
+ * The provided offset definitions will be grouped by <taskname> and written to
+ * individual checkpoint entries for each <taskname>.
  *
  * NOTE: A job only reads its checkpoint when it starts up. Therefore, if you 
want
  * your checkpoint change to take effect, you have to first stop the job, then
@@ -56,8 +61,10 @@ import grizzled.slf4j.Logging
  */
 object CheckpointTool {
   /** Format in which SystemStreamPartition is represented in a properties 
file */
-  val SSP_PATTERN = StreamConfig.STREAM_PREFIX + "partitions.%d"
-  val SSP_REGEX = 
Pattern.compile("systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")
+  val SSP_PATTERN = "tasknames.%s." + StreamConfig.STREAM_PREFIX + 
"partitions.%d"
+  val SSP_REGEX = 
Pattern.compile("tasknames\\.(.+)\\.systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")
+
+  type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition, 
String]]
 
   class CheckpointToolCommandLine extends CommandLine with Logging {
     val newOffsetsOpt =
@@ -68,20 +75,31 @@ object CheckpointTool {
             .ofType(classOf[URI])
             .describedAs("path")
 
-    var newOffsets: Map[SystemStreamPartition, String] = null
+    var newOffsets: TaskNameToCheckpointMap = null
 
-    def parseOffsets(propertiesFile: Config): Map[SystemStreamPartition, 
String] = {
-      propertiesFile.entrySet.flatMap(entry => {
+    def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
+      val taskNameSSPPairs = propertiesFile.entrySet.flatMap(entry => {
         val matcher = SSP_REGEX.matcher(entry.getKey)
         if (matcher.matches) {
-          val partition = new Partition(Integer.parseInt(matcher.group(3)))
-          val ssp = new SystemStreamPartition(matcher.group(1), 
matcher.group(2), partition)
-          Some(ssp -> entry.getValue)
+          val taskname = new TaskName(matcher.group(1))
+          val partition = new Partition(Integer.parseInt(matcher.group(4)))
+          val ssp = new SystemStreamPartition(matcher.group(2), 
matcher.group(3), partition)
+          Some(taskname -> Map(ssp -> entry.getValue))
         } else {
           warn("Warning: ignoring unrecognised property: %s = %s" format 
(entry.getKey, entry.getValue))
           None
         }
-      }).toMap
+      }).toList
+
+      if (taskNameSSPPairs.isEmpty) {
+        return null
+      }
+
+      // Need to turn taskNameSSPPairs List[(taskname, 
Map[SystemStreamPartition, Offset])] to Map[TaskName, Map[SSP, Offset]]
+      taskNameSSPPairs                      // List[(taskname, 
Map[SystemStreamPartition, Offset])]
+        .groupBy(_._1)                      // Group by taskname
+        .mapValues(m => m.map(_._2))        // Drop the extra taskname that we 
grouped on
+        .mapValues(m => m.reduce( _ ++ _))  // Merge all the maps of 
SSPs->Offset into one for the whole taskname
     }
 
     override def loadConfig(options: OptionSet) = {
@@ -103,7 +121,7 @@ object CheckpointTool {
   }
 }
 
-class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, 
String]) extends Logging {
+class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) 
extends Logging {
   val manager = config.getCheckpointManagerFactory match {
     case Some(className) =>
       
Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, 
new MetricsRegistryMap)
@@ -113,52 +131,48 @@ class CheckpointTool(config: Config, newOffsets: 
Map[SystemStreamPartition, Stri
 
   // The CheckpointManagerFactory needs to perform this same operation when 
initializing
   // the manager. TODO figure out some way of avoiding duplicated work.
-  val partitions = 
Util.getInputStreamPartitions(config).map(_.getPartition).toSet
 
   def run {
     info("Using %s" format manager)
-    partitions.foreach(manager.register)
+
+    // Find all the TaskNames that would be generated for this job config
+    val taskNames = Util.assignContainerToSSPTaskNames(config, 
1).get(0).get.keys.toSet
+
+    taskNames.foreach(manager.register)
     manager.start
-    val lastCheckpoint = readLastCheckpoint
 
-    logCheckpoint(lastCheckpoint, "Current checkpoint")
+    val lastCheckpoints = taskNames.map(tn => tn -> 
readLastCheckpoint(tn)).toMap
+
+    lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current 
checkpoint for taskname "+ lcp._1))
 
     if (newOffsets != null) {
-      logCheckpoint(newOffsets, "New offset to be written")
-      writeNewCheckpoint(newOffsets)
-      manager.stop
-      info("Ok, new checkpoint has been written.")
+      newOffsets.foreach(no => {
+        logCheckpoint(no._1, no._2, "New offset to be written for taskname " + 
no._1)
+        writeNewCheckpoint(no._1, no._2)
+        info("Ok, new checkpoint has been written for taskname " + no._1)
+      })
     }
+
+    manager.stop
   }
 
-  /** Load the most recent checkpoint state for all partitions. */
-  def readLastCheckpoint: Map[SystemStreamPartition, String] = {
-    partitions.flatMap(partition => {
-      manager.readLastCheckpoint(partition)
-        .getOffsets
-        .map { case (systemStream, offset) =>
-          new SystemStreamPartition(systemStream, partition) -> offset
-        }
-    }).toMap
+  /** Load the most recent checkpoint state for a specified TaskName. */
+  def readLastCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = {
+    manager.readLastCheckpoint(taskName).getOffsets.toMap
   }
 
   /**
-   * Store a new checkpoint state for all given partitions, overwriting the
-   * current state. Any partitions that are not mentioned will not
-   * be changed.
+   * Store a new checkpoint state for the specified TaskName, overwriting any
+   * previous checkpoint for that TaskName.
    */
-  def writeNewCheckpoint(newOffsets: Map[SystemStreamPartition, String]) {
-    newOffsets.groupBy(_._1.getPartition).foreach {
-      case (partition, offsets) =>
-        val streamOffsets = offsets.map { case (ssp, offset) => 
ssp.getSystemStream -> offset }.toMap
-        val checkpoint = new Checkpoint(streamOffsets)
-        manager.writeCheckpoint(partition, checkpoint)
-    }
+  def writeNewCheckpoint(tn: TaskName, newOffsets: Map[SystemStreamPartition, 
String]) {
+    val checkpoint = new Checkpoint(newOffsets)
+    manager.writeCheckpoint(tn, checkpoint)
   }
 
-  def logCheckpoint(checkpoint: Map[SystemStreamPartition, String], prefix: 
String) {
-    checkpoint.map { case (ssp, offset) =>
-      (CheckpointTool.SSP_PATTERN + " = %s") format (ssp.getSystem, 
ssp.getStream, ssp.getPartition.getPartitionId, offset)
-    }.toList.sorted.foreach(line => info(prefix + ": " + line))
+  def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, 
String], prefix: String) {
+    def logLine(tn:TaskName, ssp:SystemStreamPartition, offset:String) = 
(prefix + ": " + CheckpointTool.SSP_PATTERN + " = %s") format (tn.toString, 
ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset)
+
+    checkpoint.keys.toList.sorted.foreach(ssp => info(logLine(tn, ssp, 
checkpoint.get(ssp).get)))
   }
 }

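For reference, a properties file in the new format accepted by SSP_REGEX above might look like this (the TaskName, system, stream, and offsets are hypothetical):

    tasknames.MyTaskName.systems.kafka.streams.PageViewEvent.partitions.0=42
    tasknames.MyTaskName.systems.kafka.streams.PageViewEvent.partitions.1=108

Both entries share the taskname MyTaskName, so parseOffsets groups them into a single checkpoint that is then written for that TaskName.
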
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 9487b58..4efe997 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.checkpoint
 
 import org.apache.samza.system.SystemStream
-import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
@@ -31,6 +30,8 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.system.SystemAdmin
+import org.apache.samza.container.TaskName
+import scala.collection._
 
 /**
  * OffsetSetting encapsulates a SystemStream's metadata, default offset, and
@@ -150,13 +151,13 @@ class OffsetManager(
 
   /**
    * The set of system stream partitions that have been registered with the
-   * OffsetManager. These are the SSPs that will be tracked within the offset
-   * manager.
+   * OffsetManager, grouped by the taskName they belong to. These are the SSPs
+   * that will be tracked within the offset manager.
    */
-  var systemStreamPartitions = Set[SystemStreamPartition]()
+  val systemStreamPartitions = mutable.Map[TaskName, 
mutable.Set[SystemStreamPartition]]()
 
-  def register(systemStreamPartition: SystemStreamPartition) {
-    systemStreamPartitions += systemStreamPartition
+  def register(taskName: TaskName, systemStreamPartitionsToRegister: 
Set[SystemStreamPartition]) {
+    systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()) ++= systemStreamPartitionsToRegister
   }
 
   def start {
@@ -193,20 +194,18 @@ class OffsetManager(
   }
 
   /**
-   * Checkpoint all offsets for a given partition using the CheckpointManager.
+   * Checkpoint all offsets for a given TaskName using the CheckpointManager.
    */
-  def checkpoint(partition: Partition) {
+  def checkpoint(taskName: TaskName) {
     if (checkpointManager != null) {
-      debug("Checkpointing offsets for partition %s." format partition)
+      debug("Checkpointing offsets for taskName %s." format taskName)
 
-      val partitionOffsets = lastProcessedOffsets
-        .filterKeys(_.getPartition.equals(partition))
-        .map { case (systemStreamPartition, offset) => 
(systemStreamPartition.getSystemStream, offset) }
-        .toMap
+      val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw 
new SamzaException("No such SystemStreamPartition set " + taskName + " 
registered for this checkpointmanager")).toSet
+      val partitionOffsets = 
lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_))
 
-      checkpointManager.writeCheckpoint(partition, new 
Checkpoint(partitionOffsets))
+      checkpointManager.writeCheckpoint(taskName, new 
Checkpoint(partitionOffsets))
     } else {
-      debug("Skipping checkpointing for partition %s because no checkpoint 
manager is defined." format partition)
+      debug("Skipping checkpointing for taskName %s because no checkpoint 
manager is defined." format taskName)
     }
   }
 
@@ -221,23 +220,13 @@ class OffsetManager(
   }
 
   /**
-   * Returns a set of partitions that have been registered with this offset
-   * manager.
-   */
-  private def getPartitions = {
-    systemStreamPartitions
-      .map(_.getPartition)
-      .toSet
-  }
-
-  /**
    * Register all partitions with the CheckpointManager.
    */
   private def registerCheckpointManager {
     if (checkpointManager != null) {
       debug("Registering checkpoint manager.")
 
-      getPartitions.foreach(checkpointManager.register)
+      systemStreamPartitions.keys.foreach(checkpointManager.register)
     } else {
       debug("Skipping checkpoint manager registration because no manager was 
defined.")
     }
@@ -253,9 +242,8 @@ class OffsetManager(
 
       checkpointManager.start
 
-      lastProcessedOffsets ++= getPartitions
-        .flatMap(restoreOffsetsFromCheckpoint(_))
-        .filter {
+      lastProcessedOffsets ++= systemStreamPartitions.keys
+        .flatMap(restoreOffsetsFromCheckpoint(_)).filter {
           case (systemStreamPartition, offset) =>
             val shouldKeep = 
offsetSettings.contains(systemStreamPartition.getSystemStream)
             if (!shouldKeep) {
@@ -269,20 +257,17 @@ class OffsetManager(
   }
 
   /**
-   * Loads last processed offsets for a single partition.
+   * Loads last processed offsets for a single taskName.
    */
-  private def restoreOffsetsFromCheckpoint(partition: Partition): 
Map[SystemStreamPartition, String] = {
-    debug("Loading checkpoints for partition: %s." format partition)
+  private def restoreOffsetsFromCheckpoint(taskName: TaskName): 
Map[SystemStreamPartition, String] = {
+    debug("Loading checkpoints for taskName: %s." format taskName)
 
-    val checkpoint = checkpointManager.readLastCheckpoint(partition)
+    val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
     if (checkpoint != null) {
-      checkpoint
-        .getOffsets
-        .map { case (systemStream, offset) => (new 
SystemStreamPartition(systemStream, partition), offset) }
-        .toMap
+      checkpoint.getOffsets.toMap
     } else {
-      info("Did not receive a checkpoint for partition %s. Proceeding without 
a checkpoint." format partition)
+      info("Did not receive a checkpoint for taskName %s. Proceeding without a 
checkpoint." format taskName)
 
       Map()
     }
@@ -338,7 +323,12 @@ class OffsetManager(
    * that was registered, but has no offset.
    */
   private def loadDefaults {
-    systemStreamPartitions.foreach(systemStreamPartition => {
+    val allSSPs: Set[SystemStreamPartition] = systemStreamPartitions
+      .values
+      .flatten
+      .toSet
+
+    allSSPs.foreach(systemStreamPartition => {
       if (!startingOffsets.contains(systemStreamPartition)) {
         val systemStream = systemStreamPartition.getSystemStream
         val partition = systemStreamPartition.getPartition

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
index 364e489..2a87a6e 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -20,40 +20,41 @@
 package org.apache.samza.checkpoint.file
 
 import java.io.File
+import java.io.FileNotFoundException
 import java.io.FileOutputStream
-import scala.collection.JavaConversions._
-import scala.io.Source
+import java.util
 import org.apache.samza.SamzaException
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.config.Config
-import org.apache.samza.Partition
-import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.Checkpoint
-import java.io.FileNotFoundException
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.container.TaskName
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.serializers.CheckpointSerde
+import scala.io.Source
 
 class FileSystemCheckpointManager(
   jobName: String,
   root: File,
   serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
 
-  def register(partition: Partition) {
-  }
+  override def register(taskName: TaskName): Unit = { }
 
-  def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
+  def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, 
"checkpoints")
+
+  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
     val bytes = serde.toBytes(checkpoint)
-    val fos = new FileOutputStream(getFile(jobName, partition))
+    val fos = new FileOutputStream(getCheckpointFile(taskName))
 
     fos.write(bytes)
     fos.close
   }
 
-  def readLastCheckpoint(partition: Partition): Checkpoint = {
+  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
     try {
-      val bytes = Source.fromFile(getFile(jobName, 
partition)).map(_.toByte).toArray
+      val bytes = 
Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray
 
       serde.fromBytes(bytes)
     } catch {
@@ -69,8 +70,28 @@ class FileSystemCheckpointManager(
 
   def stop {}
 
-  private def getFile(jobName: String, partition: Partition) =
-    new File(root, "%s-%d" format (jobName, partition.getPartitionId))
+  private def getFile(jobName: String, taskName: TaskName, fileType:String) =
+    new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
+
+  private def getChangeLogPartitionMappingFile() = getFile(jobName, new 
TaskName("partition-mapping"), "changelog-partition-mapping")
+
+  override def readChangeLogPartitionMapping(): util.Map[TaskName, 
java.lang.Integer] = {
+    try {
+      val bytes = 
Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray
+      serde.changelogPartitionMappingFromBytes(bytes)
+    } catch {
+      case e: FileNotFoundException => new util.HashMap[TaskName, 
java.lang.Integer]()
+    }
+  }
+
+  def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, 
java.lang.Integer]): Unit = {
+    val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping)
+    val bytes = serde.changelogPartitionMappingToBytes(hashmap)
+    val fos = new FileOutputStream(getChangeLogPartitionMappingFile())
+
+    fos.write(bytes)
+    fos.close
+  }
 }
 
 class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index fcafe83..f84aeea 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.config
 
+import 
org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+
 object JobConfig {
   // job config constants
   val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // 
streaming.job_factory_class
@@ -34,6 +36,8 @@ object JobConfig {
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
 
+  val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 }
 
@@ -47,4 +51,6 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) {
   def getConfigRewriters = getOption(JobConfig.CONFIG_REWRITERS)
 
   def getConfigRewriterClass(name: String) = 
getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
+
+  def getSystemStreamPartitionGrouperFactory = 
getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
 }

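Spelled out as configuration, the default resolved by getSystemStreamPartitionGrouperFactory above is equivalent to setting:

    job.systemstreampartition.grouper.factory=org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
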
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 0cdc0d1..e4197fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -26,7 +26,12 @@ object ShellCommandConfig {
   val ENV_CONFIG = "SAMZA_CONFIG"
 
   /**
-   * An encoded list of the streams and partitions this container is 
responsible for. Encoded by 
+   * All taskNames across the job; used to calculate state store partition 
mapping
+   */
+  val ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING = 
"TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING"
+
+  /**
+   * A serialized list of the streams and partitions this container is 
responsible for. Encoded by
    * {@link org.apache.samza.util.Util#createStreamPartitionString}
    */
   val ENV_SYSTEM_STREAMS = "SAMZA_SYSTEM_STREAMS"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 4ca340c..7fb4763 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -20,8 +20,7 @@
 package org.apache.samza.container
 
 import grizzled.slf4j.Logging
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.{SystemStreamPartition, SystemConsumers}
 import org.apache.samza.task.ReadableCoordinator
 
 /**
@@ -34,7 +33,7 @@ import org.apache.samza.task.ReadableCoordinator
  * be done when.
  */
 class RunLoop(
-  val taskInstances: Map[Partition, TaskInstance],
+  val taskInstances: Map[TaskName, TaskInstance],
   val consumerMultiplexer: SystemConsumers,
   val metrics: SamzaContainerMetrics,
   val windowMs: Long = -1,
@@ -43,10 +42,18 @@ class RunLoop(
 
   private var lastWindowMs = 0L
   private var lastCommitMs = 0L
-  private var taskShutdownRequests: Set[Partition] = Set()
-  private var taskCommitRequests: Set[Partition] = Set()
+  private var taskShutdownRequests: Set[TaskName] = Set()
+  private var taskCommitRequests: Set[TaskName] = Set()
   private var shutdownNow = false
 
+  // Messages come from the chooser with no connection to the TaskInstance 
they're bound for.
+  // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently 
route them.
+  val systemStreamPartitionToTaskInstance: Map[SystemStreamPartition, 
TaskInstance] = {
+    // We could just pass in the SystemStreamPartitionMap during construction, 
but it's safer and cleaner to derive the information directly
+    def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = 
taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
+
+    taskInstances.values.map{ getSystemStreamPartitionToTaskInstance 
}.flatten.toMap
+  }
 
   /**
    * Starts the run loop. Blocks until either the tasks request shutdown, or an
@@ -61,7 +68,6 @@ class RunLoop(
     }
   }
 
-
   /**
    * Chooses a message from an input stream to process, and calls the
    * process() method on the appropriate StreamTask to handle it.
@@ -73,13 +79,15 @@ class RunLoop(
     val envelope = consumerMultiplexer.choose
 
     if (envelope != null) {
-      val partition = envelope.getSystemStreamPartition.getPartition
+      val ssp = envelope.getSystemStreamPartition
 
-      trace("Processing incoming message envelope for partition %s." format 
partition)
+      trace("Processing incoming message envelope for SSP %s." format ssp)
       metrics.envelopes.inc
 
-      val coordinator = new ReadableCoordinator(partition)
-      taskInstances(partition).process(envelope, coordinator)
+      val taskInstance = systemStreamPartitionToTaskInstance(ssp)
+
+      val coordinator = new ReadableCoordinator(taskInstance.taskName)
+      taskInstance.process(envelope, coordinator)
       checkCoordinator(coordinator)
     } else {
       trace("No incoming message envelope was available.")
@@ -87,7 +95,6 @@ class RunLoop(
     }
   }
 
-
   /**
    * Invokes WindowableTask.window on all tasks if it's time to do so.
    */
@@ -97,8 +104,8 @@ class RunLoop(
       lastWindowMs = clock()
       metrics.windows.inc
 
-      taskInstances.foreach { case (partition, task) =>
-        val coordinator = new ReadableCoordinator(partition)
+      taskInstances.foreach { case (taskName, task) =>
+        val coordinator = new ReadableCoordinator(taskName)
         task.window(coordinator)
         checkCoordinator(coordinator)
       }
@@ -129,8 +136,8 @@ class RunLoop(
     } else if (!taskCommitRequests.isEmpty) {
       trace("Committing due to explicit commit request.")
       metrics.commits.inc
-      taskCommitRequests.foreach(partition => {
-        taskInstances(partition).commit
+      taskCommitRequests.foreach(taskName => {
+        taskInstances(taskName).commit
       })
     }
 
@@ -146,17 +153,17 @@ class RunLoop(
    */
   private def checkCoordinator(coordinator: ReadableCoordinator) {
     if (coordinator.requestedCommitTask) {
-      debug("Task %s requested commit for current task only" format 
coordinator.partition)
-      taskCommitRequests += coordinator.partition
+      debug("Task %s requested commit for current task only" format 
coordinator.taskName)
+      taskCommitRequests += coordinator.taskName
     }
 
     if (coordinator.requestedCommitAll) {
-      debug("Task %s requested commit for all tasks in the container" format 
coordinator.partition)
+      debug("Task %s requested commit for all tasks in the container" format 
coordinator.taskName)
       taskCommitRequests ++= taskInstances.keys
     }
 
     if (coordinator.requestedShutdownOnConsensus) {
-      taskShutdownRequests += coordinator.partition
+      taskShutdownRequests += coordinator.taskName
       info("Shutdown has now been requested by tasks: %s" format 
taskShutdownRequests)
     }
 

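A self-contained sketch (with plain String stand-ins rather than Samza's classes) of the inversion RunLoop performs, turning taskName -> set-of-SSPs into SSP -> owning task so each envelope is routed with a single lookup:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class RoutingSketch {
      public static void main(String[] args) {
        Map<String, Set<String>> taskToSsps = new HashMap<String, Set<String>>();
        taskToSsps.put("Partition 0", new HashSet<String>(Arrays.asList("kafka.PageViewEvent.0")));
        taskToSsps.put("Partition 1", new HashSet<String>(Arrays.asList("kafka.PageViewEvent.1")));

        // Invert the grouping: each SSP belongs to exactly one task
        Map<String, String> sspToTask = new HashMap<String, String>();
        for (Map.Entry<String, Set<String>> e : taskToSsps.entrySet()) {
          for (String ssp : e.getValue()) {
            sspToTask.put(ssp, e.getKey());
          }
        }
        System.out.println(sspToTask.get("kafka.PageViewEvent.1")); // Partition 1
      }
    }
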
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a7142b2..d574ac4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -19,12 +19,11 @@
 
 package org.apache.samza.container
 
-import java.io.File
 import grizzled.slf4j.Logging
+import java.io.File
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager}
 import org.apache.samza.config.Config
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -34,35 +33,34 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.job.ShellCommandBuilder
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemConsumersMetrics
 import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemProducersMetrics
 import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.chooser.DefaultChooser
+import org.apache.samza.system.chooser.MessageChooserFactory
+import org.apache.samza.system.chooser.RoundRobinChooserFactory
+import org.apache.samza.task.ReadableCollector
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Util
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.task.ReadableCollector
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.chooser.MessageChooserFactory
-import org.apache.samza.system.SystemProducersMetrics
-import org.apache.samza.system.SystemConsumersMetrics
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.system.StreamMetadataCache
 
 object SamzaContainer extends Logging {
 
@@ -92,29 +90,46 @@ object SamzaContainer extends Logging {
     * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
     * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
      */
-    val isCompressed = if (System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")) true else false
+    val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")
     val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
     val config = JsonConfigSerializer.fromJson(configStr)
-    val encodedStreamsAndPartitions = getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)
-    val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions)
-
-    if (partitions.isEmpty) {
-      throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
-    }
+    val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed))
+    val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed))
 
     try {
-      SamzaContainer(containerName, partitions, config).run
+      SamzaContainer(containerName, sspTaskNames, taskNameToChangeLogPartitionMapping, config).run
     } finally {
       jmxServer.stop
     }
   }
 
-  def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = {
+  def getTaskNameToSystemStreamPartition(SSPTaskNamesJSON: String) = {
+    // Convert into a standard Java map
+    val sspTaskNamesAsJava: Map[TaskName, Set[SystemStreamPartition]] = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(SSPTaskNamesJSON)
+
+    // From that map build the TaskNamesToSystemStreamPartitions
+    val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava)
+
+    if (sspTaskNames.isEmpty) {
+      throw new SamzaException("No SystemStreamPartitions for this task. Can't 
run a task without SystemStreamPartition assignments.")
+    }
+
+    sspTaskNames
+  }
+
+  def getTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMappingJSON: String) = {
+    // Convert the JSON mapping into a Map of TaskName to changelog partition ID
+    val taskNameToChangeLogPartitionMapping = ShellCommandBuilder.deserializeTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMappingJSON).map(kv => kv._1 -> Integer.valueOf(kv._2))
+
+    taskNameToChangeLogPartitionMapping
+  }
+
+  def apply(containerName: String, sspTaskNames: TaskNamesToSystemStreamPartitions, taskNameToChangeLogPartitionMapping: Map[TaskName, java.lang.Integer], config: Config) = {
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
+    info("Using SystemStreamPartition taskNames %s" format sspTaskNames)
     info("Samza container PID: %s" format containerPID)
-    info("Using streams and partitions: %s" format inputStreams)
     info("Using configuration: %s" format config)
 
     val registry = new MetricsRegistryMap(containerName)
@@ -122,7 +137,7 @@ object SamzaContainer extends Logging {
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
 
-    val inputSystems = inputStreams.map(_.getSystem)
+    val inputSystems = sspTaskNames.getAllSystems()
 
     val systemNames = config.getSystemNames
 
@@ -150,7 +165,7 @@ object SamzaContainer extends Logging {
     info("Got system factories: %s" format systemFactories.keys)
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
-    val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputStreams.map(_.getSystemStream))
+    val inputStreamMetadata = streamMetadataCache.getStreamMetadata(sspTaskNames.getAllSystemStreams)
 
     info("Got input stream metadata: %s" format inputStreamMetadata)
 
@@ -219,7 +234,7 @@ object SamzaContainer extends Logging {
     * A helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps.
     */
    val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
-      (serdeStreams ++ inputStreams)
+      (serdeStreams ++ sspTaskNames.getAllSSPs())
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
@@ -384,20 +399,20 @@ object SamzaContainer extends Logging {
 
     info("Got commit milliseconds: %s" format taskCommitMs)
 
-    // Wire up all task-level (unshared) objects.
+    // Wire up all task-instance-level (unshared) objects.
 
-    val partitions = inputStreams.map(_.getPartition).toSet
+    val taskNames = sspTaskNames.keys.toSet
 
-    val containerContext = new SamzaContainerContext(containerName, config, partitions)
+    val containerContext = new SamzaContainerContext(containerName, config, taskNames)
 
-    val taskInstances = partitions.map(partition => {
-      debug("Setting up task instance: %s" format partition)
+    val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => {
+      debug("Setting up task instance: %s" format taskName)
 
       val task = Util.getObj[StreamTask](taskClassName)
 
       val collector = new ReadableCollector
 
-      val taskInstanceMetrics = new TaskInstanceMetrics("Partition-%s" format partition.getPartitionId)
+      val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName)
 
       val storeConsumers = changeLogSystemStreams
         .map {
@@ -410,11 +425,13 @@ object SamzaContainer extends Logging {
 
       info("Got store consumers: %s" format storeConsumers)
 
+      val partitionForThisTaskName = new Partition(taskNameToChangeLogPartitionMapping(taskName))
+
       val taskStores = storageEngineFactories
         .map {
           case (storeName, storageEngineFactory) =>
             val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) {
-              new SystemStreamPartition(changeLogSystemStreams(storeName), partition)
+              new SystemStreamPartition(changeLogSystemStreams(storeName), partitionForThisTaskName)
             } else {
               null
             }
@@ -426,7 +443,7 @@ object SamzaContainer extends Logging {
               case Some(msgSerde) => serdes(msgSerde)
               case _ => null
             }
-            val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+            val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
             val storageEngine = storageEngineFactory.getStorageEngine(
               storeName,
               storePartitionDir,
@@ -441,25 +458,26 @@ object SamzaContainer extends Logging {
 
       info("Got task stores: %s" format taskStores)
 
-      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata)
+      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata)
 
-      info("Assigning oldest change log offsets for partition %s: %s" format (partition, changeLogOldestOffsets))
+      info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
 
       val storageManager = new TaskStorageManager(
-        partition = partition,
+        taskName = taskName,
         taskStores = taskStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
         changeLogOldestOffsets = changeLogOldestOffsets,
-        storeBaseDir = storeBaseDir)
+        storeBaseDir = storeBaseDir,
+        partitionForThisTaskName)
 
-      val inputStreamsForThisPartition = inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream)
+      val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames))
 
-      info("Assigning SystemStreams " + inputStreamsForThisPartition + " to " + partition)
+      info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
 
       val taskInstance = new TaskInstance(
         task = task,
-        partition = partition,
+        taskName = taskName,
         config = config,
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
@@ -468,10 +486,10 @@ object SamzaContainer extends Logging {
         storageManager = storageManager,
         reporters = reporters,
         listeners = listeners,
-        inputStreams = inputStreamsForThisPartition,
+        systemStreamPartitions = systemStreamPartitions,
         collector = collector)
 
-      (partition, taskInstance)
+      (taskName, taskInstance)
     }).toMap
 
     val runLoop = new RunLoop(
@@ -506,7 +524,7 @@ object SamzaContainer extends Logging {
 }
 
 class SamzaContainer(
-  taskInstances: Map[Partition, TaskInstance],
+  taskInstances: Map[TaskName, TaskInstance],
   runLoop: RunLoop,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,

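One detail in the container wiring above deserves a note: a task's changelog
partition now comes from the AM-supplied taskNameToChangeLogPartitionMapping
rather than from its input partition. A small sketch of that lookup; the
mapping contents are illustrative, while TaskName and Partition are the real
classes from this commit:

    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName

    // Illustrative mapping, shaped like what the AM serializes for a container:
    // each TaskName is pinned to one changelog partition for all of its stores.
    val mapping: Map[TaskName, java.lang.Integer] = Map(
      new TaskName("Partition 0") -> Integer.valueOf(0),
      new TaskName("Partition 1") -> Integer.valueOf(1))

    // Mirrors the per-task wiring in SamzaContainer.apply above: the same
    // Partition is reused for every store's changelog SSP of this task.
    val taskName = new TaskName("Partition 1")
    val partitionForThisTaskName = new Partition(mapping(taskName))
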
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
new file mode 100644
index 0000000..a8c93ac
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+/**
+ * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
+ * {@link org.apache.samza.container.SystemStreamPartitionGrouper}, we can then map those groupings onto
+ * the {@link org.apache.samza.container.SamzaContainer}s on which they will run.  This class takes
+ * those groupings-of-SSPs and decides which container each should run on.  A simple
+ * implementation could assign each TaskNamesToSystemStreamPartition to a separate container.  More
+ * advanced implementations could examine the TaskNamesToSystemStreamPartition to group them
+ * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
+ */
+trait SystemStreamPartitionTaskNameGrouper {
+  /**
+   * Group TaskNamesToSystemStreamPartitions onto the containers they will 
share
+   *
+   * @param taskNames Pre-grouped SSPs
+   * @return Mapping of container ID to set if TaskNames it will run
+   */
+  def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, 
TaskNamesToSystemStreamPartitions]
+}

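To make the contract concrete, here is a hedged sketch, not part of this
commit, of a grouper that spreads TaskNames round-robin across a fixed number
of containers; RoundRobinTaskNameGrouper and numContainers are invented names:

    import org.apache.samza.container.{SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions}

    // Assign each TaskName (with its SSP set) to container (index mod numContainers).
    class RoundRobinTaskNameGrouper(numContainers: Int) extends SystemStreamPartitionTaskNameGrouper {
      def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
        taskNames.toSeq                                  // (TaskName, Set[SystemStreamPartition]) pairs
          .zipWithIndex                                  // attach a running index to each pair
          .groupBy { case (_, i) => i % numContainers }  // bucket the pairs by container ID
          .map { case (containerId, entries) =>
            containerId -> TaskNamesToSystemStreamPartitions(entries.map(_._1).toMap)
          }
      }
    }

A locality- or bandwidth-aware implementation would differ only in how it
buckets the (TaskName, SSP set) pairs; the return shape stays the same.
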
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 99a9841..9484ddb 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -21,33 +21,27 @@ package org.apache.samza.container
 
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.config.Config
-import org.apache.samza.Partition
 import grizzled.slf4j.Logging
-import scala.collection.JavaConversions._
 import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.WindowableTask
-import org.apache.samza.checkpoint.CheckpointManager
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemStream
-import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.metrics.Gauge
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
 
 class TaskInstance(
   task: StreamTask,
-  partition: Partition,
+  val taskName: TaskName,
   config: Config,
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
@@ -56,15 +50,14 @@ class TaskInstance(
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
-  inputStreams: Set[SystemStream] = Set(),
+  val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   collector: ReadableCollector = new ReadableCollector) extends Logging {
-
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val context = new TaskContext {
     def getMetricsRegistry = metrics.registry
-    def getPartition = partition
+    def getSystemStreamPartitions = systemStreamPartitions
     def getStore(storeName: String) = if (storageManager != null) {
       storageManager(storeName)
     } else {
@@ -72,29 +65,28 @@ class TaskInstance(
 
       null
     }
+    def getTaskName = taskName
   }
 
   def registerMetrics {
-    debug("Registering metrics for partition: %s." format partition)
+    debug("Registering metrics for taskName: %s" format taskName)
 
     reporters.values.foreach(_.register(metrics.source, metrics.registry))
   }
 
   def registerOffsets {
-    debug("Registering offsets for partition: %s." format partition)
+    debug("Registering offsets for taskName: %s" format taskName)
 
-    inputStreams.foreach(systemStream => {
-      offsetManager.register(new SystemStreamPartition(systemStream, partition))
-    })
+    offsetManager.register(taskName, systemStreamPartitions)
   }
 
   def startStores {
     if (storageManager != null) {
-      debug("Starting storage manager for partition: %s." format partition)
+      debug("Starting storage manager for taskName: %s" format taskName)
 
       storageManager.init
     } else {
-      debug("Skipping storage manager initialization for partition: %s." 
format partition)
+      debug("Skipping storage manager initialization for taskName: %s" format 
taskName)
     }
   }
 
@@ -102,31 +94,30 @@ class TaskInstance(
     listeners.foreach(_.beforeInit(config, context))
 
     if (isInitableTask) {
-      debug("Initializing task for partition: %s." format partition)
+      debug("Initializing task for taskName: %s" format taskName)
 
       task.asInstanceOf[InitableTask].init(config, context)
     } else {
-      debug("Skipping task initialization for partition: %s." format partition)
+      debug("Skipping task initialization for taskName: %s" format taskName)
     }
 
     listeners.foreach(_.afterInit(config, context))
   }
 
   def registerProducers {
-    debug("Registering producers for partition: %s." format partition)
+    debug("Registering producers for taskName: %s" format taskName)
 
     producerMultiplexer.register(metrics.source)
   }
 
   def registerConsumers {
-    debug("Registering consumers for partition: %s." format partition)
+    debug("Registering consumers for taskName: %s" format taskName)
 
-    inputStreams.foreach(systemStream => {
-      val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    systemStreamPartitions.foreach(systemStreamPartition => {
       val offset = offsetManager.getStartingOffset(systemStreamPartition)
-        .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream)))
+        .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
       consumerMultiplexer.register(systemStreamPartition, offset)
-      metrics.addOffsetGauge(systemStream, () => {
+      metrics.addOffsetGauge(systemStreamPartition, () => {
         offsetManager
           .getLastProcessedOffset(systemStreamPartition)
           .getOrElse(null)
@@ -139,20 +130,20 @@ class TaskInstance(
 
     listeners.foreach(_.beforeProcess(envelope, config, context))
 
-    trace("Processing incoming message envelope for partition: %s, %s" format 
(partition, envelope.getSystemStreamPartition))
+    trace("Processing incoming message envelope for taskName and SSP: %s, %s" 
format (taskName, envelope.getSystemStreamPartition))
 
     task.process(envelope, collector, coordinator)
 
     listeners.foreach(_.afterProcess(envelope, config, context))
 
-    trace("Updating offset map for partition: %s, %s, %s" format (partition, 
envelope.getSystemStreamPartition, envelope.getOffset))
+    trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" 
format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
     offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
   }
 
   def window(coordinator: ReadableCoordinator) {
     if (isWindowableTask) {
-      trace("Windowing for partition: %s" format partition)
+      trace("Windowing for taskName: %s" format taskName)
 
       metrics.windows.inc
 
@@ -162,48 +153,48 @@ class TaskInstance(
 
   def send {
     if (collector.envelopes.size > 0) {
-      trace("Sending messages for partition: %s, %s" format (partition, 
collector.envelopes.size))
+      trace("Sending messages for taskName: %s, %s" format (taskName, 
collector.envelopes.size))
 
       metrics.sends.inc
       metrics.messagesSent.inc(collector.envelopes.size)
 
       collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
 
-      trace("Resetting collector for partition: %s" format partition)
+      trace("Resetting collector for taskName: %s" format taskName)
 
       collector.reset
     } else {
-      trace("Skipping send for partition %s because no messages were 
collected." format partition)
+      trace("Skipping send for taskName %s because no messages were 
collected." format taskName)
 
       metrics.sendsSkipped.inc
     }
   }
 
   def commit {
-    trace("Flushing state stores for partition: %s" format partition)
+    trace("Flushing state stores for taskName: %s" format taskName)
 
     metrics.commits.inc
 
     storageManager.flush
 
-    trace("Flushing producers for partition: %s" format partition)
+    trace("Flushing producers for taskName: %s" format taskName)
 
     producerMultiplexer.flush(metrics.source)
 
-    trace("Committing offset manager for partition: %s" format partition)
+    trace("Committing offset manager for taskName: %s" format taskName)
 
-    offsetManager.checkpoint(partition)
+    offsetManager.checkpoint(taskName)
   }
 
   def shutdownTask {
     listeners.foreach(_.beforeClose(config, context))
 
     if (task.isInstanceOf[ClosableTask]) {
-      debug("Shutting down stream task for partition: %s" format partition)
+      debug("Shutting down stream task for taskName: %s" format taskName)
 
       task.asInstanceOf[ClosableTask].close
     } else {
-      debug("Skipping stream task shutdown for partition: %s" format partition)
+      debug("Skipping stream task shutdown for taskName: %s" format taskName)
     }
 
     listeners.foreach(_.afterClose(config, context))
@@ -211,16 +202,16 @@ class TaskInstance(
 
   def shutdownStores {
     if (storageManager != null) {
-      debug("Shutting down storage manager for partition: %s" format partition)
+      debug("Shutting down storage manager for taskName: %s" format taskName)
 
       storageManager.stop
     } else {
-      debug("Skipping storage manager shutdown for partition: %s" format 
partition)
+      debug("Skipping storage manager shutdown for taskName: %s" format 
taskName)
     }
   }
 
-  override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition)
+  override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName)
 
-  def toDetailedString() = "TaskInstance [windowable=%s, closable=%s, collector_size=%s]" format (isWindowableTask, isClosableTask, collector.envelopes.size)
+  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s, collector_size=%s]" format (taskName, isWindowableTask, isClosableTask, collector.envelopes.size)
 
 }

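For task authors, the visible change above is in TaskContext: getPartition is
gone, replaced by getTaskName and getSystemStreamPartitions. A hedged sketch of
a task using the new accessors (LoggingTask is an invented example class):

    import org.apache.samza.config.Config
    import org.apache.samza.system.IncomingMessageEnvelope
    import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

    class LoggingTask extends StreamTask with InitableTask {
      def init(config: Config, context: TaskContext) {
        // A task no longer has "its" partition; it has a TaskName and a set of SSPs.
        println("Task %s owns SSPs: %s" format (context.getTaskName, context.getSystemStreamPartitions))
      }

      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
        // Process messages as usual; identity and input assignment come from the context.
      }
    }
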
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 7502124..aae3f87 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -21,10 +21,8 @@ package org.apache.samza.container
 
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.Partition
 import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.system.SystemStream
-import org.apache.samza.metrics.Gauge
+import org.apache.samza.system.SystemStreamPartition
 
 class TaskInstanceMetrics(
   val source: String = "unknown",
@@ -37,7 +35,7 @@ class TaskInstanceMetrics(
   val sendsSkipped = newCounter("send-skipped")
   val messagesSent = newCounter("messages-sent")
 
-  def addOffsetGauge(systemStream: SystemStream, getValue: () => String) {
-    newGauge("%s-%s-offset" format (systemStream.getSystem, 
systemStream.getStream), getValue)
+  def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: 
() => String) {
+    newGauge("%s-%s-%d-offset" format (systemStreamPartition.getSystem, 
systemStreamPartition.getStream, 
systemStreamPartition.getPartition.getPartitionId), getValue)
   }
 }

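Because a task instance may now own several partitions of the same stream, the
partition ID is folded into the gauge name to keep the metrics distinct. A
quick illustration of the name the new addOffsetGauge produces (the system and
stream names here are made up):

    import org.apache.samza.Partition
    import org.apache.samza.system.SystemStreamPartition

    val ssp = new SystemStreamPartition("kafka", "PageViewEvent", new Partition(3))
    // Same format string as addOffsetGauge above.
    val gaugeName = "%s-%s-%d-offset" format (ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId)
    // gaugeName == "kafka-PageViewEvent-3-offset"
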
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
new file mode 100644
index 0000000..427119e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.system.{SystemStream, SystemStreamPartition}
+import scala.collection.{immutable, Map, MapLike}
+
+/**
+ * Map of {@link TaskName} to its set of {@link SystemStreamPartition}s with additional methods for aggregating
+ * those SystemStreamPartitions' individual systems, streams and partitions.  It is useful for highlighting this
+ * particular, heavily used map within the code.
+ *
+ * @param m Original map of TaskNames to SystemStreamPartitions
+ */
+class TaskNamesToSystemStreamPartitions(m: Map[TaskName, Set[SystemStreamPartition]] = Map[TaskName, Set[SystemStreamPartition]]())
+  extends Map[TaskName, Set[SystemStreamPartition]]
+  with MapLike[TaskName, Set[SystemStreamPartition], TaskNamesToSystemStreamPartitions] with Logging {
+
+  // Constructor
+  validate
+
+  // Methods
+
+  // TODO: Get rid of public constructor, rely entirely on the companion object
+  override def -(key: TaskName): TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions(m - key)
+
+  override def +[B1 >: Set[SystemStreamPartition]](kv: (TaskName, B1)): Map[TaskName, B1] = new TaskNamesToSystemStreamPartitions(m + kv.asInstanceOf[(TaskName, Set[SystemStreamPartition])])
+
+  override def iterator: Iterator[(TaskName, Set[SystemStreamPartition])] = m.iterator
+
+  override def get(key: TaskName): Option[Set[SystemStreamPartition]] = m.get(key)
+
+  override def empty: TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions()
+
+  override def seq: Map[TaskName, Set[SystemStreamPartition]] = m.seq
+
+  override def foreach[U](f: ((TaskName, Set[SystemStreamPartition])) => U): Unit = m.foreach(f)
+
+  override def size: Int = m.size
+
+  /**
+   * Validate that this is a legal mapping of TaskNames to SystemStreamPartitions.  At the moment,
+   * we only check that an SSP is included in the mapping at most once.  We could add other,
+   * pluggable validations here, or if we decided to allow an SSP to appear in the mapping more than
+   * once, remove this limitation.
+   */
+  def validate(): Unit = {
+    // Convert sets of SSPs to lists, to preserve duplicates
+    val allSSPs: List[SystemStreamPartition] = m.values.toList.map(_.toList).flatten
+    val sspCountMap = allSSPs.groupBy(ssp => ssp)  // Group identical SSPs together
+      .map(ssp => (ssp._1 -> ssp._2.size))         // Turn into a map of SSP -> count of that SSP
+      .filter(ssp => ssp._2 != 1)                  // Keep only SSPs that appear more than once
+
+    if (!sspCountMap.isEmpty) {
+      throw new SamzaException("Assigning the same SystemStreamPartition to multiple TaskNames is not currently supported." +
+        "  Out of compliance SystemStreamPartitions and counts: " + sspCountMap)
+    }
+
+    debug("Successfully validated TaskName to SystemStreamPartition set 
mapping:" + m)
+  }
+
+  /**
+   * Return all the SystemStreamPartitions across all the keys.
+   *
+   * @return All SystemStreamPartitions within this map
+   */
+  def getAllSSPs(): Iterable[SystemStreamPartition] = m.values.flatten
+
+  /**
+   * Return a set of all the Systems present in the SystemStreamPartitions across all the keys
+   *
+   * @return All Systems within this map
+   */
+  def getAllSystems(): Set[String] = getAllSSPs.map(_.getSystemStream.getSystem).toSet
+
+  /**
+   * Return a set of all the Partition IDs in the SystemStreamPartitions across all the keys
+   *
+   * @return All Partition IDs within this map
+   */
+  def getAllPartitionIds(): Set[Int] = getAllSSPs.map(_.getPartition.getPartitionId).toSet
+
+  /**
+   * Return a set of all the Streams in the SystemStreamPartitions across all the keys
+   *
+   * @return All Streams within this map
+   */
+  def getAllStreams(): Set[String] = getAllSSPs.map(_.getSystemStream.getStream).toSet
+
+  /**
+   * Return a set of all the SystemStreams in the SystemStreamPartitions across all the keys
+   *
+   * @return All SystemStreams within this map
+   */
+  def getAllSystemStreams: Set[SystemStream] = getAllSSPs().map(_.getSystemStream).toSet
+
+  // CommandBuilder needs to get a copy of this map and is a Java interface, therefore we can't just go straight
+  // from this type to JSON (for passing into the command option).
+  // Not super crazy about having the Java -> Scala and Scala -> Java methods in two different (but close) places:
+  // here and in the apply method on the companion object.  May be better to just have a conversion util, but would
+  // be less clean.  Life is cruel on the border of Scalapolis and Javatown.
+  def getJavaFriendlyType: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] = {
+    import scala.collection.JavaConverters._
+
+    m.map({case(k,v) => k -> v.asJava}).toMap.asJava
+  }
+}
+
+object TaskNamesToSystemStreamPartitions {
+  def apply() = new TaskNamesToSystemStreamPartitions()
+
+  def apply(m: Map[TaskName, Set[SystemStreamPartition]]) = new TaskNamesToSystemStreamPartitions(m)
+
+  /**
+   * Convert from the Java-happy type we obtain from the SSPTaskName factory
+   *
+   * @param m Java version of a map of TaskNames to sets of SystemStreamPartitions
+   * @return Populated SSPTaskName map
+   */
+  def apply(m: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]) = {
+    import scala.collection.JavaConversions._
+
+    val rightType: immutable.Map[TaskName, Set[SystemStreamPartition]] = m.map({ case (k, v) => k -> v.toSet }).toMap
+
+    new TaskNamesToSystemStreamPartitions(rightType)
+  }
+}
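
A hedged usage sketch, not from this commit, of the constructor-time validation
above; the stream and TaskName values are made up:

    import org.apache.samza.Partition
    import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
    import org.apache.samza.system.SystemStreamPartition

    val ssp = new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0))

    // Legal: every SSP appears at most once across the map's values.
    TaskNamesToSystemStreamPartitions(Map(new TaskName("Partition 0") -> Set(ssp)))

    // Illegal: the same SSP is claimed by two TaskNames, so validate (invoked
    // from the constructor) throws a SamzaException listing the duplicates.
    TaskNamesToSystemStreamPartitions(Map(
      new TaskName("a") -> Set(ssp),
      new TaskName("b") -> Set(ssp)))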
