Repository: incubator-samza
Updated Branches:
  refs/heads/master 7cecf0aef -> da79b6f92


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 520f784..d10dc38 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -79,13 +79,14 @@ class ApplicationMasterRestServlet(config: Config, state: 
SamzaAppMasterState, r
     state.runningTasks.values.foreach(c => {
       val containerIdStr = c.id.toString
       val containerMap = new HashMap[String, Object]
+
       val taskId = state.runningTasks.filter { case (_, container) => 
container.id.toString.equals(containerIdStr) }.keys.head
-      var partitions = new 
java.util.ArrayList(state.taskPartitions.get(taskId).get)
+      val taskNames = new 
java.util.ArrayList(state.taskToTaskNames.get(taskId).get.toList)
 
       containerMap.put("yarn-address", c.nodeHttpAddress)
       containerMap.put("start-time", c.startTime.toString)
       containerMap.put("up-time", c.upTime.toString)
-      containerMap.put("partitions", partitions)
+      containerMap.put("task-names", taskNames)
       containerMap.put("task-id", taskId.toString)
       containers.put(containerIdStr, containerMap)
     })

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index f1139f5..685620f 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.job.yarn
 
-import scala.annotation.elidable
-import scala.annotation.elidable.ASSERTION
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -41,7 +39,6 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.util.Util
-import org.junit.Assert._
 import org.junit.Test
 
 import TestSamzaAppMasterTaskManager._
@@ -230,7 +227,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(containersRequested == 1)
     assert(containersStarted == 1)
@@ -239,7 +236,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container3))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(amClient.getClient.requests.size == 1)
     assert(amClient.getClient.getRelease.size == 1)
@@ -255,7 +252,7 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(amClient.getClient.requests.size == 0)
     assert(amClient.getClient.getRelease.size == 0)
@@ -293,13 +290,13 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 1)
     assert(containersStarted == 1)
     taskManager.onContainerAllocated(getContainer(container3))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 2)
-    assert(state.taskPartitions.size == 2)
+    assert(state.taskToTaskNames.size == 2)
     assert(state.unclaimedTasks.size == 0)
     assert(containersStarted == 2)
 
@@ -307,7 +304,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container2, 0, ""))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(state.completedTasks == 1)
 
@@ -315,7 +312,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container3, 1, 
"expected failure here"))
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 0)
-    assert(state.taskPartitions.size == 0)
+    assert(state.taskToTaskNames.size == 0)
     assert(state.unclaimedTasks.size == 1)
     assert(state.completedTasks == 1)
     assert(taskManager.shouldShutdown == false)
@@ -324,7 +321,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container3))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(containersStarted == 3)
 
@@ -332,7 +329,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container3, 0, ""))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 0)
-    assert(state.taskPartitions.size == 0)
+    assert(state.taskToTaskNames.size == 0)
     assert(state.unclaimedTasks.size == 0)
     assert(state.completedTasks == 2)
     assert(taskManager.shouldShutdown == true)
@@ -364,19 +361,19 @@ class TestSamzaAppMasterTaskManager {
     assert(amClient.getClient.getRelease.size == 0)
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 0)
-    assert(state.taskPartitions.size == 0)
+    assert(state.taskToTaskNames.size == 0)
     assert(state.unclaimedTasks.size == 1)
     taskManager.onContainerAllocated(getContainer(container2))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(containersRequested == 1)
     assert(containersStarted == 1)
     taskManager.onContainerAllocated(getContainer(container3))
     assert(state.neededContainers == 0)
     assert(state.runningTasks.size == 1)
-    assert(state.taskPartitions.size == 1)
+    assert(state.taskToTaskNames.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(containersRequested == 1)
     assert(containersStarted == 1)
@@ -385,38 +382,6 @@ class TestSamzaAppMasterTaskManager {
     assert(amClient.getClient.getRelease.head.equals(container3))
   }
 
-  @Test
-  def testPartitionsShouldWorkWithMoreTasksThanPartitions {
-    val onePartition = Set(new SystemStreamPartition("system", "stream", new 
Partition(0)))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition), 
Set(new SystemStreamPartition("system", "stream", new Partition(0))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition), 
Set())
-  }
-
-  @Test
-  def testPartitionsShouldWorkWithMorePartitionsThanTasks {
-    val fivePartitions = (0 until 5).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), 
new SystemStreamPartition("system", "stream", new Partition(4))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3))))
-  }
-
-  @Test
-  def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers {
-    val fivePartitions = (0 until 12).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), 
new SystemStreamPartition("system", "stream", new Partition(10))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), 
new SystemStreamPartition("system", "stream", new Partition(11))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(2, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(3, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(4, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9))))
-  }
-
-  @Test
-  def testPartitionsShouldWorkWithEqualPartitionsAndTasks {
-    val twoPartitions = (0 until 2).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, 
twoPartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, 
twoPartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1))))
-    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new 
SystemStreamPartition("system", "stream", new Partition(0)))), Set(new 
SystemStreamPartition("system", "stream", new Partition(0))))
-  }
-
   val clock = () => System.currentTimeMillis
 }
 

Reply via email to