Repository: incubator-gearpump Updated Branches: refs/heads/master 9bb9ca5d8 -> 83b36ef76
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala new file mode 100644 index 0000000..dd286de --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Checks that sources declared through the StreamApp DSL show up in the application graph
+ * and that planning the application yields a DAG whose first vertex is a SourceTask.
+ *
+ * A mocked ClientContext hands out the ActorSystem created in beforeAll.
+ */
+class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  // Assigned in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "be able to generate multiple new streams" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val app = StreamApp("dsl", context)
+    app.source(List("A"), 1, "")
+    app.source(List("B"), 1, "")
+
+    // Each source call is expected to add one vertex to the graph
+    assert(app.graph.vertices.size == 2)
+  }
+
+  it should "plan the dsl to Processor(TaskDescription) DAG" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val app = StreamApp("dsl", context)
+    val parallelism = 3
+    app.source(List("A", "B", "C"), parallelism, "").flatMap(Array(_)).reduce(_ + _)
+    val task = app.plan.dag.vertices.iterator.next()
+    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
+    assert(task.parallelism == parallelism)
+  }
+
+  it should "produce 3 messages" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+    val app = StreamApp("dsl", context)
+    val list = List[String](
+      "0",
+      "1",
+      "2"
+    )
+    app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
+
+    // This test used to end in commented-out code and asserted nothing, so it could never
+    // fail. The application is only planned here, never run, so the check is made on the
+    // planned source processor, mirroring the test above.
+    // NOTE(review): the test name still speaks of messages; message flow is not exercised.
+    val task = app.plan.dag.vertices.iterator.next()
+    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
+    assert(task.parallelism == 1)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
new file mode 100644
index 0000000..6bdd8aa
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor._
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
+import org.apache.gearpump.streaming.dsl.StreamSpec.Join
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Either, Left, Right}
+
+/**
+ * Builds a word-count stream merged with a query stream through the DSL and compares the
+ * planned DAG (task class names as vertices, partitioner class names as edges) with a
+ * hand-written expected topology.
+ */
+class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  // Assigned in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "translate the DSL to a DAG" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val app = StreamApp("dsl", context)
+
+    val data =
+      """
+        five four three two one
+        five four three two
+        five four three
+        five four
+        five
+      """
+    // Word count: split lines into words, pair each with 1, group by word, sum the counts,
+    // then wrap every (word, count) in Left so it can share a stream with the query below.
+    val stream = app.source(data.lines.toList, 1, "").
+      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
+      map(word => (word, 1)).
+      groupBy(_._1, parallelism = 2).
+      reduce((left, right) => (left._1, left._2 + right._2)).
+      map[Either[(String, Int), String]](Left(_))
+
+    // The query stream carries the word of interest, wrapped in Right.
+    val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
+    stream.merge(query).process[(String, Int)](classOf[Join], 1)
+
+    val appDescription = app.plan
+
+    // Reduce the planned DAG to class names so it can be compared structurally.
+    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
+      edge.partitionerFactory.partitioner.getClass.getName
+    }
+    val expectedDagTopology = getExpectedDagTopology
+
+    // Compared as sets, so the order in which vertices and edges are listed does not matter.
+    assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
+    assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
+  }
+
+  /**
+   * Expected topology, expressed with class names: one source feeds the group-by task
+   * through a GroupByPartitioner, the group-by task feeds the merge task through a
+   * CoLocationPartitioner, a source feeds the merge task through a HashPartitioner, and
+   * the merge task feeds Join through a HashPartitioner.
+   */
+  private def getExpectedDagTopology: Graph[String, String] = {
+    val source = classOf[SourceTask[_, _]].getName
+    val group = classOf[GroupByTask[_, _, _]].getName
+    val merge = classOf[TransformTask[_, _]].getName
+    val join = classOf[Join].getName
+
+    val hash = classOf[HashPartitioner].getName
+    val groupBy = classOf[GroupByPartitioner[_, _]].getName
+    val colocation = classOf[CoLocationPartitioner].getName
+
+    val expectedDagTopology = Graph(
+      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
+      source ~ hash ~> merge
+    )
+    expectedDagTopology
+  }
+}
+
+object StreamSpec {
+
+  /**
+   * Test task that remembers the most recent query word (Right) and forwards a word count
+   * (Left) only when its word equals that query.
+   */
+  class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+
+    // Last query word received; null until the first Right message arrives.
+    var query: String = null
+    override def onStart(startTime: StartTime): Unit = {}
+
+    override def onNext(msg: Message): Unit = {
+      msg.msg match {
+        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
+          // Word counts that arrive before any query, or for another word, are dropped.
+          if (query != null && wordCount._1 == query) {
+            taskContext.output(new Message(wordCount))
+          }
+
+        case Right(query: String) =>
+          this.query = query
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala new file mode 100644 index 0000000..fcc646d --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
+
+/**
+ * Checks that GroupByPartitioner chooses the partition from the value returned by the
+ * groupBy function rather than from the whole message.
+ */
+class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  it should "use the output of groupBy function to do partition" in {
+    val mark = People("Mark", "male")
+    val tom = People("Tom", "male")
+    val michelle = People("Michelle", "female")
+
+    val partitionNum = 10
+    val groupBy = new GroupByPartitioner[People, String](_.gender)
+
+    // Same group key, so both messages must land in the same partition
+    assert(groupBy.getPartition(Message(mark), partitionNum)
+      == groupBy.getPartition(Message(tom), partitionNum))
+
+    // Different group keys are expected to land in different partitions.
+    // NOTE(review): this holds only while the two keys do not collide for partitionNum = 10.
+    assert(groupBy.getPartition(Message(mark), partitionNum)
+      != groupBy.getPartition(Message(michelle), partitionNum))
+  }
+}
+
+object GroupByPartitionerSpec {
+  /** Sample record; the test groups instances by `gender`. */
+  case class People(name: String, gender: String)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
new file mode 100644
index 0000000..ecc5352
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest._
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.task.StartTime
+
+/**
+ * Exercises the building blocks declared in OpTranslator: function chaining with andThen,
+ * and the SourceTask, GroupByTask and TransformTask task implementations, each driven
+ * against a mocked TaskContext.
+ */
+class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  "andThen" should "chain multiple single input function" in {
+    val dummy = new DummyInputFunction[String]
+    val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+
+    val filter = new FlatMapFunction[String, String](word =>
+      if (word.isEmpty) None else Some(word), "filter")
+
+    val map = new FlatMapFunction[String, Int](word => Some(1), "map")
+
+    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+
+    val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
+
+    assert(all.description == "split.filter.map.sum")
+
+    val data =
+      """
+      five four three two one
+      five four three two
+      five four three
+      five four
+      five
+      """
+    // 5 + 4 + 3 + 2 + 1 words, each mapped to 1; the last emitted value is the running total
+    val count = all.process(data).toList.last
+    assert(count == 15)
+  }
+
+  "Source" should "iterate over input source and apply attached operator" in {
+
+    val taskContext = MockUtil.mockTaskContext
+
+    val conf = UserConfig.empty
+    val data = "one two three".split("\\s")
+
+    // Source with no transformer
+    val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
+      taskContext, conf)
+    source.onStart(StartTime(0))
+    source.onNext(Message("next"))
+    verify(taskContext, times(1)).output(anyObject())
+
+    // Source with transformer
+    val anotherTaskContext = MockUtil.mockTaskContext
+    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+    val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
+      anotherTaskContext, conf)
+    another.onStart(StartTime(0))
+    another.onNext(Message("next"))
+    verify(anotherTaskContext, times(2)).output(anyObject())
+  }
+
+  "GroupByTask" should "group input by groupBy Function and " +
+    "apply attached operator for each group" in {
+
+    val data = "1 2 2 3 3 3"
+
+    val concat = new ReduceFunction[String]({ (left, right) =>
+      left + right
+    }, "concat")
+
+    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+    // Terminate the actor system even when an assertion below fails, so a red test does
+    // not leave a running ActorSystem behind for the rest of the suite.
+    try {
+      val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
+        GEARPUMP_STREAMING_OPERATOR, concat)
+
+      val taskContext = MockUtil.mockTaskContext
+
+      val task = new GroupByTask[String, String, String](input => input, taskContext, config)
+      task.onStart(StartTime(0))
+
+      val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+      data.split("\\s+").foreach { word =>
+        task.onNext(Message(word))
+      }
+      verify(taskContext, times(6)).output(peopleCaptor.capture())
+
+      import scala.collection.JavaConverters._
+
+      // Every input emits the running concatenation of its own group
+      val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String])
+      assert(values.mkString(",") == "1,2,22,3,33,333")
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "MergeTask" should "accept two stream and apply the attached operator" in {
+
+    // Transform task with a transformer that emits every input twice
+    val taskContext = MockUtil.mockTaskContext
+    val conf = UserConfig.empty
+    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+    val task = new TransformTask[String, String](Some(double), taskContext, conf)
+    task.onStart(StartTime(0))
+
+    val data = "1 2 2 3 3 3".split("\\s+")
+
+    data.foreach { input =>
+      task.onNext(Message(input))
+    }
+
+    verify(taskContext, times(data.length * 2)).output(anyObject())
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
new file mode 100644
index 0000000..4c4b37b
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.mockito.Matchers._
+import org.mockito.Mockito.{times, _}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
+import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import org.apache.gearpump.streaming.executor.TaskLauncherSpec.MockTask
+import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
+import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
+import org.apache.gearpump.transport.HostPort
+
+/**
+ * Drives an Executor actor through launch, registration, start and change of two tasks.
+ * The task launcher is a Mockito mock and every task is represented by one TestProbe.
+ */
+class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  val appId = 0
+  val executorId = 0
+  val workerId = WorkerId(0, 0L)
+  // Both are assigned in beforeAll.
+  var appMaster: TestProbe = null
+  implicit var system: ActorSystem = null
+  val userConf = UserConfig.empty
+
+  override def beforeAll(): Unit = {
+    // NOTE(review): the system name "TaskLauncherSpec" looks copied from the sibling spec.
+    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
+    appMaster = TestProbe()
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "call launcher to launch task" in {
+    val worker = TestProbe()
+    val workerInfo = WorkerInfo(workerId, worker.ref)
+    val executorContext = ExecutorContext(executorId, workerInfo, appId, "app",
+      appMaster.ref, Resource(2))
+    val taskLauncher = mock(classOf[ITaskLauncher])
+    val executor = system.actorOf(Props(new Executor(executorContext, userConf, taskLauncher)))
+    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
+      parallelism = 2)
+    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
+    val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber])
+
+    // The mocked launcher maps both task ids to the same probe, so this single probe
+    // observes the messages addressed to either task.
+    val task = TestProbe()
+    when(taskLauncher.launch(any(), any(), any(), any(), any()))
+      .thenReturn(taskIds.map((_, task.ref)).toMap)
+
+    // Launch: the sender of LaunchTasks expects a TasksLaunched reply
+    val client = TestProbe()
+    client.send(executor, launchTasks)
+    client.expectMsg(TasksLaunched)
+
+    verify(taskLauncher, times(1)).launch(any(), any(), any(), any(), any())
+
+    // Registration: each TaskRegistered sent to the executor is expected to reach the probe
+    executor ! RegisterTask(TaskId(0, 0), executorId, HostPort("localhost:80"))
+    executor ! RegisterTask(TaskId(0, 1), executorId, HostPort("localhost:80"))
+
+    executor ! TaskRegistered(TaskId(0, 0), 0, 0)
+
+    task.expectMsgType[TaskRegistered]
+
+    executor ! TaskRegistered(TaskId(0, 1), 0, 0)
+
+    task.expectMsgType[TaskRegistered]
+
+    // Start at dag version 0: one StartTask per task
+    executor ! TaskLocationsReady(TaskLocations(Map.empty), dagVersion = 0)
+    executor ! StartAllTasks(dagVersion = 0)
+
+    task.expectMsgType[StartTask]
+    task.expectMsgType[StartTask]
+
+    // Change to dag version 1: the sender expects TasksChanged, then one ChangeTask per task
+    val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue),
+      List.empty[Subscriber])
+
+    client.send(executor, changeTasks)
+    client.expectMsgType[TasksChanged]
+
+    executor ! TaskLocationsReady(TaskLocations(Map.empty), 1)
+    executor ! StartAllTasks(dagVersion = 1)
+
+    task.expectMsgType[ChangeTask]
+    task.expectMsgType[ChangeTask]
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
new file mode 100644
index 0000000..bd45121
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import org.scalatest._
+
+import org.apache.gearpump.streaming.executor.Executor.TaskArgumentStore
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Checks version lookup in TaskArgumentStore before and after obsolete versions are
+ * removed.
+ */
+class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+  it should "retain all history of taskArgument" in {
+    val atVersion0 = TaskArgument(0, null, null)
+    val atVersion2 = atVersion0.copy(dagVersion = 2)
+    val argumentStore = new TaskArgumentStore
+    val taskId = TaskId(0, 0)
+    Seq(atVersion0, atVersion2).foreach(argumentStore.add(taskId, _))
+
+    // A lookup yields the stored argument whose version is the same as, or older than,
+    // the requested one
+    Seq(1 -> Some(atVersion0), 0 -> Some(atVersion0), 2 -> Some(atVersion2)).foreach {
+      case (requestedVersion, expected) =>
+        assert(argumentStore.get(requestedVersion, taskId) == expected)
+    }
+
+    argumentStore.removeObsoleteVersion()
+
+    // After the cleanup only the argument stored for version 2 can still be found
+    Seq(1 -> None, 0 -> None, 2 -> Some(atVersion2)).foreach {
+      case (requestedVersion, expected) =>
+        assert(argumentStore.get(requestedVersion, taskId) == expected)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
new file mode 100644
index 0000000..074866a
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{Actor, ActorSystem}
+import akka.testkit.TestProbe
+import org.scalatest._
+
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.serializer.SerializationFramework
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, MockTaskActor}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskContextData, TaskId, TaskWrapper}
+
+/**
+ * Checks that TaskLauncher.launch returns one entry for every requested task id when it
+ * is configured with a stand-in task actor class.
+ */
+class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  val appId = 0
+  val executorId = 0
+  // Both are assigned in beforeAll.
+  var appMaster: TestProbe = null
+  implicit var system: ActorSystem = null
+  val userConf = UserConfig.empty
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
+    appMaster = TestProbe()
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "be able to launch tasks" in {
+    val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref,
+      userConf, classOf[MockTaskActor])
+    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
+    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
+      parallelism = 2)
+    val argument = TaskArgument(0, processor, null)
+
+    val tasks = launcher.launch(taskIds, argument, system, null,
+      "gearpump.shared-thread-pool-dispatcher")
+    tasks.keys.toSet shouldBe taskIds.toSet
+  }
+}
+
+object TaskLauncherSpec {
+  /**
+   * Stand-in for the real task actor. It takes the same constructor arguments the launcher
+   * is given for task actors in this spec and handles no message at all.
+   */
+  class MockTaskActor(
+      val taskId: TaskId,
+      val taskContextData : TaskContextData,
+      userConf : UserConfig,
+      val task: TaskWrapper,
+      serializer: SerializationFramework) extends Actor {
+    // An empty behaviour instead of null: a message that reaches this actor is reported
+    // as unhandled rather than being applied to a null partial function.
+    def receive: Receive = Actor.emptyBehavior
+  }
+
+  /** Task without behaviour of its own; only its class name is handed to the launcher. */
+  class MockTask(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
new file mode 100644
index 0000000..5804b00
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.metrics + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.cluster.ClientToMaster.ReadOption +import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem +import org.apache.gearpump.metrics.Metrics.{Gauge, Histogram, Meter} +import org.apache.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, HistogramAggregator, MeterAggregator, MultiLayerMap} +import org.apache.gearpump.streaming.task.TaskId +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig + +class ProcessorAggregatorSpec extends FlatSpec with Matchers { + + "MultiLayerMap" should "maintain multiple layers HashMap" in { + val layers = 3 + val map = new MultiLayerMap[String](layers) + + assert(map.get(layer = 0, "key") == null) + + // Illegal, handle safely + assert(map.get(layer = 10, "key") == null) + + map.put(layer = 0, "key", "value") + assert(map.get(layer = 0, "key") == "value") + + map.put(layer = 1, "key2", "value2") + map.put(layer = 2, "key3", "value3") + map.put(layer = 2, "key4", "value4") + + // Illegal, should be ignored + map.put(layer = 4, "key5", "value5") + + assert(map.size == 4) + assert(map.valueIterator.asScala.toSet == Set("value", "value2", "value3", "value4")) + } + + "HistogramAggregator" should "aggregate by calculating average" in { + val aggregator = new HistogramAggregator("processor") + + val a = Histogram("processor.task1", 1, 2, 3, 4, 5, 6) + val b = Histogram("processor.task2", 5, 6, 7, 8, 9, 10) + val expect = Histogram("processor.task2", 3, 4, 5, 6, 7, 8) + + val olderTime = 100 + val newerTime = 200 + + aggregator.aggregate(HistoryMetricsItem(time = newerTime, a)) + aggregator.aggregate(HistoryMetricsItem(time = olderTime, b)) + + val result = aggregator.result + + // Picks old time 
as aggregated time
+    assert(result.time == olderTime)
+
+    // Does average
+    val check = result.value.asInstanceOf[Histogram]
+
+    // Compare absolute differences: a one-sided "check - expect < 0.01" also passes when
+    // the aggregated value is far below the expected one.
+    assert(math.abs(check.mean - expect.mean) < 0.01)
+    assert(math.abs(check.stddev - expect.stddev) < 0.01)
+    assert(math.abs(check.median - expect.median) < 0.01)
+    assert(math.abs(check.p95 - expect.p95) < 0.01)
+    assert(math.abs(check.p99 - expect.p99) < 0.01)
+    assert(math.abs(check.p999 - expect.p999) < 0.01)
+  }
+
+  "MeterAggregator" should "aggregate by summing" in {
+    val aggregator = new MeterAggregator("processor")
+
+    val a = Meter("processor.task1", count = 1, 1, 3, "s")
+    val b = Meter("processor.task2", count = 2, 5, 7, "s")
+    val expect = Meter("processor.task2", count = 3, 6, 10, "s")
+
+    val olderTime = 100
+    val newerTime = 200
+
+    // Fed newest first, so the result time cannot simply be the time of the last item
+    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
+    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))
+
+    val result = aggregator.result
+
+    // Picks old time
+    assert(result.time == olderTime)
+
+    // Does summing
+    val check = result.value.asInstanceOf[Meter]
+
+    assert(check.count == expect.count)
+    // Absolute differences, so a sum that is too small fails as well
+    assert(math.abs(check.m1 - expect.m1) < 0.01)
+    assert(math.abs(check.meanRate - expect.meanRate) < 0.01)
+    assert(check.rateUnit == expect.rateUnit)
+  }
+
+  "AggregatorFactory" should "create aggregator" in {
+    val factory = new AggregatorFactory()
+
+    val a = Meter("processor.task1", count = 1, 1, 3, "s")
+    val b = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
+
+    // The aggregator type follows the type of the metric carried by the item
+    val aggregator1 = factory.create(HistoryMetricsItem(time = 0, a), "name1")
+    assert(aggregator1.isInstanceOf[MeterAggregator])
+
+    val aggregator2 = factory.create(HistoryMetricsItem(time = 0, b), "name2")
+    assert(aggregator2.isInstanceOf[HistogramAggregator])
+  }
+
+  "ProcessorAggregator" should "aggregate on different read options" in {
+    val hours = 2 // Maintains 2 hours history
+    val seconds = 2 // Maintains 2 seconds recent data
+    val taskCount = 5 // For each processor
+    val metricCount = 100 // For each task, have metricCount metrics
+    val range = new HistoryMetricsConfig(hours, hours
/ 2 * 3600 * 1000, + seconds, seconds / 2 * 1000) + + val aggregator = new ProcessorAggregator(range) + + def count(value: Int): Int = value + + def inputs(timeRange: Long): List[HistoryMetricsItem] = { + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList + } + + def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): Boolean = { + val nameCount = list.map(_.value.name).groupBy(key => key).mapValues(_.size).toList.toMap + nameCount sameElements countMap + } + + // Aggregates on processor and meterNames, + val input = inputs(Long.MaxValue) + val readLatest = aggregator.aggregate(ReadOption.ReadLatest, + input.iterator, now = Long.MaxValue) + assert(readLatest.size == 8) // 2 processor * 4 metrics type + assert(check(readLatest, Map( + "app0.processor0:receiveLatency" -> count(1), + "app0.processor0:processTime" -> count(1), + "app0.processor0:sendThroughput" -> count(1), + "app0.processor0:receiveThroughput" -> count(1), + "app0.processor1:receiveLatency" -> count(1), + 
"app0.processor1:processTime" -> count(1), + "app0.processor1:sendThroughput" -> count(1), + "app0.processor1:receiveThroughput" -> count(1) + ))) + + // Aggregates on processor and meterNames and time range, + val readRecent = aggregator.aggregate(ReadOption.ReadRecent, + inputs(seconds * 1000).iterator, now = seconds * 1000) + assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time range + assert(check(readRecent, Map( + "app0.processor0:receiveLatency" -> count(2), + "app0.processor0:processTime" -> count(2), + "app0.processor0:sendThroughput" -> count(2), + "app0.processor0:receiveThroughput" -> count(2), + "app0.processor1:receiveLatency" -> count(2), + "app0.processor1:processTime" -> count(2), + "app0.processor1:sendThroughput" -> count(2), + "app0.processor1:receiveThroughput" -> count(2) + ))) + + // Aggregates on processor and meterNames and time range, + val readHistory = aggregator.aggregate(ReadOption.ReadHistory, + inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000) + assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time ranges + assert(check(readHistory, Map( + "app0.processor0:receiveLatency" -> count(2), + "app0.processor0:processTime" -> count(2), + "app0.processor0:sendThroughput" -> count(2), + "app0.processor0:receiveThroughput" -> count(2), + "app0.processor1:receiveLatency" -> count(2), + "app0.processor1:processTime" -> count(2), + "app0.processor1:sendThroughput" -> count(2), + "app0.processor1:receiveThroughput" -> count(2) + ))) + } + + private def histogram( + taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue, + repeat: Int = 1): List[HistoryMetricsItem] = { + val random = new Random() + (0 until repeat).map { _ => + new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange), + new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName", + Math.abs(random.nextDouble()), + Math.abs(random.nextDouble()), + Math.abs(random.nextDouble()), 
+ Math.abs(random.nextDouble()), + Math.abs(random.nextDouble()), + Math.abs(random.nextDouble()) + )) + }.toList + } + + private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int) + : List[HistoryMetricsItem] = { + val random = new Random() + (0 until repeat).map { _ => + new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange), + new Meter(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName", + Math.abs(random.nextInt()), + Math.abs(random.nextDouble()), + Math.abs(random.nextDouble()), + "event/s") + ) + }.toList + } + + "ProcessorAggregator" should "handle smoothly for unsupported metric type and " + + "error formatted metric name" in { + val invalid = List( + // Unsupported metric type + HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)), + + // Wrong format: should be app0.processor0.task0:throughput + HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, "")) + ) + + val valid = histogram(TaskId(0, 0), repeat = 10) + + val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0)) + val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator, + now = Long.MaxValue) + + // For one taskId, will only use one data point. 
+ assert(result.size == 1) + assert(result.head.value.name == "app0.processor0:latency") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala new file mode 100644 index 0000000..ee9402b --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.metrics
+
+import scala.util.Random
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
+import org.apache.gearpump.streaming.metrics.TaskFilterAggregator.Options
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Exercises TaskFilterAggregator: filtering by processor/task range under a per-query and
+ * a global limit, parsing of malformed options, and skipping of malformed metric names.
+ */
+class TaskFilterAggregatorSpec extends FlatSpec with Matchers {
+
+  /** Builds an all-zero latency Histogram item for `taskId`, stamped with a random time. */
+  def metric(taskId: TaskId): HistoryMetricsItem = {
+    val stamp = Math.abs(new Random().nextLong())
+    val name = s"app0.processor${taskId.processorId}.task${taskId.index}:latency"
+    new HistoryMetricsItem(stamp, new Histogram(name, 0, 0, 0, 0, 0, 0))
+  }
+
+  it should "filter data on processor range, task range combination" in {
+    // One metric for every (processor, task) pair of a 10 x 10 grid, processor-major.
+    val inputs = (for {
+      processor <- 0 until 10
+      task <- 0 until 10
+    } yield metric(TaskId(processor, task))).toList
+
+    val globalLimit = 10
+    val aggregator = new TaskFilterAggregator(globalLimit)
+
+    // Limit not met, return all matches in this matrix
+    val belowLimits = new Options(limit = 20, startTask = 3, endTask = 6,
+      startProcessor = 3, endProcessor = 6)
+    assert(aggregator.aggregate(belowLimits, inputs.iterator).size == 9)
+
+    // User limit reached
+    val userLimited = new Options(limit = 3, startTask = 3, endTask = 5,
+      startProcessor = 3, endProcessor = 5)
+    assert(aggregator.aggregate(userLimited, inputs.iterator).size == 3)
+
+    // Global limit reached
+    val globallyLimited = new Options(limit = 20, startTask = 3, endTask = 8,
+      startProcessor = 3, endProcessor = 8)
+    assert(aggregator.aggregate(globallyLimited, inputs.iterator).size == globalLimit)
+  }
+
+  it should "reject wrong format options" in {
+    // A non-numeric start task must make parsing yield null.
+    val rawOptions = Map(TaskFilterAggregator.StartTask -> "not a number")
+    assert(Options.parse(rawOptions) == null)
+  }
+
+  it should "skip wrong format metrics" in {
+    // Wrong format: should be app0.processor0.task0:throughput
+    val malformed =
+      HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
+    val invalid = List(malformed)
+
+    val acceptEverything = Options.acceptAll
+    val aggregator = new TaskFilterAggregator(Int.MaxValue)
+    assert(aggregator.aggregate(acceptEverything, invalid.iterator).size == 0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala new file mode 100644 index 0000000..d6fa443 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * Property test for DefaultTimeStampFilter: a message is expected to pass the filter only
+ * when its timestamp is not older than the given predicate time, and null never passes.
+ */
+class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers {
+  property("DefaultTimeStampFilter should filter message against give timestamp") {
+    val timestamps = Gen.chooseNum[Long](0L, 1000L)
+    // Random alphabetic payload paired with a random timestamp.
+    val messages = Gen.alphaStr.flatMap { payload =>
+      timestamps.map(time => Message(payload, time))
+    }
+
+    val filter = new DefaultTimeStampFilter()
+
+    forAll(timestamps, messages) { (predicate: TimeStamp, message: Message) =>
+      val expected = if (message.timestamp >= predicate) Some(message) else None
+      filter.filter(message, predicate) shouldBe expected
+
+      filter.filter(null, predicate) shouldBe None
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala new file mode 100644 index 0000000..9e42e85 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore
+
+/**
+ * Property tests checking that CheckpointManager forwards recover / persist / close to its
+ * CheckpointStore and keeps track of the max message time and the pending checkpoint time.
+ */
+class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+  val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
+
+  /** Creates a fresh mocked store together with a manager that writes to it. */
+  private def storeAndManager(interval: Long): (CheckpointStore, CheckpointManager) = {
+    val store = mock[CheckpointStore]
+    (store, new CheckpointManager(interval, store))
+  }
+
+  property("CheckpointManager should recover from CheckpointStore") {
+    forAll(timestampGen, checkpointIntervalGen) { (time: TimeStamp, interval: Long) =>
+      val (store, manager) = storeAndManager(interval)
+      manager.recover(time)
+
+      verify(store).recover(time)
+    }
+  }
+
+  property("CheckpointManager should write checkpoint to CheckpointStore") {
+    val payloads = Gen.alphaStr.map(_.getBytes("UTF-8"))
+    forAll(timestampGen, checkpointIntervalGen, payloads) {
+      (time: TimeStamp, interval: Long, payload: Array[Byte]) =>
+        val (store, manager) = storeAndManager(interval)
+        manager.checkpoint(time, payload)
+
+        verify(store).persist(time, payload)
+    }
+  }
+
+  property("CheckpointManager should close CheckpointStore") {
+    forAll(checkpointIntervalGen) { (interval: Long) =>
+      val (store, manager) = storeAndManager(interval)
+      manager.close()
+      verify(store).close()
+    }
+  }
+
+  property("CheckpointManager should update checkpoint time according to max message timestamp") {
+    forAll(timestampGen, checkpointIntervalGen) { (time: TimeStamp, interval: Long) =>
+      val (store, manager) = storeAndManager(interval)
+      manager.update(time)
+      manager.getMaxMessageTime shouldBe time
+
+      // The pending checkpoint time must lie strictly after the message time, at most one
+      // interval ahead of it.
+      val checkpointTime = manager.getCheckpointTime.get
+      time should (be < checkpointTime and be >= (checkpointTime - interval))
+
+      // Once that checkpoint is taken, no checkpoint time is pending any more.
+      manager.checkpoint(checkpointTime, Array.empty[Byte])
+      verify(store).persist(MockitoMatchers.eq(checkpointTime),
+        MockitoMatchers.anyObject[Array[Byte]]())
+      manager.getCheckpointTime shouldBe empty
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala new file mode 100644 index 0000000..8d2f515 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/** Round-trip property test for InMemoryCheckpointStore (persist, then recover). */
+class InMemoryCheckpointStoreSpec extends PropSpec with PropertyChecks with Matchers {
+
+  property("InMemoryCheckpointStore should provide read / write checkpoint") {
+    val times = Gen.chooseNum[Long](1, 1000)
+    val payloads = Gen.alphaStr.map(text => text.getBytes("UTF-8"))
+
+    forAll(times, payloads) { (timestamp: Long, checkpoint: Array[Byte]) =>
+      val store = new InMemoryCheckpointStore
+
+      // Nothing has been persisted into the fresh store yet.
+      store.recover(timestamp) shouldBe None
+
+      // After persisting, the same timestamp yields the stored bytes.
+      store.persist(timestamp, checkpoint)
+      store.recover(timestamp) shouldBe Some(checkpoint)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala new file mode 100644 index 0000000..4cdff95 --- /dev/null +++
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.gearpump.streaming.state.impl
+
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
+
+/**
+ * Property tests for NonWindowState driven entirely through a mocked Monoid and a mocked
+ * Serializer. Each property checks the `left` / `right` parts and the combined `get` value
+ * of the state before and after one operation (recover, checkpoint, update).
+ *
+ * The Mockito stubs are (re)armed between steps, so the statement order inside each
+ * property matters.
+ */
+class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  // Timestamps between 100 and the wall-clock time at which the spec is instantiated.
+  val longGen = Gen.chooseNum[Long](100L, System.currentTimeMillis())
+
+  // A new state starts as (zero, zero); after recover() the deserialized checkpoint is
+  // expected in `left` while `right` stays zero.
+  property("NonWindowState should recover checkpointed state at given timestamp") {
+    forAll(longGen) {
+      (timestamp: TimeStamp) =>
+        val monoid = mock[Monoid[AnyRef]]
+        val serializer = mock[Serializer[AnyRef]]
+        val bytes = Array.empty[Byte]
+        val checkpoint = mock[AnyRef]
+        val zero = mock[AnyRef]
+        // Stub every monoid call the assertions below rely on.
+        when(monoid.zero).thenReturn(zero, zero)
+        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
+        when(monoid.plus(checkpoint, zero)).thenReturn(checkpoint, Nil: _*)
+
+        val state = new NonWindowState[AnyRef](monoid, serializer)
+        state.left shouldBe zero
+        state.right shouldBe zero
+        state.get shouldBe Some(zero)
+
+        // The (empty) byte array deserializes to the mocked checkpoint object.
+        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
+        state.recover(timestamp, bytes)
+
+        state.left shouldBe checkpoint
+        state.right shouldBe zero
+        state.get shouldBe Some(checkpoint)
+    }
+  }
+
+  // With left/right set by hand, checkpoint() is expected to serialize `left`, then fold
+  // `right` into `left` (left becomes plus(left, right)) and reset `right` to zero.
+  // The generated checkpointTime is not used by the body.
+  property("NonWindowState checkpoints state") {
+    forAll(longGen) {
+      (checkpointTime: TimeStamp) =>
+        val monoid = mock[Monoid[AnyRef]]
+        val serializer = mock[Serializer[AnyRef]]
+
+        val left = mock[AnyRef]
+        val right = mock[AnyRef]
+        val zero = mock[AnyRef]
+        val plus = mock[AnyRef]
+
+        when(monoid.zero).thenReturn(zero, zero)
+        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
+
+        val state = new NonWindowState[AnyRef](monoid, serializer)
+        state.left shouldBe zero
+        state.right shouldBe zero
+        state.get shouldBe Some(zero)
+
+        state.left = left
+        state.right = right
+
+        // Re-arm the stubs for the calls made during and after checkpoint().
+        when(monoid.zero).thenReturn(zero, Nil: _*)
+        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
+        when(monoid.plus(plus, zero)).thenReturn(plus, Nil: _*)
+        state.checkpoint()
+
+        verify(serializer).serialize(left)
+        state.left shouldBe plus
+        state.right shouldBe zero
+        state.get shouldBe Some(plus)
+    }
+  }
+
+  // An update stamped before the next checkpoint time is expected to land in `left`; an
+  // update stamped after it is expected to land in `right`, with `get` combining both.
+  property("NonWindowState updates state") {
+    forAll(longGen) {
+      (checkpointTime: TimeStamp) =>
+        val monoid = mock[Monoid[AnyRef]]
+        val serializer = mock[Serializer[AnyRef]]
+
+        val left = mock[AnyRef]
+        val right = mock[AnyRef]
+        val zero = mock[AnyRef]
+        val plus = mock[AnyRef]
+
+        when(monoid.zero).thenReturn(zero, zero)
+        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
+
+        val state = new NonWindowState[AnyRef](monoid, serializer)
+        state.left shouldBe zero
+        state.right shouldBe zero
+        state.get shouldBe Some(zero)
+
+        // Update stamped one tick before the checkpoint time.
+        when(monoid.plus(zero, left)).thenReturn(left, Nil: _*)
+        when(monoid.plus(left, zero)).thenReturn(left, Nil: _*)
+        state.setNextCheckpointTime(checkpointTime)
+        state.update(checkpointTime - 1, left)
+        state.left shouldBe left
+        state.right shouldBe zero
+        state.get shouldBe Some(left)
+
+        // Update stamped one tick after the checkpoint time.
+        when(monoid.plus(zero, right)).thenReturn(right, Nil: _*)
+        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
+        state.setNextCheckpointTime(checkpointTime)
+        state.update(checkpointTime + 1, right)
+        state.left shouldBe left
+        state.right shouldBe right
+        state.get shouldBe Some(plus)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala new file mode 100644 index 0000000..d9282ae --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+
+/** Property tests for the sliding behaviour of Window(windowSize, windowStep). */
+class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  val windowSizeGen = Gen.chooseNum[Long](1L, 1000L)
+  val windowStepGen = Gen.chooseNum[Long](1L, 1000L)
+  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+
+  property("Window should only slide when time passes window end") {
+    forAll(timestampGen, windowSizeGen, windowStepGen) {
+      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+        val window = new Window(windowSize, windowStep)
+        // A fresh window never asks to slide.
+        window.shouldSlide shouldBe false
+
+        // After the clock update it should slide exactly when the time reached the
+        // window size.
+        window.update(timestamp)
+        val reachedWindowEnd = timestamp >= windowSize
+        window.shouldSlide shouldBe reachedWindowEnd
+    }
+  }
+
+  property("Window should slide by one or to given timestamp") {
+    forAll(timestampGen, windowSizeGen, windowStepGen) {
+      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+        val window = new Window(windowSize, windowStep)
+        window.range shouldBe ((0L, windowSize))
+
+        // One step moves both ends of the range by windowStep.
+        window.slideOneStep()
+        window.range shouldBe ((windowStep, windowSize + windowStep))
+
+        // After sliding to a timestamp, the timestamp must sit at or after the window
+        // start and before the window end, or before the next step when steps are
+        // larger than the window.
+        window.slideTo(timestamp)
+        val (startTime, endTime) = window.range
+        val upperBound = if (windowStep > windowSize) startTime + windowStep else endTime
+        timestamp should (be >= startTime and be < upperBound)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala new file mode 100644 index 0000000..299a626 --- /dev/null +++
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.gearpump.streaming.state.impl
+
+import scala.collection.immutable.TreeMap
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.state.api.{Group, Serializer}
+
+/**
+ * Property tests for WindowState. Group, Serializer, the task context and (in most
+ * properties) the Window are mocks, so every expected value is produced by the stubs
+ * armed right before the call under test; the statement order therefore matters.
+ */
+class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  val longGen = Gen.chooseNum[Long](100L, 10000L)
+
+  // Intervals whose end time is strictly greater than their start time.
+  val intervalGen = for {
+    st <- longGen
+    et <- Gen.chooseNum[Long](st + 1, 100000L)
+  } yield Interval(st, et)
+
+  // recover() with a checkpoint holding a single interval is expected to put that
+  // interval's data into `left` and to expose it through getIntervalStates.
+  property("WindowState init should recover checkpointed state") {
+    forAll(intervalGen) {
+      (interval: Interval) =>
+        val window = mock[Window]
+        val taskContext = MockUtil.mockTaskContext
+        val group = mock[Group[AnyRef]]
+        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
+
+        val timestamp = interval.startTime
+        val zero = mock[AnyRef]
+        val bytes = Array.empty[Byte]
+        val data = mock[AnyRef]
+        val checkpoint = TreeMap(interval -> data)
+        when(group.zero).thenReturn(zero, zero)
+        when(group.plus(zero, data)).thenReturn(data, Nil: _*)
+        when(group.plus(data, zero)).thenReturn(data, Nil: _*)
+        when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
+        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
+
+        val state = new WindowState[AnyRef](group, serializer, taskContext, window)
+        state.left shouldBe zero
+        state.right shouldBe zero
+        state.get shouldBe Some(zero)
+
+        state.recover(timestamp, bytes)
+
+        state.left shouldBe data
+        state.right shouldBe zero
+        state.get shouldBe Some(data)
+        state.getIntervalStates(interval.startTime, interval.endTime) shouldBe checkpoint
+    }
+  }
+
+  // With one interval state before the checkpoint time and one at it, checkpoint() is
+  // expected to serialize only the interval before the checkpoint time, fold `right`
+  // into `left` and reset `right` to zero.
+  property("WindowState checkpoints") {
+    forAll(longGen) { (checkpointTime: TimeStamp) =>
+      val window = mock[Window]
+      val taskContext = MockUtil.mockTaskContext
+      val group = mock[Group[AnyRef]]
+      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
+
+      val zero = mock[AnyRef]
+      val left = mock[AnyRef]
+      val right = mock[AnyRef]
+      val plus = mock[AnyRef]
+
+      when(group.zero).thenReturn(zero, zero)
+      when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
+      val state = new WindowState[AnyRef](group, serializer, taskContext, window)
+      state.left shouldBe zero
+      state.right shouldBe zero
+      state.get shouldBe Some(zero)
+
+      // A window of size 2 and step 1 centred on the checkpoint time.
+      val start = checkpointTime - 1
+      val end = checkpointTime + 1
+      val size = end - start
+      val step = 1L
+
+      when(window.range).thenReturn((start, end))
+      when(window.windowSize).thenReturn(size)
+      when(window.windowStep).thenReturn(step)
+      when(group.zero).thenReturn(zero, zero)
+      when(group.plus(zero, left)).thenReturn(left, Nil: _*)
+      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
+      when(group.plus(left, right)).thenReturn(plus, Nil: _*)
+
+      state.left = left
+      state.updateIntervalStates(start, left, checkpointTime)
+      state.right = right
+      state.updateIntervalStates(checkpointTime, right, checkpointTime)
+
+      state.setNextCheckpointTime(checkpointTime)
+      state.checkpoint()
+
+      state.left shouldBe plus
+      state.right shouldBe zero
+      verify(serializer).serialize(TreeMap(Interval(start, checkpointTime) -> left))
+    }
+  }
+
+  // Three consecutive updates against the same state: before the checkpoint time, at the
+  // checkpoint time, and at the window end with the window told to slide.
+  property("WindowState updates state") {
+    forAll(longGen) { (checkpointTime: TimeStamp) =>
+      val window = mock[Window]
+      val taskContext = MockUtil.mockTaskContext
+      val group = mock[Group[AnyRef]]
+      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
+
+      val zero = mock[AnyRef]
+      val left = mock[AnyRef]
+      val right = mock[AnyRef]
+      val plus = mock[AnyRef]
+
+      when(group.zero).thenReturn(zero, zero)
+      val state = new WindowState[AnyRef](group, serializer, taskContext, window)
+
+      // Same window geometry as above; note that start + step == checkpointTime.
+      val start = checkpointTime - 1
+      val end = checkpointTime + 1
+      val size = end - start
+      val step = 1L
+
+      when(window.range).thenReturn((start, end))
+      when(window.windowSize).thenReturn(size)
+      when(window.windowStep).thenReturn(step)
+      when(window.shouldSlide).thenReturn(false)
+      when(group.plus(zero, left)).thenReturn(left, left)
+      when(group.plus(left, zero)).thenReturn(left, Nil: _*)
+      when(taskContext.upstreamMinClock).thenReturn(0L)
+
+      // Time < checkpointTime
+      // Update left in current window
+      state.setNextCheckpointTime(checkpointTime)
+      state.update(start, left)
+
+      // The window is expected to be updated with the stubbed upstream min clock (0).
+      verify(window).update(0L)
+      state.left shouldBe left
+      state.right shouldBe zero
+      state.get shouldBe Some(left)
+      state.getIntervalStates(start, end) shouldBe TreeMap(Interval(start, checkpointTime) -> left)
+
+      when(window.range).thenReturn((start, end))
+      when(window.windowSize).thenReturn(size)
+      when(window.windowStep).thenReturn(step)
+      when(window.shouldSlide).thenReturn(false)
+      when(group.plus(zero, right)).thenReturn(right, right)
+      when(group.plus(left, right)).thenReturn(plus, Nil: _*)
+      when(taskContext.upstreamMinClock).thenReturn(0L)
+
+      // Time >= checkpointTime
+      // Update right in current window
+      state.setNextCheckpointTime(checkpointTime)
+      state.update(checkpointTime, right)
+
+      // Second update overall, hence two window.update(0L) calls in total.
+      verify(window, times(2)).update(0L)
+      state.left shouldBe left
+      state.right shouldBe right
+      state.get shouldBe Some(plus)
+      state.getIntervalStates(start, end) shouldBe
+        TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right)
+
+      // Slides window forward
+      when(window.range).thenReturn((start, end), (start + step, end + step))
+      when(window.shouldSlide).thenReturn(true)
+      when(taskContext.upstreamMinClock).thenReturn(checkpointTime)
+      when(group.minus(left, left)).thenReturn(zero, Nil: _*)
+      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
+      when(group.plus(right, right)).thenReturn(plus, Nil: _*)
+      when(group.plus(zero, plus)).thenReturn(plus, Nil: _*)
+
+      state.setNextCheckpointTime(checkpointTime)
+      state.update(end, right)
+
+      // After the slide the test expects `left` to be emptied, `right` to hold the
+      // combined value, and a third interval to be recorded for [end, end + step).
+      verify(window).slideOneStep()
+      verify(window).update(checkpointTime)
+      state.left shouldBe zero
+      state.right shouldBe plus
+      state.get shouldBe Some(plus)
+      state.getIntervalStates(start, end + step) shouldBe
+        TreeMap(
+          Interval(start, start + step) -> left,
+          Interval(start + step, end) -> right,
+          Interval(end, end + step) -> right)
+    }
+  }
+
+  // Uses a real Window (not a mock) and checks the bounds of the interval returned by
+  // getInterval for a timestamp, and that the interval for its end time starts where it
+  // ended.
+  property("WindowState gets interval for timestamp") {
+    forAll(longGen, longGen, longGen, longGen) {
+      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) =>
+        val windowManager = new Window(windowSize, windowStep)
+        val taskContext = MockUtil.mockTaskContext
+        val group = mock[Group[AnyRef]]
+        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
+
+        val zero = mock[AnyRef]
+        when(group.zero).thenReturn(zero, zero)
+        val state = new WindowState[AnyRef](group, serializer, taskContext, windowManager)
+
+        val interval = state.getInterval(timestamp, checkpointTime)
+        intervalSpec(interval, timestamp, checkpointTime, windowSize, windowStep)
+
+        val nextTimeStamp = interval.endTime
+        val nextInterval = state.getInterval(nextTimeStamp, checkpointTime)
+        intervalSpec(nextInterval, nextTimeStamp, checkpointTime, windowSize, windowStep)
+
+        interval.endTime shouldBe nextInterval.startTime
+    }
+
+    // Bounds an interval must satisfy for `timestamp`: start <= end, start at or after
+    // the step-aligned lower bounds, end at or before the step-aligned upper bounds, and
+    // the checkpoint time never strictly inside the interval.
+    def intervalSpec(interval: Interval, timestamp: TimeStamp,
+        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = {
+      interval.startTime should be <= interval.endTime
+      timestamp / windowStep * windowStep should (be <= interval.startTime)
+      (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime)
+      (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime)
+      ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should
+        (be >= interval.endTime)
+      checkpointTime should (be <= interval.startTime or be >= interval.endTime)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala new file mode 100644 index 0000000..71901a4 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.storage + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import org.apache.gearpump.cluster.{MasterHarness, MiniCluster} +import org.apache.gearpump.streaming.StreamingTestUtil +import org.apache.gearpump.util.Constants + +class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll { + implicit val timeout = Constants.FUTURE_TIMEOUT + implicit val dispatcher = MasterHarness.cachedPool + + "InMemoryAppStoreOnMaster" should { + "save and return the data properly" in { + val appId = 0 + val miniCluster = new MiniCluster + val master = miniCluster.mockMaster + StreamingTestUtil.startAppMaster(miniCluster, appId) + val store = new InMemoryAppStoreOnMaster(appId, master) + + Thread.sleep(500) + + store.put("String_type", "this is a string") + store.put("Int_type", 1024) + store.put("Tuple2_type", ("element1", 1024)) + + val future1 = store.get("String_type").map { value => + value.asInstanceOf[String] should be("this is a string") + } + val future2 = store.get("Int_type").map { value => value.asInstanceOf[Int] should be(1024) } + val future3 = store.get("Tuple2_type").map { value => + value.asInstanceOf[(String, Int)] should be(("element1", 1024)) + } + val future4 = store.get("key").map { value => value.asInstanceOf[Object] should be(null) } + Await.result(future1, 15.seconds) + Await.result(future2, 15.seconds) + Await.result(future3, 15.seconds) + Await.result(future4, 15.seconds) + 
miniCluster.shutDown + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala new file mode 100644 index 0000000..cfe47eb --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.task + +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask +import org.apache.gearpump.streaming.{DAG, ProcessorDescription} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ + +class SubscriberSpec extends FlatSpec with Matchers { + "Subscriber.of" should "return all subscriber for a processor" in { + + val sourceProcessorId = 0 + val task1 = ProcessorDescription(id = sourceProcessorId, taskClass = + classOf[TestTask].getName, parallelism = 1) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) + val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask].getName, parallelism = 1) + val partitioner = Partitioner[HashPartitioner] + val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> task3, + task2 ~ partitioner ~> task3)) + + val subscribers = Subscriber.of(sourceProcessorId, dag) + assert(subscribers.size == 2) + + assert(subscribers.toSet == + Set(Subscriber(1, partitioner, task2.parallelism, task2.life), Subscriber(2, partitioner, + task3.parallelism, task3.life))) + } +} + +object SubscriberSpec { + class TestTask +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala new file mode 100644 index 0000000..4afee8b --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.task + +import java.util.Random + +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask +import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription} + +class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { + val appId = 0 + val executorId = 0 + val taskId = TaskId(0, 0) + val session = new Random().nextInt() + + val downstreamProcessorId = 1 + val partitioner = Partitioner[HashPartitioner] + + val parallism = 2 + val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName, + parallism) + val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, + downstreamProcessor.life) + + private def prepare: (Subscription, ExpressTransport) = { + val transport = mock[ExpressTransport] + val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport) + subscription.start() + + val expectedAckRequest = 
InitialAckRequest(taskId, session) + verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1)) + + (subscription, transport) + } + + it should "not send any more message when its life ends" in { + val (subscription, transport) = prepare + subscription.changeLife(LifeTime(0, 0)) + val count = subscription.sendMessage(Message("some")) + assert(count == 0) + } + + it should "send message and handle ack correctly" in { + val (subscription, transport) = prepare + val msg1 = new Message("1", timestamp = 70) + subscription.sendMessage(msg1) + + verify(transport, times(1)).transport(msg1, TaskId(1, 1)) + assert(subscription.minClock == 70) + + val msg2 = new Message("0", timestamp = 50) + subscription.sendMessage(msg2) + verify(transport, times(1)).transport(msg2, TaskId(1, 0)) + + // minClock has been set to smaller one + assert(subscription.minClock == 50) + + val initialMinClock = subscription.minClock + + // Acks initial AckRequest(0) + subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session)) + subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session)) + + // Sends 100 messages + 100 until 200 foreach { clock => + subscription.sendMessage(Message("1", clock)) + subscription.sendMessage(Message("2", clock)) + } + + // Ack not received, minClock no change + assert(subscription.minClock == initialMinClock) + + subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session)) + subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session)) + + // Ack received, minClock changed + assert(subscription.minClock > initialMinClock) + + // Expects to receive two ackRequest for two downstream tasks + val ackRequestForTask0 = AckRequest(taskId, 200, session) + verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) + + val ackRequestForTask1 = AckRequest(taskId, 200, session) + verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) + } + + it should "disallow more message sending if there is no ack back" in { + val 
(subscription, transport) = prepare + // send 100 messages + 0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock => + subscription.sendMessage(Message(randomMessage, clock)) + } + + assert(subscription.allowSendingMoreMessages() == false) + } + + it should "report minClock as Long.MaxValue when there is no pending message" in { + val (subscription, transport) = prepare + val msg1 = new Message("1", timestamp = 70) + subscription.sendMessage(msg1) + assert(subscription.minClock == 70) + subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session)) + assert(subscription.minClock == Long.MaxValue) + } + + private def randomMessage: String = new Random().nextInt.toString +} + +object SubscriptionSpec { + + class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + + override def onStart(startTime: StartTime): Unit = { + } + + override def onNext(msg: Message): Unit = { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala new file mode 100644 index 0000000..48901d2 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.task + +import akka.actor.{ExtendedActorSystem, Props} +import akka.testkit._ +import com.typesafe.config.{Config, ConfigFactory} +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.serializer.{FastKryoSerializer, SerializationFramework} +import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} +import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask +import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{Graph, Util} + +class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { + protected override def config: Config = { + ConfigFactory.parseString( + """ akka.loggers = ["akka.testkit.TestEventListener"] + | akka.test.filter-leeway = 20000 + """.stripMargin). 
+ withFallback(TestUtil.DEFAULT_CONFIG) + } + + val appId = 0 + val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask].getName, parallelism = 1) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) + val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) + val taskId1 = TaskId(0, 0) + val taskId2 = TaskId(1, 0) + val executorId1 = 1 + val executorId2 = 2 + + var mockMaster: TestProbe = null + var taskContext1: TaskContextData = null + + var mockSerializerPool: SerializationFramework = null + + override def beforeEach(): Unit = { + startActorSystem() + mockMaster = TestProbe()(getActorSystem) + + mockSerializerPool = mock(classOf[SerializationFramework]) + val serializer = new FastKryoSerializer(getActorSystem.asInstanceOf[ExtendedActorSystem]) + when(mockSerializerPool.get()).thenReturn(serializer) + + taskContext1 = TaskContextData(executorId1, appId, + "appName", mockMaster.ref, 1, + LifeTime.Immortal, + Subscriber.of(processorId = 0, dag)) + } + + "TaskActor" should { + "register itself to AppMaster when started" in { + val mockTask = mock(classOf[TaskWrapper]) + val testActor = TestActorRef[TaskActor](Props( + new TaskActor(taskId1, taskContext1, UserConfig.empty, + mockTask, mockSerializerPool)))(getActorSystem) + testActor ! TaskRegistered(taskId1, 0, Util.randInt()) + testActor ! StartTask(taskId1) + + implicit val system = getActorSystem + val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId) + EventFilter[MsgLostException](occurrences = 1) intercept { + testActor ! ack + } + } + + "respond to ChangeTask" in { + val mockTask = mock(classOf[TaskWrapper]) + val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, + UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) + testActor ! TaskRegistered(taskId1, 0, Util.randInt()) + testActor ! 
StartTask(taskId1) + mockMaster.expectMsgType[GetUpstreamMinClock] + + mockMaster.send(testActor, ChangeTask(taskId1, 1, LifeTime.Immortal, List.empty[Subscriber])) + mockMaster.expectMsgType[TaskChanged] + } + + "handle received message correctly" in { + val mockTask = mock(classOf[TaskWrapper]) + val msg = Message("test") + + val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, + UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) + testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), mockMaster.ref) + testActor.tell(StartTask(taskId1), mockMaster.ref) + + testActor.tell(msg, testActor) + + verify(mockTask, times(1)).onNext(msg) + } + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } +} + +object TaskActorSpec { + class TestTask +} \ No newline at end of file
