Repository: incubator-samza Updated Branches: refs/heads/master 7cecf0aef -> da79b6f92
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 520f784..d10dc38 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -79,13 +79,14 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r state.runningTasks.values.foreach(c => { val containerIdStr = c.id.toString val containerMap = new HashMap[String, Object] + val taskId = state.runningTasks.filter { case (_, container) => container.id.toString.equals(containerIdStr) }.keys.head - var partitions = new java.util.ArrayList(state.taskPartitions.get(taskId).get) + val taskNames = new java.util.ArrayList(state.taskToTaskNames.get(taskId).get.toList) containerMap.put("yarn-address", c.nodeHttpAddress) containerMap.put("start-time", c.startTime.toString) containerMap.put("up-time", c.upTime.toString) - containerMap.put("partitions", partitions) + containerMap.put("task-names", taskNames) containerMap.put("task-id", taskId.toString) containers.put(containerIdStr, containerMap) }) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index f1139f5..685620f 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -19,8 +19,6 @@ package org.apache.samza.job.yarn -import scala.annotation.elidable -import scala.annotation.elidable.ASSERTION import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration @@ -41,7 +39,6 @@ import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.apache.samza.util.Util -import org.junit.Assert._ import org.junit.Test import TestSamzaAppMasterTaskManager._ @@ -230,7 +227,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container2)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(containersRequested == 1) assert(containersStarted == 1) @@ -239,7 +236,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container3)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(amClient.getClient.requests.size == 1) assert(amClient.getClient.getRelease.size == 1) @@ -255,7 +252,7 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == false) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(amClient.getClient.requests.size == 0) assert(amClient.getClient.getRelease.size == 0) @@ -293,13 +290,13 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container2)) assert(state.neededContainers == 1) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 1) assert(containersStarted == 1) taskManager.onContainerAllocated(getContainer(container3)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 2) - assert(state.taskPartitions.size == 2) + assert(state.taskToTaskNames.size == 2) assert(state.unclaimedTasks.size == 0) assert(containersStarted == 2) @@ -307,7 +304,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container2, 0, "")) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(state.completedTasks == 1) @@ -315,7 +312,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container3, 1, "expected failure here")) assert(state.neededContainers == 1) assert(state.runningTasks.size == 0) - assert(state.taskPartitions.size == 0) + assert(state.taskToTaskNames.size == 0) assert(state.unclaimedTasks.size == 1) assert(state.completedTasks == 1) assert(taskManager.shouldShutdown == false) @@ -324,7 +321,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container3)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(containersStarted == 3) @@ -332,7 +329,7 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container3, 0, "")) assert(state.neededContainers == 0) assert(state.runningTasks.size == 0) - assert(state.taskPartitions.size == 0) + assert(state.taskToTaskNames.size == 0) assert(state.unclaimedTasks.size == 0) assert(state.completedTasks == 2) assert(taskManager.shouldShutdown == true) @@ -364,19 +361,19 @@ class TestSamzaAppMasterTaskManager { assert(amClient.getClient.getRelease.size == 0) assert(state.neededContainers == 1) assert(state.runningTasks.size == 0) - assert(state.taskPartitions.size == 0) + assert(state.taskToTaskNames.size == 0) assert(state.unclaimedTasks.size == 1) taskManager.onContainerAllocated(getContainer(container2)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(containersRequested == 1) assert(containersStarted == 1) taskManager.onContainerAllocated(getContainer(container3)) assert(state.neededContainers == 0) assert(state.runningTasks.size == 1) - assert(state.taskPartitions.size == 1) + assert(state.taskToTaskNames.size == 1) assert(state.unclaimedTasks.size == 0) assert(containersRequested == 1) assert(containersStarted == 1) @@ -385,38 +382,6 @@ class TestSamzaAppMasterTaskManager { assert(amClient.getClient.getRelease.head.equals(container3)) } - @Test - def testPartitionsShouldWorkWithMoreTasksThanPartitions { - val onePartition = Set(new SystemStreamPartition("system", "stream", new Partition(0))) - assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition), Set()) - } - - @Test - def testPartitionsShouldWorkWithMorePartitionsThanTasks { - val fivePartitions = (0 until 5).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(4)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3)))) - } - - @Test - def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers { - val fivePartitions = (0 until 12).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assertEquals(Util.getStreamsAndPartitionsForContainer(0, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), new SystemStreamPartition("system", "stream", new Partition(10)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(1, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), new SystemStreamPartition("system", "stream", new Partition(11)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(2, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(3, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(4, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9)))) - } - - @Test - def testPartitionsShouldWorkWithEqualPartitionsAndTasks { - val twoPartitions = (0 until 2).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)))) - assertEquals(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new SystemStreamPartition("system", "stream", new Partition(0)))), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) - } - val clock = () => System.currentTimeMillis }
