http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala deleted file mode 100644 index 2c64133..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.appmaster - -import scala.collection.mutable.ArrayBuffer - -import com.typesafe.config.ConfigFactory -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.appmaster.TaskLocator.Localities -import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.streaming.{Constants, DAG, ProcessorDescription} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class TaskSchedulerSpec extends WordSpec with Matchers { - val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 2) - - val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) - - val config = TestUtil.DEFAULT_CONFIG - - "TaskScheduler" should { - "schedule tasks on different workers properly according user's configuration" in { - - val localities = Localities( - Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)), - WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3)) - )) - - val localityConfig = ConfigFactory.parseString(Localities.toJson(localities)) - - import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES - val appName = "app" - val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, - config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root)) - - val expectedRequests = - Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), - ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = 
Relaxation.SPECIFICWORKER)) - - taskScheduler.setDAG(dag) - val resourceRequests = taskScheduler.getResourceRequests() - - val acturalRequests = resourceRequests.sortBy(_.resource.slots) - assert(acturalRequests.sameElements(expectedRequests.sortBy(_.resource.slots))) - - val tasksOnWorker1 = ArrayBuffer[Int]() - val tasksOnWorker2 = ArrayBuffer[Int]() - for (i <- 0 until 4) { - tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), - executorId = 0, Resource(1)).head.processorId) - } - for (i <- 0 until 2) { - tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, - Resource(1)).head.processorId) - } - - // Allocates more resource, and no tasks to launch - assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, - Resource(1)) == List.empty[TaskId]) - - // On worker1, executor 0 - assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1))) - - // On worker2, executor 1, Task(0, 0), Task(0, 1) - assert(tasksOnWorker2.sorted.sameElements(Array(0, 0))) - - val rescheduledResources = taskScheduler.executorFailed(executorId = 1) - - assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), - WorkerId.unspecified, relaxation = Relaxation.ONEWORKER)))) - - val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2)) - - // Starts the failed 2 tasks Task(0, 0) and Task(0, 1) - assert(launchedTask.length == 2) - } - - "schedule task fairly" in { - val appName = "app" - val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config) - - val expectedRequests = - Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), - ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) - - taskScheduler.setDAG(dag) - val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4)) - assert(tasks.filter(_.processorId == 0).length == 2) - assert(tasks.filter(_.processorId == 1).length == 2) - } - } -} - -object 
TaskSchedulerSpec { - class TestTask1(taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = Unit - override def onNext(msg: Message): Unit = Unit - } - - class TestTask2(taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = Unit - override def onNext(msg: Message): Unit = Unit - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala deleted file mode 100644 index 132d46c..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask -class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = null - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "be able to generate multiple new streams" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - app.source(List("A"), 1, "") - app.source(List("B"), 1, "") - - assert(app.graph.vertices.size == 2) - } - - it should "plan the dsl to Processsor(TaskDescription) DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - val parallism = 3 - app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - assert(task.taskClass == classOf[SourceTask[_, _]].getName) - assert(task.parallelism == parallism) - } - - it should "produce 3 messages" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - val list = List[String]( - "0", - "1", - "2" - ) - val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - /* - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - val sum = 
producer.flatMap(msg => { - LOG.info("in flatMap") - assert(msg.msg.isInstanceOf[String]) - val num = msg.msg.asInstanceOf[String].toInt - Array(num) - }).reduce(_+_) - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - */ - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala deleted file mode 100644 index c8a44f6..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.util.{Either, Left, Right} - -import akka.actor._ -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import io.gearpump.Message -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} -import io.gearpump.streaming.dsl.StreamSpec.Join -import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import io.gearpump.streaming.dsl.plan.OpTranslator._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = null - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "translate the DSL to a DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val stream = app.source(data.lines.toList, 1, ""). - flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). - map(word => (word, 1)). - groupBy(_._1, parallelism = 2). - reduce((left, right) => (left._1, left._2 + right._2)). 
- map[Either[(String, Int), String]](Left(_)) - - val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) - stream.merge(query).process[(String, Int)](classOf[Join], 1) - - val appDescription = app.plan - - val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => - edge.partitionerFactory.partitioner.getClass.getName - } - val expectedDagTopology = getExpectedDagTopology - - assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet)) - assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet)) - } - - private def getExpectedDagTopology: Graph[String, String] = { - val source = classOf[SourceTask[_, _]].getName - val group = classOf[GroupByTask[_, _, _]].getName - val merge = classOf[TransformTask[_, _]].getName - val join = classOf[Join].getName - - val hash = classOf[HashPartitioner].getName - val groupBy = classOf[GroupByPartitioner[_, _]].getName - val colocation = classOf[CoLocationPartitioner].getName - - val expectedDagTopology = Graph( - source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join, - source ~ hash ~> merge - ) - expectedDagTopology - } -} - -object StreamSpec { - - class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - var query: String = null - override def onStart(startTime: StartTime): Unit = {} - - override def onNext(msg: Message): Unit = { - msg.msg match { - case Left(wordCount: (String @unchecked, Int @unchecked)) => - if (query != null && wordCount._1 == query) { - taskContext.output(new Message(wordCount)) - } - - case Right(query: String) => - this.query = query - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala deleted file mode 100644 index 03dd242..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl.partitioner - -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.Message -import io.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People - -class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - it should "use the outpout of groupBy function to do partition" in { - val mark = People("Mark", "male") - val tom = People("Tom", "male") - val michelle = People("Michelle", "female") - - val partitionNum = 10 - val groupBy = new GroupByPartitioner[People, String](_.gender) - assert(groupBy.getPartition(Message(mark), partitionNum) - == groupBy.getPartition(Message(tom), partitionNum)) - - assert(groupBy.getPartition(Message(mark), partitionNum) - != groupBy.getPartition(Message(michelle), partitionNum)) - } -} - -object GroupByPartitionerSpec { - case class People(name: String, gender: String) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala deleted file mode 100644 index 62a0f95..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.dsl.plan - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ - -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.streaming.Constants._ -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.dsl.CollectionDataSource -import io.gearpump.streaming.dsl.plan.OpTranslator._ -import io.gearpump.streaming.task.StartTime - -class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - "andThen" should "chain multiple single input function" in { - val dummy = new DummyInputFunction[String] - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") - - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val count = all.process(data).toList.last - assert(count == 15) - } - - "Source" should "iterate over input source and apply attached operator" in { - - val taskContext = MockUtil.mockTaskContext - - 
val conf = UserConfig.empty - val data = "one two three".split("\\s") - - // Source with no transformer - val source = new SourceTask[String, String](new CollectionDataSource[String](data), None, - taskContext, conf) - source.onStart(StartTime(0)) - source.onNext(Message("next")) - verify(taskContext, times(1)).output(anyObject()) - - // Source with transformer - val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new SourceTask(new CollectionDataSource[String](data), Some(double), - anotherTaskContext, conf) - another.onStart(StartTime(0)) - another.onNext(Message("next")) - verify(anotherTaskContext, times(2)).output(anyObject()) - } - - "GroupByTask" should "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - var map = Map.empty[String, Int] - - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val task = new GroupByTask[String, String, String](input => input, taskContext, config) - task.onStart(StartTime(0)) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String]) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - "MergeTask" should "accept two stream and apply the attached operator" in { - - // Source with transformer - val taskContext = 
MockUtil.mockTaskContext - val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val task = new TransformTask[String, String](Some(double), taskContext, conf) - task.onStart(StartTime(0)) - - val data = "1 2 2 3 3 3".split("\\s+") - - data.foreach { input => - task.onNext(Message(input)) - } - - verify(taskContext, times(data.length * 2)).output(anyObject()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala deleted file mode 100644 index 1d94776..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.executor - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import org.mockito.Matchers._ -import org.mockito.Mockito.{times, _} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.appmaster.WorkerInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig} -import io.gearpump.streaming.AppMasterToExecutor._ -import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask -import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations -import io.gearpump.streaming.executor.TaskLauncherSpec.MockTask -import io.gearpump.streaming.task.{Subscriber, TaskId} -import io.gearpump.streaming.{LifeTime, ProcessorDescription} -import io.gearpump.transport.HostPort - -class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - val appId = 0 - val executorId = 0 - val workerId = WorkerId(0, 0L) - var appMaster: TestProbe = null - implicit var system: ActorSystem = null - val userConf = UserConfig.empty - - override def beforeAll(): Unit = { - system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG) - appMaster = TestProbe() - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "call launcher to launch task" in { - val worker = TestProbe() - val workerInfo = WorkerInfo(workerId, worker.ref) - val executorContext = ExecutorContext(executorId, workerInfo, appId, "app", - appMaster.ref, Resource(2)) - val taskLauncher = mock(classOf[ITaskLauncher]) - val executor = system.actorOf(Props(new Executor(executorContext, userConf, taskLauncher))) - val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, - parallelism = 2) - val taskIds = List(TaskId(0, 0), TaskId(0, 1)) - val 
launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber]) - - val task = TestProbe() - when(taskLauncher.launch(any(), any(), any(), any(), any())) - .thenReturn(taskIds.map((_, task.ref)).toMap) - - val client = TestProbe() - client.send(executor, launchTasks) - client.expectMsg(TasksLaunched) - - verify(taskLauncher, times(1)).launch(any(), any(), any(), any(), any()) - - executor ! RegisterTask(TaskId(0, 0), executorId, HostPort("localhost:80")) - executor ! RegisterTask(TaskId(0, 1), executorId, HostPort("localhost:80")) - - executor ! TaskRegistered(TaskId(0, 0), 0, 0) - - task.expectMsgType[TaskRegistered] - - executor ! TaskRegistered(TaskId(0, 1), 0, 0) - - task.expectMsgType[TaskRegistered] - - executor ! TaskLocationsReady(TaskLocations(Map.empty), dagVersion = 0) - executor ! StartAllTasks(dagVersion = 0) - - task.expectMsgType[StartTask] - task.expectMsgType[StartTask] - - val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue), - List.empty[Subscriber]) - - client.send(executor, changeTasks) - client.expectMsgType[TasksChanged] - - executor ! TaskLocationsReady(TaskLocations(Map.empty), 1) - executor ! 
StartAllTasks(dagVersion = 1) - - task.expectMsgType[ChangeTask] - task.expectMsgType[ChangeTask] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala deleted file mode 100644 index c97ae4c..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.executor - -import org.scalatest._ - -import io.gearpump.streaming.executor.Executor.TaskArgumentStore -import io.gearpump.streaming.executor.TaskLauncher.TaskArgument -import io.gearpump.streaming.task.TaskId - -class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - it should "retain all history of taskArgument" in { - val version0 = TaskArgument(0, null, null) - val version2 = version0.copy(dagVersion = 2) - val store = new TaskArgumentStore - val task = TaskId(0, 0) - store.add(task, version0) - store.add(task, version2) - - // Should return a version which is same or older than expected version - assert(store.get(dagVersion = 1, task) == Some(version0)) - assert(store.get(dagVersion = 0, task) == Some(version0)) - assert(store.get(dagVersion = 2, task) == Some(version2)) - - store.removeObsoleteVersion() - assert(store.get(dagVersion = 1, task) == None) - assert(store.get(dagVersion = 0, task) == None) - assert(store.get(dagVersion = 2, task) == Some(version2)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala deleted file mode 100644 index c135c5b..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.executor - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem} -import akka.testkit.TestProbe -import org.scalatest._ - -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.serializer.SerializationFramework -import io.gearpump.streaming.ProcessorDescription -import io.gearpump.streaming.executor.TaskLauncher.TaskArgument -import io.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, MockTaskActor} -import io.gearpump.streaming.task.{Task, TaskContext, TaskContextData, TaskId, TaskWrapper} - -class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - val appId = 0 - val executorId = 0 - var appMaster: TestProbe = null - implicit var system: ActorSystem = null - val userConf = UserConfig.empty - - override def beforeAll(): Unit = { - system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG) - appMaster = TestProbe() - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "able to launch tasks" in { - val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref, - userConf, classOf[MockTaskActor]) - val taskIds = List(TaskId(0, 0), TaskId(0, 1)) - val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, - parallelism = 2) - val 
argument = TaskArgument(0, processor, null) - - val tasks = launcher.launch(taskIds, argument, system, null, - "gearpump.shared-thread-pool-dispatcher") - tasks.keys.toSet shouldBe taskIds.toSet - } -} - -object TaskLauncherSpec { - class MockTaskActor( - val taskId: TaskId, - val taskContextData : TaskContextData, - userConf : UserConfig, - val task: TaskWrapper, - serializer: SerializationFramework) extends Actor { - def receive: Receive = null - } - - class MockTask(taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala deleted file mode 100644 index da1ca9f..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Unit tests for [[ProcessorAggregator]] and the building blocks it is imported with:
 * `MultiLayerMap`, `HistogramAggregator`, `MeterAggregator` and `AggregatorFactory`.
 */
class ProcessorAggregatorSpec extends FlatSpec with Matchers {

  // Largest absolute difference accepted when comparing aggregated floating point values.
  private val tolerance = 0.01

  /**
   * Asserts that `actual` is within `tolerance` of `expected` in either direction.
   * A plain `actual - expected < tolerance` would also accept arbitrarily small results.
   */
  private def assertClose(actual: Double, expected: Double): Unit = {
    assert(math.abs(actual - expected) < tolerance)
  }

  "MultiLayerMap" should "maintain multiple layers HashMap" in {
    val layers = 3
    val map = new MultiLayerMap[String](layers)

    assert(map.get(layer = 0, "key") == null)

    // Illegal layer, must be handled safely
    assert(map.get(layer = 10, "key") == null)

    map.put(layer = 0, "key", "value")
    assert(map.get(layer = 0, "key") == "value")

    map.put(layer = 1, "key2", "value2")
    map.put(layer = 2, "key3", "value3")
    map.put(layer = 2, "key4", "value4")

    // Illegal layer, must be ignored
    map.put(layer = 4, "key5", "value5")

    assert(map.size == 4)
    assert(map.valueIterator.asScala.toSet == Set("value", "value2", "value3", "value4"))
  }

  "HistogramAggregator" should "aggregate by calculating average" in {
    val aggregator = new HistogramAggregator("processor")

    val a = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
    val b = Histogram("processor.task2", 5, 6, 7, 8, 9, 10)
    val expect = Histogram("processor.task2", 3, 4, 5, 6, 7, 8)

    val olderTime = 100
    val newerTime = 200

    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))

    val result = aggregator.result

    // The older timestamp is picked as the aggregated time
    assert(result.time == olderTime)

    // Every field must be the average of the two inputs
    result.value match {
      case check: Histogram =>
        assertClose(check.mean, expect.mean)
        assertClose(check.stddev, expect.stddev)
        assertClose(check.median, expect.median)
        assertClose(check.p95, expect.p95)
        assertClose(check.p99, expect.p99)
        assertClose(check.p999, expect.p999)
      case other =>
        fail(s"Expected a Histogram but got $other")
    }
  }

  "MeterAggregator" should "aggregate by summing" in {
    val aggregator = new MeterAggregator("processor")

    val a = Meter("processor.task1", count = 1, 1, 3, "s")
    val b = Meter("processor.task2", count = 2, 5, 7, "s")
    val expect = Meter("processor.task2", count = 3, 6, 10, "s")

    val olderTime = 100
    val newerTime = 200

    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))

    val result = aggregator.result

    // The older timestamp is picked
    assert(result.time == olderTime)

    // Counts and rates must be the sums of the two inputs
    result.value match {
      case check: Meter =>
        assert(check.count == expect.count)
        assertClose(check.m1, expect.m1)
        assertClose(check.meanRate, expect.meanRate)
        assert(check.rateUnit == expect.rateUnit)
      case other =>
        fail(s"Expected a Meter but got $other")
    }
  }

  "AggregatorFactory" should "create aggregator" in {
    val factory = new AggregatorFactory()

    val a = Meter("processor.task1", count = 1, 1, 3, "s")
    val b = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)

    val meterAggregator = factory.create(HistoryMetricsItem(time = 0, a), "name1")
    meterAggregator shouldBe a [MeterAggregator]

    val histogramAggregator = factory.create(HistoryMetricsItem(time = 0, b), "name2")
    histogramAggregator shouldBe a [HistogramAggregator]
  }

  "ProcessorAggregator" should "aggregate on different read options" in {
    val hours = 2 // Maintains 2 hours history
    val seconds = 2 // Maintains 2 seconds recent data
    val taskCount = 5 // For each processor
    val metricCount = 100 // For each task, have metricCount metrics
    val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000,
      seconds, seconds / 2 * 1000)

    val aggregator = new ProcessorAggregator(range)

    val processorIds = List(0, 1)
    val histogramNames = List("receiveLatency", "processTime")
    val meterNames = List("sendThroughput", "receiveThroughput")

    // Generates metricCount random items per (processor, metric name, task), with
    // timestamps below timeRange: all histograms first, then all meters.
    def inputs(timeRange: Long): List[HistoryMetricsItem] = {
      def tasksOf(processorId: Int): List[TaskId] =
        (0 until taskCount).map(TaskId(processorId, _)).toList

      val histograms = for {
        processorId <- processorIds
        metricName <- histogramNames
        taskId <- tasksOf(processorId)
        item <- histogram(taskId, metricName, timeRange, metricCount)
      } yield item

      val meters = for {
        processorId <- processorIds
        metricName <- meterNames
        taskId <- tasksOf(processorId)
        item <- meter(taskId, metricName, timeRange, metricCount)
      } yield item

      histograms ++ meters
    }

    // Number of aggregated items per metric name. The result is compared with map
    // equality, which does not depend on iteration order.
    def countByName(items: List[HistoryMetricsItem]): Map[String, Int] =
      items.groupBy(_.value.name).map { case (name, group) => name -> group.size }

    // Every processor-level metric name mapped to the same expected item count.
    def expectedCounts(perMetric: Int): Map[String, Int] = {
      val entries = for {
        processorId <- processorIds
        metricName <- histogramNames ++ meterNames
      } yield s"app0.processor$processorId:$metricName" -> perMetric
      entries.toMap
    }

    // Aggregates on processor and metric names
    val readLatest = aggregator.aggregate(ReadOption.ReadLatest,
      inputs(Long.MaxValue).iterator, now = Long.MaxValue)
    assert(readLatest.size == 8) // 2 processor * 4 metrics type
    countByName(readLatest) shouldBe expectedCounts(1)

    // Aggregates on processor, metric names and time range
    val readRecent = aggregator.aggregate(ReadOption.ReadRecent,
      inputs(seconds * 1000).iterator, now = seconds * 1000)
    assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time range
    countByName(readRecent) shouldBe expectedCounts(2)

    // Aggregates on processor, metric names and time range
    val readHistory = aggregator.aggregate(ReadOption.ReadHistory,
      inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
    assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time ranges
    countByName(readHistory) shouldBe expectedCounts(2)
  }

  /**
   * Builds `repeat` histogram items for `taskId` with random values and random
   * non-negative timestamps derived from `nextLong() % timeRange`.
   */
  private def histogram(
      taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue,
      repeat: Int = 1): List[HistoryMetricsItem] = {
    val random = new Random()
    val name = s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName"
    List.fill(repeat) {
      HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
        Histogram(name,
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble())))
    }
  }

  /**
   * Builds `repeat` meter items for `taskId` with random values and random
   * non-negative timestamps derived from `nextLong() % timeRange`.
   */
  private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int)
    : List[HistoryMetricsItem] = {
    val random = new Random()
    val name = s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName"
    List.fill(repeat) {
      HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
        Meter(name,
          // Bounded draw: Math.abs(nextInt()) stays negative for Int.MinValue.
          random.nextInt(Int.MaxValue),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          "event/s"))
    }
  }

  "ProcessorAggregator" should "handle smoothly for unsupported metric type and " +
    "error formatted metric name" in {
    val invalid = List(
      // Unsupported metric type
      HistoryMetricsItem(0, Gauge("app0.processor0.task0:gauge", 100)),

      // Wrong format: should be app0.processor0.task0:throughput
      HistoryMetricsItem(0, Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
    )

    val valid = histogram(TaskId(0, 0), repeat = 10)

    val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0))
    val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).iterator,
      now = Long.MaxValue)

    // Only the valid items contribute, and they collapse into a single data point.
    assert(result.size == 1)
    assert(result.head.value.name == "app0.processor0:latency")
  }
}
/**
 * Tests for [[TaskFilterAggregator]]: filtering by processor/task range, limit handling,
 * option parsing and tolerance of malformed metric names.
 */
