Repository: incubator-gearpump Updated Branches: refs/heads/master 000e846ab -> 176d82763
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index c6817f5..79bcc2a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -19,14 +19,13 @@ package org.apache.gearpump.streaming.task import org.slf4j.Logger - import com.google.common.primitives.Shorts import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.task.Subscription._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp} /** * Manges the output and message clock for single downstream processor @@ -103,14 +102,16 @@ class Subscription( var count = 0 // Only sends message whose timestamp matches the lifeTime - if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timeInMillis)) { + if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains( + msg.timestamp.toEpochMilli)) { val targetTask = TaskId(processorId, partition) transport.transport(msg, targetTask) - this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timeInMillis) + this.minClockValue(partition) = Math.min(this.minClockValue(partition), + msg.timestamp.toEpochMilli) this.candidateMinClock(partition) = - Math.min(this.candidateMinClock(partition), msg.timeInMillis) + Math.min(this.candidateMinClock(partition), msg.timestamp.toEpochMilli) incrementMessageCount(partition, 1) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala index 5b174bd..90a8bff 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala @@ -24,7 +24,6 @@ import scala.concurrent.duration.FiniteDuration import akka.actor.Actor.Receive import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} import org.slf4j.Logger - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil import org.apache.gearpump.{Message, TimeStamp} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 10648b4..8ef45f3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.streaming.task.TaskActor._ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} -import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp} import scala.collection.JavaConverters._ import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala index 1b3f30c..f5f099c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -26,7 +26,7 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{TimeStamp, Message} +import org.apache.gearpump.{Message, TimeStamp} /** * This provides TaskContext for user defined tasks http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index bcf96e4..c223a53 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.TimeStamp import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index a9b23fe..f5d7c20 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -268,7 +268,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { source.onWatermarkProgress(Watermark.MAX) data.foreach { s => verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( - message => message.msg == s)) + message => message.value == s)) } // Source with transformer @@ -282,7 +282,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { another.onWatermarkProgress(Watermark.MAX) data.foreach { s => verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( - message => message.msg == s)) + message => message.value == s)) } } } @@ -317,7 +317,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { import scala.collection.JavaConverters._ val values = peopleCaptor.getAllValues.asScala.map(input => - input.msg.asInstanceOf[Option[String]].get) + input.value.asInstanceOf[Option[String]].get) assert(values.mkString(",") == "1,2,22,3,33,333") system.terminate() Await.result(system.whenTerminated, Duration.Inf) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala index 62a3bcb..fb398b8 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -115,10 +115,10 @@ object StreamSpec { var query: String = _ override def onNext(msg: Message): Unit = { - msg.msg match { + msg.value match { case Left(wordCount: (String @unchecked, Int @unchecked)) => if (query != null && wordCount._1 == query) { - taskContext.output(new Message(wordCount)) + taskContext.output(Message(wordCount)) } case Right(query: String) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index f0bccd7..281d69a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -64,12 +64,12 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with msgs.foreach { msg => runner.foreach(r => - when(r.process(msg.msg)).thenReturn(Some(msg.msg))) + when(r.process(msg.value)).thenReturn(Some(msg.value))) } task.onWatermarkProgress(Watermark.MAX) msgs.foreach { msg => - verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX))) + verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX))) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 9f02cef..fb0beaa 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -66,13 +66,13 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { it should "send message and handle ack correctly" in { val (subscription, transport) = prepare - val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70)) + val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70)) subscription.sendMessage(msg1) verify(transport, times(1)).transport(msg1, TaskId(1, 1)) assert(subscription.minClock == 70) - val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50)) + val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50)) subscription.sendMessage(msg2) verify(transport, times(1)).transport(msg2, TaskId(1, 0)) @@ -120,7 +120,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { it should "report minClock as Long.MaxValue when there is no pending message" in { val (subscription, _) = prepare - val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70)) + val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70)) subscription.sendMessage(msg1) assert(subscription.minClock == 70) subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
