Updated Branches:
  refs/heads/master 01caadbca -> b694e7ade

SAMZA-82: Do not use the maximum number of partitions when initializing streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/b694e7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/b694e7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/b694e7ad

Branch: refs/heads/master
Commit: b694e7adecb8dc326e8b96cc51b32603ffd4d9f2
Parents: 01caadb
Author: Jakob Homan <[email protected]>
Authored: Thu Dec 12 16:37:04 2013 -0800
Committer: Jakob Homan <[email protected]>
Committed: Thu Dec 12 16:37:04 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/job/CommandBuilder.java    | 20 ++---
 .../samza/config/ShellCommandConfig.scala       |  5 +-
 .../org/apache/samza/config/TaskConfig.scala    |  2 -
 .../apache/samza/container/SamzaContainer.scala | 28 +++---
 .../apache/samza/job/ShellCommandBuilder.scala  |  7 +-
 .../samza/job/local/LocalJobFactory.scala       |  9 +-
 .../main/scala/org/apache/samza/util/Util.scala | 91 ++++++++++++++-----
 .../scala/org/apache/samza/util/TestUtil.scala  | 92 ++++++++++++++++++++
 .../kafka/KafkaCheckpointManagerFactory.scala   |  4 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    | 30 ++-----
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 45 +++++-----
 11 files changed, 224 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 934423b..5ec6433 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -19,23 +19,22 @@
 
 package org.apache.samza.job;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStreamPartition;
+
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-
 public abstract class CommandBuilder {
-  protected Set<Partition> partitions;
-  protected int totalPartitions;
+  protected Set<SystemStreamPartition> systemStreamPartitions;
   protected String name;
   protected Config config;
 
-  public CommandBuilder setPartitions(Set<Partition> partitions) {
-    this.partitions = partitions;
+  public CommandBuilder setStreamPartitions(Set<SystemStreamPartition> ssp) {
+    this.systemStreamPartitions = ssp;
     return this;
   }
-
+  
   /**
    * @param name
    *          associated with a specific instantiation of a TaskRunner.
@@ -51,11 +50,6 @@ public abstract class CommandBuilder {
     return this;
   }
 
-  public CommandBuilder setTotalPartitions(int totalPartitions) {
-    this.totalPartitions = totalPartitions;
-    return this;
-  }
-
   public abstract String buildCommand();
 
   public abstract Map<String, String> buildEnvironment();
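
For callers of this API, a rough sketch of how the revised builder contract could be wired up; the container name, stream name, and empty config below are illustrative assumptions, not taken from the patch:

import org.apache.samza.Partition
import org.apache.samza.config.{MapConfig, ShellCommandConfig}
import org.apache.samza.job.ShellCommandBuilder
import org.apache.samza.system.SystemStreamPartition
import scala.collection.JavaConversions._

object CommandBuilderSketch extends App {
  // An empty config is enough to build the environment; a real job would
  // carry its full configuration here.
  val builder = new ShellCommandBuilder()
    .setConfig(new MapConfig(Map[String, String]()))
    .setName("samza-container-0")
    // Each SystemStreamPartition names its system, stream and partition,
    // so the removed setTotalPartitions call is no longer needed.
    .setStreamPartitions(Set(
      new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0)),
      new SystemStreamPartition("kafka", "PageViewEvent", new Partition(2))))

  // SAMZA_SYSTEM_STREAMS now carries the encoded stream/partition assignment.
  println(builder.buildEnvironment().get(ShellCommandConfig.ENV_SYSTEM_STREAMS))
}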

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index b6efe46..3e4ab29 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -26,9 +26,10 @@ object ShellCommandConfig {
   val ENV_CONFIG = "SAMZA_CONFIG"
 
   /**
-   * A CSV list of partition IDs that a TaskRunner is responsible for (e.g. 0,2,4,6).
+   * An encoded list of the streams and partitions this container is responsible for. Encoded by
+   * {@link org.apache.samza.util.Util#createStreamPartitionString}
    */
-  val ENV_PARTITION_IDS = "SAMZA_PARTITION_IDS"
+  val ENV_SYSTEM_STREAMS = "SAMZA_SYSTEM_STREAMS"
 
   /**
    * The name for a container (either a YARN AM or SamzaContainer)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 9c4370f..3510f1f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.SamzaException
 import org.apache.samza.util.Util
 import org.apache.samza.system.SystemStream
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index aefec27..9baff44 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -38,7 +38,6 @@ import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.serializers.Serde
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.StorageEngineFactory
@@ -54,7 +53,6 @@ import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.SystemProducersMetrics
 import org.apache.samza.system.SystemConsumersMetrics
@@ -68,12 +66,11 @@ object SamzaContainer extends Logging {
     val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
     val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
     val config = JsonConfigSerializer.fromJson(configStr)
-    val partitionIdsCsv = System.getenv(ShellCommandConfig.ENV_PARTITION_IDS)
-    val partitions = if (partitionIdsCsv.length > 0) {
-      partitionIdsCsv.split(",")
-        .map(partitionIdStr => new Partition(partitionIdStr.toInt))
-        .toSet
-    } else {
+    val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)
+    
+    val partitions = Util.createStreamPartitionsFromString(encodedStreamsAndPartitions)
+    
+    if(partitions.isEmpty) {
      throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
     }
 
@@ -84,9 +81,9 @@ object SamzaContainer extends Logging {
     }
   }
 
-  def apply(containerName: String, partitions: Set[Partition], config: Config) = {
+  def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = {
     info("Setting up Samza container: %s" format containerName)
-    info("Using partitions: %s" format partitions)
+    info("Using streams and partitions: %s" format inputStreams)
     info("Using configuration: %s" format config)
 
     val registry = new MetricsRegistryMap(containerName)
@@ -94,11 +91,8 @@ object SamzaContainer extends Logging {
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
 
-    val inputStreams = config.getInputStreams
     val inputSystems = inputStreams.map(_.getSystem)
 
-    info("Got input streams: %s" format inputStreams)
-
     val systemNames = config.getSystemNames
 
     info("Got system names: %s" format systemNames)
@@ -336,6 +330,9 @@ object SamzaContainer extends Logging {
     info("Got commit milliseconds: %s" format taskCommitMs)
 
     // Wire up all task-level (unshared) objects.
+
+    val partitions = inputStreams.map(_.getPartition).toSet
+    
     val taskInstances = partitions.map(partition => {
       debug("Setting up task instance: %s" format partition)
 
@@ -394,6 +391,9 @@ object SamzaContainer extends Logging {
         changeLogSystemStreams = changeLogSystemStreams,
         storeBaseDir = storeBaseDir)
 
+      val inputStreamsForThisPartition = inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream)
+      info("Assigning SystemStreams " + inputStreamsForThisPartition + " to " + partition)
+      
       val taskInstance = new TaskInstance(
         task = task,
         partition = partition,
@@ -405,7 +405,7 @@ object SamzaContainer extends Logging {
         checkpointManager = checkpointManager,
         reporters = reporters,
         listeners = listeners,
-        inputStreams = inputStreams,
+        inputStreams = inputStreamsForThisPartition,
         resetInputStreams = resetInputStreams,
         windowMs = taskWindowMs,
         commitMs = taskCommitMs,
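
The per-partition wiring above can be pictured in isolation. A minimal standalone sketch, assuming two made-up kafka streams with two partitions each, of how each partition's task instance ends up with only the SystemStreams that carry that partition:

import org.apache.samza.Partition
import org.apache.samza.system.{SystemStream, SystemStreamPartition}

object PartitionGroupingSketch extends App {
  // Two illustrative input streams, each with two partitions.
  val inputStreams: Set[SystemStreamPartition] =
    for {
      stream <- Set("PageViewEvent", "AdClickEvent")
      partition <- Set(0, 1)
    } yield new SystemStreamPartition("kafka", stream, new Partition(partition))

  // One task instance per distinct partition, as in SamzaContainer above.
  val partitions: Set[Partition] = inputStreams.map(_.getPartition)

  partitions.foreach { partition =>
    // Only the SystemStreams that actually have this partition are assigned to it.
    val streamsForThisPartition: Set[SystemStream] =
      inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream)
    println("%s reads %s" format (partition, streamsForThisPartition))
  }
}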

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index c67e46d..b4eaf90 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -23,16 +23,17 @@ import scala.collection.JavaConversions._
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
 import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.util.Util
 
 class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = config.getCommand
 
   def buildEnvironment(): java.util.Map[String, String] = {
-    val parts = if (partitions.size() > 0) partitions.map(_.getPartitionId.toString).reduceLeft(_ + "," + _) else ""
-
+    val streamsAndPartsString = Util.createStreamPartitionString(systemStreamPartitions.toSet) // Java to Scala set conversion
+    
     Map(
       ShellCommandConfig.ENV_CONTAINER_NAME -> name,
-      ShellCommandConfig.ENV_PARTITION_IDS -> parts,
+      ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString,
       ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
       ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""))
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
index ddb119b..e20e7c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
@@ -20,14 +20,11 @@
 package org.apache.samza.job.local
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.config.Config
-import org.apache.samza.config.SystemConfig._
-import org.apache.samza.config.StreamConfig._
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.job.CommandBuilder
 import org.apache.samza.job.StreamJob
 import org.apache.samza.job.StreamJobFactory
 import scala.collection.JavaConversions._
-import org.apache.samza.Partition
 import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.container.SamzaContainer
@@ -37,7 +34,7 @@ import org.apache.samza.job.ShellCommandBuilder
 class LocalJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     val taskName = "local-task"
-    val partitions = Util.getMaxInputStreamPartitions(config)
+    val partitions = Util.getInputStreamPartitions(config)
 
     info("got partitions for job %s" format partitions)
 
@@ -50,11 +47,11 @@ class LocalJobFactory extends StreamJobFactory with Logging {
         // A command class was specified, so we need to use a process job to
         // execute the command in its own process.
        val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
-
+        
         cmdBuilder
           .setConfig(config)
           .setName(taskName)
-          .setPartitions(partitions)
+          .setStreamPartitions(partitions)
 
        val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 6b2ec49..04a6dfc 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -20,18 +20,14 @@
 package org.apache.samza.util
 
 import java.io.File
-import java.net.InetAddress
-import java.net.UnknownHostException
 import java.util.Random
 import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
+import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.config.Config
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import scala.collection.JavaConversions._
-import java.util.concurrent.ThreadFactory
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStream
+import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory, SystemStream}
 
 object Util extends Logging {
   val random = new Random
@@ -77,15 +73,17 @@ object Util extends Logging {
   }
 
   /**
-   * Uses config to create SystemAdmin classes for all input stream systems to
-   * get each input stream's partition count, then returns the maximum count.
-   * An input stream with two partitions, and a second input stream with four
-   * partitions would result in this method returning 4.
+   * For each input stream specified in config, exactly determine its partitions, returning a set of SystemStreamPartitions
+   * corresponding to them all
+   * 
+   * @param config Source of truth for systems and inputStreams
+   * @return Set of SystemStreamPartitions, one for each unique system, stream and partition
    */
-  def getMaxInputStreamPartitions(config: Config) = {
-    val inputStreams = config.getInputStreams
+  def getInputStreamPartitions(config: Config): Set[SystemStreamPartition] = {
+    val systemStreams = config.getInputStreams
     val systemNames = config.getSystemNames
-    val systemAdmins = systemNames.map(systemName => {
+    
+    val systemAdmins: Map[String, SystemAdmin] = systemNames.map(systemName => {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
@@ -93,14 +91,12 @@ object Util extends Logging {
       val systemAdmin = systemFactory.getAdmin(systemName, config)
       (systemName, systemAdmin)
     }).toMap
-    inputStreams.flatMap(systemStream => {
-      systemAdmins.get(systemStream.getSystem) match {
-        case Some(sysAdmin) => sysAdmin.getPartitions(systemStream.getStream)
-        case None => throw new IllegalArgumentException("Could not find a stream admin for system '" + systemStream.getSystem + "'")
-      }
-    }).toSet
+    
+    def getPartitions(is:SystemStream) = systemAdmins.getOrElse(is.getSystem, throw new IllegalArgumentException("Could not find a stream admin for system '" + is.getSystem + "'"))
+                                                     .getPartitions(is.getStream).map(p => new SystemStreamPartition(is, p))
+    systemStreams.map(getPartitions).flatten
   }
-
+  
   /**
    * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
@@ -121,6 +117,61 @@ object Util extends Logging {
   }
 
   /**
+   * For the specified containerId, create a list of the streams and partitions that container should handle,
+   * based on the number of containers in the job
+   *
+   * @param containerId Container ID to determine work for
+   * @param containerCount Total number of containers in the job
+   * @param ssp All SystemStreamPartitions
+   * @return Collection of streams and partitions for this particular containerId
+   */
+  def getStreamsAndPartitionsForContainer(containerId:Int, containerCount:Int, ssp:Set[SystemStreamPartition]): Set[SystemStreamPartition] = {
+    ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
+  }
+  
+  /**
+   * Serialize a collection of stream-partitions to a string suitable for passing between processes.
+   * The streams will be grouped by partition. The partition will be separated from the topics by
+   * a colon (":"), the topics separated by commas (",") and the topic-stream groups by a slash ("/").
+   * Ordering of the grouping is not specified.
+   *
+   * For example: (A,0), (A,4), (B,0), (B,4), (C,0) could be transformed to: 4:A,B/0:A,B,C
+   *
+   * @param sp Stream topics to group into a string
+   * @return Serialized string of the topics and streams grouped and delimited
+   */
+  def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = {
+    for(ch <- List(':', ',', '/');
+        s  <- sp) {
+      if(s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch)
+    }
+
+    sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + ":" + z._2.map(y => y.getSystem + "." +y.getStream).mkString(",")).mkString("/")
+
+  }
+
+  /**
+   * Invert {@link createStreamPartitionString}, building a list of streams and their partitions,
+   * from the string that function produced.
+   *
+   * @param sp Streams and partitions encoded as a string by the above function
+   * @return Set of stream and partition tuples extracted from the string. Order is not necessarily preserved.
+   */
+  def createStreamPartitionsFromString(sp:String): Set[SystemStreamPartition] = {
+    if(sp == null || sp.isEmpty) return Set.empty
+    
+    def splitPartitionGroup(pg:String) = {
+      val split = pg.split(":") // Seems like there should be a more scalar way of doing this
+      val part = split(0).toInt
+      val streams = split(1).split(",").toList
+
+      streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet
+    }
+
+    sp.split("/").map(splitPartitionGroup(_)).toSet.flatten
+  } 
+
+  /**
    * Makes sure that an object is not null, and throws a NullPointerException
    * if it is.
    */
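
Taken together, the new helpers cover container assignment, serialization, and deserialization. A small sketch of the intended round trip, where the stream name and partition counts are illustrative assumptions:

import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.Util

object StreamPartitionEncodingSketch extends App {
  // Six illustrative partitions of one kafka stream.
  val allParts: Set[SystemStreamPartition] =
    (0 until 6).map(p => new SystemStreamPartition("kafka", "PageViewEvent", new Partition(p))).toSet

  // Container 1 of 3 keeps the partitions where partitionId % 3 == 1, i.e. 1 and 4.
  val forContainerOne = Util.getStreamsAndPartitionsForContainer(1, 3, allParts)

  // Serialize for SAMZA_SYSTEM_STREAMS and read it back; with a single stream this
  // yields something like "1:kafka.PageViewEvent/4:kafka.PageViewEvent".
  val encoded = Util.createStreamPartitionString(forContainerOne)
  val decoded = Util.createStreamPartitionsFromString(encoded)
  assert(decoded == forContainerOne)
}

Since the ordering of the encoded groups is unspecified, only set equality of the decoded result should be relied on, which is what the round-trip test below checks as well.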

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
new file mode 100644
index 0000000..9c225d3
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.Util._
+import org.junit.Assert._
+
+class TestUtil {
+  @Test
+  def testGetTopicPartitionsForTask() {
+    def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
+
+    val taskCount = 4
+    val streamsMap = Map("kafka.a" -> partitionSet(4), "kafka.b" -> partitionSet(18), "timestream.c" -> partitionSet(24))
+    val streamsAndParts = (for(s <- streamsMap.keys;
+                               part <- streamsMap.getOrElse(s, Set.empty))
+    yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+
+    for(i <- 0 until taskCount) {
+      val result: Set[SystemStreamPartition] = Util.getStreamsAndPartitionsForContainer(i, taskCount, streamsAndParts)
+      // b -> 18 % 4 = 2 therefore first two results should have an extra element
+      if(i < 2) {
+        assertEquals(12, result.size)
+      } else {
+        assertEquals(11, result.size)
+      }
+
+      result.foreach(r => assertEquals(i, r.getPartition.getPartitionId % taskCount))
+    }
+  }
+  
+  @Test
+  def testCreateStreamPartitionStringBlocksDelimeters() {
+    val partOne = new Partition(1)
+    val toTry = List(':', ',', '/')
+      .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne),
+      new SystemStreamPartition("kafka", "bad" + ch, partOne),
+      new SystemStreamPartition("notkafka", "alsogood", partOne))))
+    toTry.foreach(t => try {
+      createStreamPartitionString(t._2)
+      fail("Should have thrown an exception")
+    } catch {
+      case iae:IllegalArgumentException =>
+        val expected = "SystemStreamPartition [partition=Partition [partition=1], system" +
+          "=kafka, stream=bad" + t._1 + "] contains illegal character " + t._1
+        assertEquals(expected, iae.getMessage)
+    } )
+  }
+
+  @Test
+  def testCreateStreamPartitionStringRoundTrip() {
+    val getPartitions = {
+      // Build a heavily skewed set of partitions.
+      def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
+      val system = "all-same-system."
+      val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128),
+        system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64))
+      val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap
+      val streamsMap = (lotsOfParts ++ fewParts)
+      (for(s <- streamsMap.keys;
+           part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+    }
+
+    val streamsAndParts = getStreamsAndPartitionsForContainer(0, 4, getPartitions)
+    println(streamsAndParts)
+    val asString = createStreamPartitionString(streamsAndParts)
+    println(asString)
+    val backToStreamsAndParts = createStreamPartitionsFromString(asString)
+
+    assertEquals(streamsAndParts, backToStreamsAndParts)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index bc94f6a..2197b01 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -73,7 +73,9 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
     val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId)
     val stateTopic = getTopic(jobName, jobId)
-    val totalPartitions = Util.getMaxInputStreamPartitions(config).size
+    
+    // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info.
+    val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size
 
     new KafkaCheckpointManager(
       clientId,
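
The totalPartitions value above counts distinct partitions rather than stream/partition pairs; a tiny sketch of that arithmetic, using made-up stream names:

import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamPartition

object TotalPartitionCountSketch extends App {
  // Illustrative: two input streams that both have partitions 0 and 1.
  val ssps = Set(
    new SystemStreamPartition("kafka", "input-a", new Partition(0)),
    new SystemStreamPartition("kafka", "input-a", new Partition(1)),
    new SystemStreamPartition("kafka", "input-b", new Partition(0)),
    new SystemStreamPartition("kafka", "input-b", new Partition(1)))

  // Same arithmetic as the factory: count distinct Partitions, not SSPs,
  // so four SystemStreamPartitions still give a total of two partitions.
  val totalPartitions = ssps.map(_.getPartition).toSet.size
  assert(totalPartitions == 2)
}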

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 9f4db17..7a63a2b 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -21,7 +21,6 @@ package org.apache.samza.job.yarn
 import org.apache.hadoop.yarn.api.records.ContainerStatus
 import org.apache.hadoop.yarn.api.records.Container
 import org.apache.samza.config.Config
-import org.apache.samza.Partition
 import grizzled.slf4j.Logging
 import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.config.YarnConfig
@@ -29,21 +28,15 @@ import org.apache.samza.job.CommandBuilder
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.fs.Path
-import org.apache.samza.task.TaskContext
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.TaskConfig.Config2Task
-import scala.collection.JavaConversions._
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.api.records.Priority
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.util.Records
-import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.yarn.api.records.LocalResource
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
@@ -51,17 +44,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records.LocalResourceType
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.net.NetUtils
 import java.util.Collections
-import java.security.PrivilegedAction
 import org.apache.samza.job.ShellCommandBuilder
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import java.nio.ByteBuffer
-import org.apache.hadoop.yarn.client.api.NMClient
 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
 
 object SamzaAppMasterTaskManager {
@@ -69,10 +56,6 @@ object SamzaAppMasterTaskManager {
   val DEFAULT_CPU_CORES = 1
   val DEFAULT_CONTAINER_RETRY_COUNT = 8
   val DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000
-
-  def getPartitionsForTask(taskId: Int, taskCount: Int, partitions: Set[Partition]) = {
-    partitions.filter(_.getPartitionId % taskCount == taskId).toSet
-  }
 }
 
 case class TaskFailure(val count: Int, val lastFailure: Long)
@@ -93,7 +76,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
       1
   }
 
-  val partitions = Util.getMaxInputStreamPartitions(config)
+  val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
   var containerManager: NMClientImpl = null
@@ -126,14 +109,13 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     state.unclaimedTasks.headOption match {
       case Some(taskId) => {
        info("Got available task id (%d) for container: %s" format (taskId, container))
-        val partitionsForTask = getPartitionsForTask(taskId, state.taskCount, partitions)
-        info("Claimed partitions %s for task ID %s" format (partitionsForTask, taskId))
+        val streamsAndPartitionsForTask = Util.getStreamsAndPartitionsForContainer(taskId, state.taskCount, allSystemStreamPartitions)
+        info("Claimed partitions %s for container ID %s" format (allSystemStreamPartitions, taskId))
         val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
           .setConfig(config)
           .setName("samza-container-%s" format taskId)
-          .setPartitions(partitionsForTask)
-          .setTotalPartitions(partitions.size)
+          .setStreamPartitions(streamsAndPartitionsForTask)
         val command = cmdBuilder.buildCommand
         info("Task ID %s using command %s" format (taskId, command))
        val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
@@ -150,7 +132,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         state.neededContainers -= 1
         state.runningTasks += taskId -> container
         state.unclaimedTasks -= taskId
-        state.taskPartitions += taskId -> partitionsForTask
+        state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
 
        info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
 
@@ -200,7 +182,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
         state.releasedContainers += 1
 
-        // If this container was assigned some partitions (a taskId), then 
+        // If this container was assigned some partitions (a containerId), then 
         // clean up, and request a new container for the tasks. This only 
         // should happen if the container was 'lost' due to node failure, not 
         // if the AM released the container.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index ee3ffef..a7b5564 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -20,29 +20,25 @@
 package org.apache.samza.job.yarn
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
-import org.apache.samza.Partition
+import org.apache.samza.{Partition, SamzaException}
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.util.ConverterUtils
 import scala.collection.JavaConversions._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.hadoop.service._
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.NodeReport
 import TestSamzaAppMasterTaskManager._
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory}
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.SamzaException
+import org.apache.samza.util.Util._
+import org.apache.samza.util.Util
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -133,6 +129,7 @@ object TestSamzaAppMasterTaskManager {
 }
 
 class TestSamzaAppMasterTaskManager {
+  
   val config = new MapConfig(Map[String, String](
     "yarn.container.count" -> "1",
     "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
@@ -379,34 +376,34 @@ class TestSamzaAppMasterTaskManager {
 
   @Test
   def testPartitionsShouldWorkWithMoreTasksThanPartitions {
-    val onePartition = Set(new Partition(0))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, onePartition).equals(Set(new Partition(0))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, onePartition).equals(Set()))
+    val onePartition = Set(new SystemStreamPartition("system", "stream", new Partition(0)))
+    assert(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)))))
+    assert(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition).equals(Set()))
   }
 
   @Test
   def testPartitionsShouldWorkWithMorePartitionsThanTasks {
-    val fivePartitions = (0 until 5).map(new Partition(_)).toSet
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, fivePartitions).equals(Set(new Partition(0), new Partition(2), new Partition(4))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, fivePartitions).equals(Set(new Partition(1), new Partition(3))))
+    val fivePartitions = (0 until 5).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet
+    assert(Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(4)))))
+    assert(Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3)))))
   }
 
   @Test
   def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers {
-    val fivePartitions = (0 until 12).map(new Partition(_)).toSet
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 5, fivePartitions).equals(Set(new Partition(0), new Partition(5), new Partition(10))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 5, fivePartitions).equals(Set(new Partition(1), new Partition(6), new Partition(11))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(2, 5, fivePartitions).equals(Set(new Partition(2), new Partition(7))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(3, 5, fivePartitions).equals(Set(new Partition(3), new Partition(8))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(4, 5, fivePartitions).equals(Set(new Partition(4), new Partition(9))))
+    val fivePartitions = (0 until 12).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet
+    assert(Util.getStreamsAndPartitionsForContainer(0, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), new SystemStreamPartition("system", "stream", new Partition(10)))))
+    assert(Util.getStreamsAndPartitionsForContainer(1, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), new SystemStreamPartition("system", "stream", new Partition(11)))))
+    assert(Util.getStreamsAndPartitionsForContainer(2, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7)))))
+    assert(Util.getStreamsAndPartitionsForContainer(3, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8)))))
+    assert(Util.getStreamsAndPartitionsForContainer(4, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9)))))
   }
 
   @Test
   def testPartitionsShouldWorkWithEqualPartitionsAndTasks {
-    val twoPartitions = (0 until 2).map(new Partition(_)).toSet
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, twoPartitions).equals(Set(new Partition(0))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, twoPartitions).equals(Set(new Partition(1))))
-    assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 1, Set(new Partition(0))).equals(Set(new Partition(0))))
+    val twoPartitions = (0 until 2).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet
+    assert(Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)))))
+    assert(Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)))))
+    assert(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new SystemStreamPartition("system", "stream", new Partition(0)))).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)))))
   }
 
   val clock = () => System.currentTimeMillis

Reply via email to