class TaskFilterAggregatorSpec extends FlatSpec with Matchers {

  /** Creates a zero-valued latency histogram item for `taskId` with a random timestamp. */
  def metric(taskId: TaskId): HistoryMetricsItem = {
    val timestamp = Math.abs(new Random().nextLong())
    val name = s"app0.processor${taskId.processorId}.task${taskId.index}:latency"
    HistoryMetricsItem(timestamp, Histogram(name, 0, 0, 0, 0, 0, 0))
  }

  it should "filter data on processor range, task range combination" in {
    // One metric for every (processor, task) pair of a 10 x 10 matrix
    val inputs = (for {
      processor <- 0 until 10
      task <- 0 until 10
    } yield metric(TaskId(processor, task))).toList

    val globalLimit = 10
    val aggregator = new TaskFilterAggregator(globalLimit)

    // Neither limit is hit: every match inside the requested ranges is returned
    val withinLimits = new Options(limit = 20, startTask = 3, endTask = 6,
      startProcessor = 3, endProcessor = 6)
    assert(aggregator.aggregate(withinLimits, inputs.iterator).size == 9)

    // The limit given in the options caps the result
    val userLimited = new Options(limit = 3, startTask = 3, endTask = 5,
      startProcessor = 3, endProcessor = 5)
    assert(aggregator.aggregate(userLimited, inputs.iterator).size == 3)

    // The aggregator-wide limit caps the result
    val globallyLimited = new Options(limit = 20, startTask = 3, endTask = 8,
      startProcessor = 3, endProcessor = 8)
    assert(aggregator.aggregate(globallyLimited, inputs.iterator).size == globalLimit)
  }

