Repository: incubator-gearpump Updated Branches: refs/heads/master 23daf0cf9 -> 529799cc4
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index 647ad0a..e461ae8 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -159,9 +159,6 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0 appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) - // there is no further upstream, so the upstreamMinClock is Long.MaxValue - mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) mockTask.expectMsg(LatestMinClock(0)) @@ -169,9 +166,6 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) - // there is no further upstream, so the upstreamMinClock is Long.MaxValue - mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) mockTask.expectMsg(LatestMinClock(0)) @@ -238,7 +232,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with for (i <- 1 to 5) { val taskId = TaskId(0, 0) appMaster.tell(UpdateClock(taskId, i), mockTask.ref) - mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + val cause = s"message loss $i from $taskId" appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref) // appmaster restarted @@ -300,9 +294,7 @@ object AppMasterSpec { } class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onNext(msg: Message): Unit = {} } class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onNext(msg: Message): Unit = {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index e742a2c..d42fe6f 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -58,9 +58,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli // task(0,0): clock(101); task(1,0): clock(100) clockService ! UpdateClock(TaskId(0, 0), 101) - // There is no upstream, so pick Long.MaxValue - expectMsg(UpstreamMinClock(Long.MaxValue)) - // Min clock is updated clockService ! GetLatestMinClock expectMsg(LatestMinClock(100)) @@ -83,7 +80,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val clockService = system.actorOf(Props(new ClockService(dag, store))) val task = TestProbe() clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref) - task.expectMsgType[UpstreamMinClock] val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1) @@ -122,7 +118,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli store.put(ClockService.START_CLOCK, startClock) val clockService = system.actorOf(Props(new ClockService(dag, store))) clockService ! UpdateClock(TaskId(0, 0), 200L) - expectMsgType[UpstreamMinClock] clockService ! UpdateClock(TaskId(1, 0), 200L) expectMsgType[UpstreamMinClock] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index bb495a7..54ecde1 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -270,11 +270,9 @@ object TaskManagerSpec { class Task1(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onNext(msg: Message): Unit = {} } class Task2(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onNext(msg: Message): Unit = {} } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala deleted file mode 100644 index d6fa443..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.source - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import org.apache.gearpump.{Message, TimeStamp} - -class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers { - property("DefaultTimeStampFilter should filter message against give timestamp") { - val timestampGen = Gen.chooseNum[Long](0L, 1000L) - val messageGen = for { - msg <- Gen.alphaStr - time <- timestampGen - } yield Message(msg, time) - - val filter = new DefaultTimeStampFilter() - - forAll(timestampGen, messageGen) { - (predicate: TimeStamp, message: Message) => - if (message.timestamp >= predicate) { - filter.filter(message, predicate) shouldBe Some(message) - } else { - filter.filter(message, predicate) shouldBe None - } - - filter.filter(null, predicate) shouldBe None - } - } -}
