fix #1966 make Partitioner API Java compatible
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/25aacb29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/25aacb29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/25aacb29 Branch: refs/heads/master Commit: 25aacb29c73e0fccca2cfbdc4b13aae9204b2aa4 Parents: 9feb159 Author: huafengw <[email protected]> Authored: Tue Feb 16 15:42:25 2016 +0800 Committer: huafengw <[email protected]> Committed: Wed Feb 17 15:25:53 2016 +0800 ---------------------------------------------------------------------- .../partitioner/BroadcastPartitioner.scala | 11 +++++++-- .../io/gearpump/partitioner/Partitioner.scala | 4 +-- .../streaming/examples/stock/QueryServer.scala | 26 ++++++++++---------- .../examples/transport/QueryServer.scala | 18 +++++++------- .../storm/partitioner/StormPartitioner.scala | 6 ++--- .../storm/topology/GearpumpTuple.scala | 2 +- .../experiments/storm/util/Grouper.scala | 25 ++++++++++--------- .../storm/util/StormOutputCollector.scala | 10 ++++---- .../partitioner/StormPartitionerSpec.scala | 6 ++--- .../storm/util/StormOutputCollectorSpec.scala | 10 ++++---- .../io/gearpump/services/MasterService.scala | 22 +++++++---------- .../streaming/appmaster/TaskManagerSpec.scala | 18 ++++---------- 12 files changed, 77 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala index 07ce399..dba02ee 100644 --- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala @@ -21,7 +21,14 @@ package io.gearpump.partitioner import io.gearpump.Message class BroadcastPartitioner extends MulticastPartitioner { - override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int] = { - (0 until partitionNum).toList + private var lastPartitionNum = -1 + private var partitions = Array.empty[Int] + + override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + if (partitionNum != lastPartitionNum) { + partitions = (0 until partitionNum).toArray + lastPartitionNum = partitionNum + } + partitions } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala index 2478d3a..6285bb7 100644 --- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala @@ -41,9 +41,9 @@ trait UnicastPartitioner extends Partitioner { } trait MulticastPartitioner extends Partitioner { - def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int] + def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] - def getPartitions(msg: Message, partitionNum: Int): List[Int] = { + def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala index a705000..aa8541a 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala @@ -21,29 +21,29 @@ package io.gearpump.streaming.examples.stock import java.util.concurrent.TimeUnit -import akka.actor.{Props, Actor} -import akka.io.IO -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary, AppMaster} -import AppMaster.{TaskActorRef, LookupTaskActorRef} import akka.actor.Actor._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import akka.actor.{Actor, Props} +import akka.io.IO +import akka.pattern.ask import io.gearpump.Message import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest import io.gearpump.cluster.UserConfig -import QueryServer.WebServer -import akka.pattern.ask +import io.gearpump.streaming.ProcessorId +import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import io.gearpump.streaming.appmaster.{AppMaster, ProcessorSummary, StreamAppMasterSummary} +import io.gearpump.streaming.examples.stock.QueryServer.WebServer +import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} import spray.can.Http -import spray.http.{StatusCodes} -import spray.routing.HttpService -import upickle.default.{read, write} +import spray.http.StatusCodes import spray.json._ +import spray.routing.HttpService +import upickle.default.write -import scala.concurrent.{Future, ExecutionContext} +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){ - import taskContext.{appMaster, appId} + import taskContext.{appId, appMaster} import ExecutionContext.Implicits.global http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala index ce22648..5f91883 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala @@ -19,31 +19,31 @@ package io.gearpump.streaming.examples.transport import java.util.concurrent.TimeUnit -import akka.actor.{Props, Actor} import akka.actor.Actor._ +import akka.actor.{Actor, Props} import akka.io.IO import akka.pattern.ask -import io.gearpump.streaming.{ProcessorId, StreamApplication, ProcessorDescription, DAG} -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} import io.gearpump.Message import io.gearpump.cluster.UserConfig -import QueryServer.{GetAllRecords, WebServer} import io.gearpump.partitioner.PartitionerDescription -import AppMaster.{TaskActorRef, LookupTaskActorRef} +import io.gearpump.streaming.appmaster.AppMaster +import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer} +import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication} import io.gearpump.util.Graph import spray.can.Http import spray.http.StatusCodes -import spray.routing.HttpService -import upickle.default.{read, write} import spray.json._ +import spray.routing.HttpService +import upickle.default.write import scala.concurrent.Future import scala.util.{Failure, Success} class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){ - import taskContext.{appMaster, appId} import system.dispatcher + import taskContext.appMaster var inspector: (ProcessorId, ProcessorDescription) = null implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala index d2071ae..04ccb91 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -21,7 +21,7 @@ package io.gearpump.experiments.storm.partitioner import io.gearpump.Message import io.gearpump.experiments.storm.topology.GearpumpTuple import io.gearpump.experiments.storm.util.StormOutputCollector -import io.gearpump.partitioner.{MulticastPartitioner, Partitioner} +import io.gearpump.partitioner.{Partitioner, MulticastPartitioner} /** * this is a partitioner bound to a target Storm component @@ -38,9 +38,9 @@ import io.gearpump.partitioner.{MulticastPartitioner, Partitioner} */ private[storm] class StormPartitioner(target: String) extends MulticastPartitioner { - override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int] = { + override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { val stormTuple = msg.msg.asInstanceOf[GearpumpTuple] - stormTuple.targetPartitions.getOrElse(target, List(Partitioner.UNKNOWN_PARTITION_ID)) + stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala index a52c9c7..7662f36 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -34,7 +34,7 @@ private[storm] class GearpumpTuple( val values: JList[AnyRef], val sourceTaskId: Integer, val sourceStreamId: String, - @transient val targetPartitions: Map[String, List[Int]]) extends Serializable { + @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable { /** * creates a Storm [[Tuple]] to be passed to a Storm component * this is needed for each incoming message http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala index 1ac5f71..d727b7a 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala @@ -38,14 +38,14 @@ sealed trait Grouper { * @param values storm tuple values * @return a list of gearpump partitions */ - def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] + def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] } /** * GlobalGrouper always returns partition 0 */ class GlobalGrouper extends Grouper { - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = List(0) + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0) } /** @@ -55,9 +55,9 @@ class GlobalGrouper extends Grouper { class NoneGrouper(numTasks: Int) extends Grouper { private val random = new Random - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = { + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { val partition = StormUtil.mod(random.nextInt, numTasks) - List(partition) + Array(partition) } } @@ -71,7 +71,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper { private var index = -1 private var partitions = List.empty[Int] - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = { + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { index += 1 if (partitions.isEmpty) { partitions = 0.until(numTasks).toList @@ -80,7 +80,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper { index = 0 partitions = random.shuffle(partitions) } - List(partitions(index)) + Array(partitions(index)) } } @@ -92,10 +92,10 @@ class ShuffleGrouper(numTasks: Int) extends Grouper { */ class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper { - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = { + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { val hash = outFields.select(groupFields, values).hashCode() val partition = StormUtil.mod(hash, numTasks) - List(partition) + Array(partition) } } @@ -104,9 +104,10 @@ class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) exten * @param numTasks number of target tasks */ class AllGrouper(numTasks: Int) extends Grouper { + val partitions = (0 until numTasks).toArray - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = { - (0 until numTasks).toList + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + partitions } } @@ -120,8 +121,8 @@ class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper { grouping.prepare(topologyContext, globalStreamId, targetTasks) } - override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = { - grouping.chooseTasks(taskId, values).map(StormUtil.stormTaskIdToGearpump(_).index).toList + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + grouping.chooseTasks(taskId, values).map(StormUtil.stormTaskIdToGearpump(_).index).toArray } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala index 1abc246..c1df63b 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -66,12 +66,12 @@ object StormOutputCollector { targets: JMap[String, JMap[String, Grouping]], streamGroupers: Map[String, Grouper], componentToProcessorId: Map[String, ProcessorId], - values: JList[AnyRef]): (Map[String, List[Int]], JList[Integer]) ={ + values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) ={ val ret: JList[Integer] = new JArrayList[Integer](targets.size) @annotation.tailrec def getRecur(iter: JIterator[String], - accum: Map[String, List[Int]]): Map[String, List[Int]] = { + accum: Map[String, Array[Int]]): Map[String, Array[Int]] = { if (iter.hasNext) { val target = iter.next val grouper = streamGroupers(streamId) @@ -85,7 +85,7 @@ object StormOutputCollector { accum } } - val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, Map.empty[String, List[Int]]) + val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, Map.empty[String, Array[Int]]) (targetPartitions, ret) } @@ -156,7 +156,7 @@ class StormOutputCollector( stormTaskId: Int, taskToComponent: JMap[Integer, String], targets: JMap[String, JMap[String, Grouping]], - getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer]), + getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]), val taskContext: TaskContext, private var timestamp: TimeStamp) { import io.gearpump.experiments.storm.util.StormOutputCollector._ @@ -196,7 +196,7 @@ class StormOutputCollector( if (targets.containsKey(streamId)) { val target = taskToComponent.get(id) val partition = stormTaskIdToGearpump(id).index - val targetPartitions = Map(target -> List(partition)) + val targetPartitions = Map(target -> Array(partition)) val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) taskContext.output(Message(tuple, timestamp)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala index ef66937..200807c 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala @@ -32,17 +32,17 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { property("StormPartitioner should get partitions directed by message and target") { val idGen = Gen.chooseNum[Int](0, Int.MaxValue) val componentsGen = Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1) - val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted) + val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray) val tupleFactoryGen = for { values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) sourceTaskId <- idGen sourceStreamId <- Gen.alphaStr - } yield (targetPartitions: Map[String, List[Int]]) => { + } yield (targetPartitions: Map[String, Array[Int]]) => { new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions) } forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) { - (tupleFactory: Map[String, List[Int]] => GearpumpTuple, id: Int, components: List[String], partitions: List[Int]) => + (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, components: List[String], partitions: Array[Int]) => val currentPartitionId = id val targetPartitions = components.init.map(c => (c, partitions)).toMap val tuple = tupleFactory(targetPartitions) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index 6f16eb1..c75e92c 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -43,8 +43,8 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer])] - val targetPartitions = mock[Map[String, List[Int]]] + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])] + val targetPartitions = mock[Map[String, Array[Int]]] val targetStormTaskIds = mock[JList[Integer]] when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext @@ -75,8 +75,8 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] when(taskToComponent.get(id)).thenReturn(target) - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer])] - val targetPartitions = mock[Map[String, List[Int]]] + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])] + val targetPartitions = mock[Map[String, Array[Int]]] val targetStormTaskIds = mock[JList[Integer]] when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext @@ -89,7 +89,7 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher when(targets.containsKey(streamId)).thenReturn(true) stormOutputCollector.setTimestamp(timestamp) stormOutputCollector.emitDirect(id, streamId, values) - val partitions = List(StormUtil.stormTaskIdToGearpump(id).index) + val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index) verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ case Message(tuple: GearpumpTuple, t) => val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index aa90726..839c0ae 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -23,32 +23,28 @@ import java.io.{File, IOException} import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet import akka.http.scaladsl.unmarshalling.Unmarshaller._ -import akka.stream.{Materializer, ActorMaterializer} -import com.typesafe.config.{ConfigRenderOptions, Config} +import akka.stream.Materializer +import com.typesafe.config.Config import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryMasterConfig} +import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption} import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList} import io.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue} -import io.gearpump.cluster.{ClusterConfig, UserConfig} import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.AppSubmitter import io.gearpump.cluster.worker.WorkerSummary +import io.gearpump.cluster.{ClusterConfig, UserConfig} import io.gearpump.jarstore.JarStoreService +import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} import io.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} -import io.gearpump.util.ActorUtil.{askActor, _} -import io.gearpump.util.Constants._ -import io.gearpump.util.FileDirective._ -import io.gearpump.util.{Graph, Constants, Util, FileUtils} import io.gearpump.util.ActorUtil._ -import io.gearpump.services.MasterService.{SubmitApplicationRequest, BuiltinPartitioners} +import io.gearpump.util.FileDirective._ +import io.gearpump.util.{Constants, Graph, Util} import scala.collection.JavaConversions._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} +import scala.concurrent.Future +import scala.util.{Failure, Success} class MasterService(val master: ActorRef, val jarStore: JarStoreService, override val system: ActorSystem) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala index 4e4e96b..621455d 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -20,34 +20,26 @@ package io.gearpump.streaming.appmaster import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe -import io.gearpump.streaming.AppMasterToExecutor.{StartAllTasks, TaskLocationsReceived, StartDynamicDag, TaskLocationsReady, TaskRegistered, LaunchTasks} -import io.gearpump.streaming.{ProcessorId, DAG, LifeTime, ProcessorDescription} -import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask -import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut -import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAGSuccess, ChangeToNewDAG} -import io.gearpump.streaming.appmaster.DagManager.{TaskLaunchData, GetLatestDAG, NewDAGDeployed, WatchChange} -import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutorsTimeOut, StartExecutors} -import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations -import io.gearpump.streaming.task.{StartTime, TaskContext, GetStartClock, Subscriber} -import io.gearpump.{TimeStamp, Message} import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} import io.gearpump.cluster.{AppJar, TestUtil, UserConfig} import io.gearpump.jarstore.FilePath import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import io.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered} import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess} import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange} -import io.gearpump.streaming.appmaster.ExecutorManager._ +import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _} import io.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail import io.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2} -import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations import io.gearpump.streaming.executor.Executor.RestartTasks -import io.gearpump.streaming.task._ +import io.gearpump.streaming.task.{StartTime, TaskContext, _} +import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId} import io.gearpump.transport.HostPort import io.gearpump.util.Graph import io.gearpump.util.Graph._ +import io.gearpump.{Message, TimeStamp} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