  it should "reject wrong format options" in {
    val malformed = Map(TaskFilterAggregator.StartTask -> "not a number")
    assert(Options.parse(malformed) == null)
  }

  it should "skip wrong format metrics" in {
    // Wrong format: should be app0.processor0.task0:throughput
    val invalid = List(
      HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, "")))

    val aggregator = new TaskFilterAggregator(Int.MaxValue)
    assert(aggregator.aggregate(Options.acceptAll, invalid.iterator).size == 0)
  }
}
/**
 * Property test for [[DefaultTimeStampFilter]]: a message is kept only when its timestamp
 * is at or after the predicate timestamp, and a null message is always dropped.
 */
class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers {
  property("DefaultTimeStampFilter should filter message against give timestamp") {
    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
    // Random alphabetic payload paired with a random timestamp from the same range
    val messageGen = Gen.alphaStr.flatMap { payload =>
      timestampGen.map(time => Message(payload, time))
    }

    val filter = new DefaultTimeStampFilter()

    forAll(timestampGen, messageGen) { (predicate: TimeStamp, message: Message) =>
      val expected = if (message.timestamp >= predicate) Some(message) else None
      filter.filter(message, predicate) shouldBe expected

      filter.filter(null, predicate) shouldBe None
    }
  }
}
/**
 * Property tests for [[CheckpointManager]] against a mocked [[CheckpointStore]]:
 * recover, checkpoint and close are forwarded to the store, and the checkpoint time is
 * derived from the maximum message timestamp.
 */
