Repository: samza
Updated Branches:
  refs/heads/master 4323003dc -> e801ab259


SAMZA-662: fixed auto-created changelog stream does not have enough partitions 
when container number > 1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e801ab25
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e801ab25
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e801ab25

Branch: refs/heads/master
Commit: e801ab2598b98ed6fdd3826760a059026c0dfe55
Parents: 4323003
Author: Guozhang Wang <[email protected]>
Authored: Wed May 13 11:06:18 2015 -0700
Committer: Yan Fang <[email protected]>
Committed: Wed May 13 11:06:18 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala    | 17 +++++++++--------
 .../apache/samza/job/local/ThreadJobFactory.scala  |  2 +-
 2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e801ab25/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e8e830e..bdd491b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -92,7 +92,7 @@ object SamzaContainer extends Logging {
 
     try {
       jmxServer = newJmxServer()
-      SamzaContainer(containerModel, config).run
+      SamzaContainer(containerModel, jobModel).run
     } finally {
       if (jmxServer != null) {
         jmxServer.stop
@@ -133,7 +133,8 @@ object SamzaContainer extends Logging {
     serde
   }
 
-  def apply(containerModel: ContainerModel, config: Config) = {
+  def apply(containerModel: ContainerModel, jobModel: JobModel) = {
+    val config = jobModel.getConfig
     val containerId = containerModel.getContainerId
     val containerName = "samza-container-%s" format containerId
     val containerPID = Util.getContainerPID
@@ -408,12 +409,12 @@ object SamzaContainer extends Logging {
 
     val containerContext = new SamzaContainerContext(containerId, config, 
taskNames)
 
-    // compute the number of partitions necessary for the change log stream 
creation.
-    // Increment by 1 because partition starts from 0, but we need the 
absolute count,
-    // this value is used for change log topic creation.
-    val maxChangeLogStreamPartitions = containerModel.getTasks.values
-            .max(Ordering.by { task:TaskModel => 
task.getChangelogPartition.getPartitionId })
-            .getChangelogPartition.getPartitionId + 1
+    // Compute the number of change log stream partitions as the maximum 
partition-id
+    // of all total number of tasks of the job; Increment by 1 because 
partition ids
+    // start from 0 while we need the absolute count.
+    val maxChangeLogStreamPartitions = jobModel.getContainers.values.flatMap { 
container: ContainerModel =>
+      container.getTasks.values.map(_.getChangelogPartition.getPartitionId)
+    }.max + 1
 
     val taskInstances: Map[TaskName, TaskInstance] = 
containerModel.getTasks.values.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)

http://git-wip-us.apache.org/repos/asf/samza/blob/e801ab25/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 60ee36f..3f2f70e 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -48,7 +48,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
 
     try {
       coordinator.start
-      new ThreadJob(SamzaContainer(containerModel, config))
+      new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel))
     } finally {
       coordinator.stop
     }

Reply via email to