class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
  val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)

  property("CheckpointManager should recover from CheckpointStore") {
    forAll(timestampGen, checkpointIntervalGen) { (timestamp: TimeStamp, interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.recover(timestamp)

      verify(store).recover(timestamp)
    }
  }

  property("CheckpointManager should write checkpoint to CheckpointStore") {
    val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
    forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
      (timestamp: TimeStamp, interval: Long, checkpoint: Array[Byte]) =>
        val store = mock[CheckpointStore]
        val manager = new CheckpointManager(interval, store)

        manager.checkpoint(timestamp, checkpoint)

        verify(store).persist(timestamp, checkpoint)
    }
  }

  property("CheckpointManager should close CheckpointStore") {
    forAll(checkpointIntervalGen) { (interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.close()

      verify(store).close()
    }
  }

  property("CheckpointManager should update checkpoint time according to max message timestamp") {
    forAll(timestampGen, checkpointIntervalGen) { (timestamp: TimeStamp, interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.update(timestamp)
      manager.getMaxMessageTime shouldBe timestamp

      // The timestamp must lie in [checkpointTime - interval, checkpointTime)
      val checkpointTime = manager.getCheckpointTime.get
      val earliestCovered = checkpointTime - interval
      timestamp should (be < checkpointTime and be >= earliestCovered)

      // Once the checkpoint is taken, no checkpoint time is reported any more
      manager.checkpoint(checkpointTime, Array.empty[Byte])
      verify(store).persist(MockitoMatchers.eq(checkpointTime),
        MockitoMatchers.anyObject[Array[Byte]]())
      manager.getCheckpointTime shouldBe empty
    }
  }
}
/**
 * Property test for [[InMemoryCheckpointStore]]: nothing can be recovered before a
 * checkpoint is persisted, and the persisted bytes are returned afterwards.
 */
class InMemoryCheckpointStoreSpec extends PropSpec with PropertyChecks with Matchers {

  property("InMemoryCheckpointStore should provide read / write checkpoint") {
    val timestampGen = Gen.chooseNum[Long](1, 1000)
    val checkpointGen = Gen.alphaStr.map(text => text.getBytes("UTF-8"))

    // A fresh store is used for every generated sample
    def roundTrip(timestamp: Long, checkpoint: Array[Byte]): Unit = {
      val store = new InMemoryCheckpointStore
      store.recover(timestamp) shouldBe None
      store.persist(timestamp, checkpoint)
      store.recover(timestamp) shouldBe Some(checkpoint)
    }

    forAll(timestampGen, checkpointGen) { (timestamp: Long, checkpoint: Array[Byte]) =>
      roundTrip(timestamp, checkpoint)
    }
  }
}
/**
 * Property tests for [[NonWindowState]] driven by a mocked [[Monoid]] and [[Serializer]]:
 * recovery from a checkpoint, checkpointing, and updates on either side of the next
 * checkpoint time.
 */
class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  val longGen = Gen.chooseNum[Long](100L, System.currentTimeMillis())

  /** Asserts the left part, the right part and the combined value of `state`. */
  private def assertState(
      state: NonWindowState[AnyRef], left: AnyRef, right: AnyRef, combined: AnyRef): Unit = {
    state.left shouldBe left
    state.right shouldBe right
    state.get shouldBe Some(combined)
  }

  /**
   * Stubs the monoid identity, creates a state and checks that it starts out with `zero`
   * on both sides.
   */
  private def newZeroedState(
      monoid: Monoid[AnyRef], serializer: Serializer[AnyRef], zero: AnyRef)
    : NonWindowState[AnyRef] = {
    when(monoid.zero).thenReturn(zero, zero)
    when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)

    val state = new NonWindowState[AnyRef](monoid, serializer)
    assertState(state, left = zero, right = zero, combined = zero)
    state
  }

  property("NonWindowState should recover checkpointed state at given timestamp") {
    forAll(longGen) { (timestamp: TimeStamp) =>
      val monoid = mock[Monoid[AnyRef]]
      val serializer = mock[Serializer[AnyRef]]
      val bytes = Array.empty[Byte]
      val checkpoint = mock[AnyRef]
      val zero = mock[AnyRef]
      when(monoid.plus(checkpoint, zero)).thenReturn(checkpoint, Nil: _*)

      val state = newZeroedState(monoid, serializer, zero)

      when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
      state.recover(timestamp, bytes)

      // The recovered checkpoint becomes the left part; the right part stays at zero
      assertState(state, left = checkpoint, right = zero, combined = checkpoint)
    }
  }

  property("NonWindowState checkpoints state") {
    forAll(longGen) { (checkpointTime: TimeStamp) =>
      val monoid = mock[Monoid[AnyRef]]
      val serializer = mock[Serializer[AnyRef]]

      val lhs = mock[AnyRef]
      val rhs = mock[AnyRef]
      val zero = mock[AnyRef]
      val sum = mock[AnyRef]

      val state = newZeroedState(monoid, serializer, zero)

      state.left = lhs
      state.right = rhs

      when(monoid.zero).thenReturn(zero, Nil: _*)
      when(monoid.plus(lhs, rhs)).thenReturn(sum, Nil: _*)
      when(monoid.plus(sum, zero)).thenReturn(sum, Nil: _*)
      state.checkpoint()

      // The left part is serialized, then both parts are folded into the new left part
      verify(serializer).serialize(lhs)
      assertState(state, left = sum, right = zero, combined = sum)
    }
  }

  property("NonWindowState updates state") {
    forAll(longGen) { (checkpointTime: TimeStamp) =>
      val monoid = mock[Monoid[AnyRef]]
      val serializer = mock[Serializer[AnyRef]]

      val lhs = mock[AnyRef]
      val rhs = mock[AnyRef]
      val zero = mock[AnyRef]
      val sum = mock[AnyRef]

      val state = newZeroedState(monoid, serializer, zero)

      // An update before the checkpoint time goes to the left part
      when(monoid.plus(zero, lhs)).thenReturn(lhs, Nil: _*)
      when(monoid.plus(lhs, zero)).thenReturn(lhs, Nil: _*)
      state.setNextCheckpointTime(checkpointTime)
      state.update(checkpointTime - 1, lhs)
      assertState(state, left = lhs, right = zero, combined = lhs)

      // An update after the checkpoint time goes to the right part
      when(monoid.plus(zero, rhs)).thenReturn(rhs, Nil: _*)
      when(monoid.plus(lhs, rhs)).thenReturn(sum, Nil: _*)
      state.setNextCheckpointTime(checkpointTime)
      state.update(checkpointTime + 1, rhs)
      assertState(state, left = lhs, right = rhs, combined = sum)
    }
  }
}
/**
 * Property tests for [[Window]]: when it reports that it should slide, and which range it
 * covers after sliding by one step or to a given timestamp.
 */
class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  val windowSizeGen = Gen.chooseNum[Long](1L, 1000L)
  val windowStepGen = Gen.chooseNum[Long](1L, 1000L)
  val timestampGen = Gen.chooseNum[Long](0L, 1000L)

  property("Window should only slide when time passes window end") {
    forAll(timestampGen, windowSizeGen, windowStepGen) {
      (now: TimeStamp, size: Long, step: Long) =>
        val window = new Window(size, step)
        window.shouldSlide shouldBe false

        window.update(now)

        // The initial range ends at `size`, so sliding is due once `now` reaches it
        val reachedWindowEnd = now >= size
        window.shouldSlide shouldBe reachedWindowEnd
    }
  }

  property("Window should slide by one or to given timestamp") {
    forAll(timestampGen, windowSizeGen, windowStepGen) {
      (now: TimeStamp, size: Long, step: Long) =>
        val window = new Window(size, step)
        window.range shouldBe ((0L, size))

        window.slideOneStep()
        window.range shouldBe ((step, size + step))

        window.slideTo(now)
        val (startTime, endTime) = window.range
        // With a step larger than the size, the timestamp is only required to fall
        // within one step of the window start
        val upperBound = if (step > size) startTime + step else endTime
        now should (be >= startTime and be < upperBound)
    }
  }
}
+++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.state.impl - -import scala.collection.immutable.TreeMap -import scala.util.Success - -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump._ -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.{Group, Serializer} - -class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val longGen = Gen.chooseNum[Long](100L, 10000L) - - val intervalGen = for { - st <- longGen - et <- Gen.chooseNum[Long](st + 1, 100000L) - } yield Interval(st, et) - - property("WindowState init should recover checkpointed state") { - forAll(intervalGen) { - (interval: Interval) => - val window = mock[Window] - val taskContext = MockUtil.mockTaskContext - val group = mock[Group[AnyRef]] - val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]] - - val timestamp = interval.startTime - val zero = mock[AnyRef] - val bytes = Array.empty[Byte] - val data = mock[AnyRef] - val checkpoint = TreeMap(interval -> data) - 
when(group.zero).thenReturn(zero, zero) - when(group.plus(zero, data)).thenReturn(data, Nil: _*) - when(group.plus(data, zero)).thenReturn(data, Nil: _*) - when(group.plus(zero, zero)).thenReturn(zero, Nil: _*) - when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint)) - - val state = new WindowState[AnyRef](group, serializer, taskContext, window) - state.left shouldBe zero - state.right shouldBe zero - state.get shouldBe Some(zero) - - state.recover(timestamp, bytes) - - state.left shouldBe data - state.right shouldBe zero - state.get shouldBe Some(data) - state.getIntervalStates(interval.startTime, interval.endTime) shouldBe checkpoint - } - } - - property("WindowState checkpoints") { - forAll(longGen) { (checkpointTime: TimeStamp) => - val window = mock[Window] - val taskContext = MockUtil.mockTaskContext - val group = mock[Group[AnyRef]] - val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]] - - val zero = mock[AnyRef] - val left = mock[AnyRef] - val right = mock[AnyRef] - val plus = mock[AnyRef] - - when(group.zero).thenReturn(zero, zero) - when(group.plus(zero, zero)).thenReturn(zero, Nil: _*) - val state = new WindowState[AnyRef](group, serializer, taskContext, window) - state.left shouldBe zero - state.right shouldBe zero - state.get shouldBe Some(zero) - - val start = checkpointTime - 1 - val end = checkpointTime + 1 - val size = end - start - val step = 1L - - when(window.range).thenReturn((start, end)) - when(window.windowSize).thenReturn(size) - when(window.windowStep).thenReturn(step) - when(group.zero).thenReturn(zero, zero) - when(group.plus(zero, left)).thenReturn(left, Nil: _*) - when(group.plus(zero, right)).thenReturn(right, Nil: _*) - when(group.plus(left, right)).thenReturn(plus, Nil: _*) - - state.left = left - state.updateIntervalStates(start, left, checkpointTime) - state.right = right - state.updateIntervalStates(checkpointTime, right, checkpointTime) - - state.setNextCheckpointTime(checkpointTime) - state.checkpoint() - 
- state.left shouldBe plus - state.right shouldBe zero - verify(serializer).serialize(TreeMap(Interval(start, checkpointTime) -> left)) - } - } - - property("WindowState updates state") { - forAll(longGen) { (checkpointTime: TimeStamp) => - val window = mock[Window] - val taskContext = MockUtil.mockTaskContext - val group = mock[Group[AnyRef]] - val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]] - - val zero = mock[AnyRef] - val left = mock[AnyRef] - val right = mock[AnyRef] - val plus = mock[AnyRef] - - when(group.zero).thenReturn(zero, zero) - val state = new WindowState[AnyRef](group, serializer, taskContext, window) - - val start = checkpointTime - 1 - val end = checkpointTime + 1 - val size = end - start - val step = 1L - - when(window.range).thenReturn((start, end)) - when(window.windowSize).thenReturn(size) - when(window.windowStep).thenReturn(step) - when(window.shouldSlide).thenReturn(false) - when(group.plus(zero, left)).thenReturn(left, left) - when(group.plus(left, zero)).thenReturn(left, Nil: _*) - when(taskContext.upstreamMinClock).thenReturn(0L) - - // Time < checkpointTime - // Update left in current window - state.setNextCheckpointTime(checkpointTime) - state.update(start, left) - - verify(window).update(0L) - state.left shouldBe left - state.right shouldBe zero - state.get shouldBe Some(left) - state.getIntervalStates(start, end) shouldBe TreeMap(Interval(start, checkpointTime) -> left) - - when(window.range).thenReturn((start, end)) - when(window.windowSize).thenReturn(size) - when(window.windowStep).thenReturn(step) - when(window.shouldSlide).thenReturn(false) - when(group.plus(zero, right)).thenReturn(right, right) - when(group.plus(left, right)).thenReturn(plus, Nil: _*) - when(taskContext.upstreamMinClock).thenReturn(0L) - - // Time >= checkpointTime - // Update right in current window - state.setNextCheckpointTime(checkpointTime) - state.update(checkpointTime, right) - - verify(window, times(2)).update(0L) - state.left shouldBe 
left - state.right shouldBe right - state.get shouldBe Some(plus) - state.getIntervalStates(start, end) shouldBe - TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right) - - // Slides window forward - when(window.range).thenReturn((start, end), (start + step, end + step)) - when(window.shouldSlide).thenReturn(true) - when(taskContext.upstreamMinClock).thenReturn(checkpointTime) - when(group.minus(left, left)).thenReturn(zero, Nil: _*) - when(group.plus(zero, right)).thenReturn(right, Nil: _*) - when(group.plus(right, right)).thenReturn(plus, Nil: _*) - when(group.plus(zero, plus)).thenReturn(plus, Nil: _*) - - state.setNextCheckpointTime(checkpointTime) - state.update(end, right) - - verify(window).slideOneStep() - verify(window).update(checkpointTime) - state.left shouldBe zero - state.right shouldBe plus - state.get shouldBe Some(plus) - state.getIntervalStates(start, end + step) shouldBe - TreeMap( - Interval(start, start + step) -> left, - Interval(start + step, end) -> right, - Interval(end, end + step) -> right) - } - } - - property("WindowState gets interval for timestamp") { - forAll(longGen, longGen, longGen, longGen) { - (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) => - val windowManager = new Window(windowSize, windowStep) - val taskContext = MockUtil.mockTaskContext - val group = mock[Group[AnyRef]] - val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]] - - val zero = mock[AnyRef] - when(group.zero).thenReturn(zero, zero) - val state = new WindowState[AnyRef](group, serializer, taskContext, windowManager) - - val interval = state.getInterval(timestamp, checkpointTime) - intervalSpec(interval, timestamp, checkpointTime, windowSize, windowStep) - - val nextTimeStamp = interval.endTime - val nextInterval = state.getInterval(nextTimeStamp, checkpointTime) - intervalSpec(nextInterval, nextTimeStamp, checkpointTime, windowSize, windowStep) - - interval.endTime shouldBe 
nextInterval.startTime - } - - def intervalSpec(interval: Interval, timestamp: TimeStamp, - checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = { - interval.startTime should be <= interval.endTime - timestamp / windowStep * windowStep should (be <= interval.startTime) - (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime) - (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime) - ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should - (be >= interval.endTime) - checkpointTime should (be <= interval.startTime or be >= interval.endTime) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala deleted file mode 100644 index 60baa74..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.storage - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} - -import io.gearpump.cluster.{MasterHarness, MiniCluster} -import io.gearpump.streaming.StreamingTestUtil -import io.gearpump.util.Constants - -class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll { - implicit val timeout = Constants.FUTURE_TIMEOUT - implicit val dispatcher = MasterHarness.cachedPool - - "InMemoryAppStoreOnMaster" should { - "save and return the data properly" in { - val appId = 0 - val miniCluster = new MiniCluster - val master = miniCluster.mockMaster - StreamingTestUtil.startAppMaster(miniCluster, appId) - val store = new InMemoryAppStoreOnMaster(appId, master) - - Thread.sleep(500) - - store.put("String_type", "this is a string") - store.put("Int_type", 1024) - store.put("Tuple2_type", ("element1", 1024)) - - val future1 = store.get("String_type").map { value => - value.asInstanceOf[String] should be("this is a string") - } - val future2 = store.get("Int_type").map { value => value.asInstanceOf[Int] should be(1024) } - val future3 = store.get("Tuple2_type").map { value => - value.asInstanceOf[(String, Int)] should be(("element1", 1024)) - } - val future4 = store.get("key").map { value => value.asInstanceOf[Object] should be(null) } - Await.result(future1, 15.seconds) - Await.result(future2, 15.seconds) - Await.result(future3, 15.seconds) - Await.result(future4, 15.seconds) - miniCluster.shutDown - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala deleted file mode 100644 index ffb343e..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.task - -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.task.SubscriberSpec.TestTask -import io.gearpump.streaming.{DAG, ProcessorDescription} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class SubscriberSpec extends FlatSpec with Matchers { - "Subscriber.of" should "return all subscriber for a processor" in { - - val sourceProcessorId = 0 - val task1 = ProcessorDescription(id = sourceProcessorId, taskClass = - classOf[TestTask].getName, parallelism = 1) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) - val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask].getName, parallelism = 1) - val partitioner = Partitioner[HashPartitioner] - val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> task3, - task2 ~ partitioner ~> task3)) - - val subscribers = Subscriber.of(sourceProcessorId, dag) - assert(subscribers.size == 2) - - assert(subscribers.toSet == - Set(Subscriber(1, partitioner, task2.parallelism, task2.life), Subscriber(2, partitioner, - task3.parallelism, task3.life))) - } -} - -object SubscriberSpec { - class TestTask -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala deleted file mode 100644 index 57f4b8c..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import java.util.Random - -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.task.SubscriptionSpec.NextTask -import io.gearpump.streaming.{LifeTime, ProcessorDescription} - -class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { - val appId = 0 - val executorId = 0 - val taskId = TaskId(0, 0) - val session = new Random().nextInt() - - val downstreamProcessorId = 1 - val partitioner = Partitioner[HashPartitioner] - - val parallism = 2 - val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName, - parallism) - val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, - downstreamProcessor.life) - - private def prepare: (Subscription, ExpressTransport) = { - val transport = mock[ExpressTransport] - val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport) - subscription.start() - - val expectedAckRequest = InitialAckRequest(taskId, session) - verify(transport, 
times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1)) - - (subscription, transport) - } - - it should "not send any more message when its life ends" in { - val (subscription, transport) = prepare - subscription.changeLife(LifeTime(0, 0)) - val count = subscription.sendMessage(Message("some")) - assert(count == 0) - } - - it should "send message and handle ack correctly" in { - val (subscription, transport) = prepare - val msg1 = new Message("1", timestamp = 70) - subscription.sendMessage(msg1) - - verify(transport, times(1)).transport(msg1, TaskId(1, 1)) - assert(subscription.minClock == 70) - - val msg2 = new Message("0", timestamp = 50) - subscription.sendMessage(msg2) - verify(transport, times(1)).transport(msg2, TaskId(1, 0)) - - // minClock has been set to smaller one - assert(subscription.minClock == 50) - - val initialMinClock = subscription.minClock - - // Acks initial AckRequest(0) - subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session)) - subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session)) - - // Sends 100 messages - 100 until 200 foreach { clock => - subscription.sendMessage(Message("1", clock)) - subscription.sendMessage(Message("2", clock)) - } - - // Ack not received, minClock no change - assert(subscription.minClock == initialMinClock) - - subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session)) - subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session)) - - // Ack received, minClock changed - assert(subscription.minClock > initialMinClock) - - // Expects to receive two ackRequest for two downstream tasks - val ackRequestForTask0 = AckRequest(taskId, 200, session) - verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) - - val ackRequestForTask1 = AckRequest(taskId, 200, session) - verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) - } - - it should "disallow more message sending if there is no ack back" in { - val (subscription, transport) = prepare - // send 100 messages - 0 
until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock => - subscription.sendMessage(Message(randomMessage, clock)) - } - - assert(subscription.allowSendingMoreMessages() == false) - } - - it should "report minClock as Long.MaxValue when there is no pending message" in { - val (subscription, transport) = prepare - val msg1 = new Message("1", timestamp = 70) - subscription.sendMessage(msg1) - assert(subscription.minClock == 70) - subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session)) - assert(subscription.minClock == Long.MaxValue) - } - - private def randomMessage: String = new Random().nextInt.toString -} - -object SubscriptionSpec { - - class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - override def onStart(startTime: StartTime): Unit = { - } - - override def onNext(msg: Message): Unit = { - } - } -} \ No newline at end of file
