Repository: incubator-gearpump Updated Branches: refs/heads/master e1c2a9275 -> 23f365c3f
[GEARPUMP-103] Support finite stream Author: manuzhang <[email protected]> Closes #143 from manuzhang/finite_stream. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23f365c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23f365c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23f365c3 Branch: refs/heads/master Commit: 23f365c3f248da06f1b81587741c1e94b930844c Parents: e1c2a92 Author: manuzhang <[email protected]> Authored: Fri Feb 10 10:46:39 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Feb 10 10:46:47 2017 +0800 ---------------------------------------------------------------------- .../cluster/client/RunningApplication.scala | 2 +- .../examples/wordcount/dsl/WordCount.scala | 3 ++- .../gearpump/streaming/appmaster/AppMaster.scala | 5 ++++- .../streaming/appmaster/ClockService.scala | 18 ++++++++++++++---- .../streaming/dsl/scalaapi/StreamApp.scala | 10 ++++++++-- .../gearpump/streaming/source/DataSource.scala | 5 ++--- .../gearpump/streaming/source/Watermark.scala | 6 ++++++ .../gearpump/streaming/task/TaskActor.scala | 1 + .../streaming/task/TaskControlMessage.scala | 2 ++ .../streaming/appmaster/ClockServiceSpec.scala | 7 ++++--- 10 files changed, 44 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala index 1c6c959..d62356a 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala +++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala @@ -44,7 +44,7 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { } /** - * This funtion will block until the application finished or failed. + * This function will block until the application finished or failed. * If failed, an exception will be thrown out */ def waitUntilFinish(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index 1cbfb22..bcc68cf 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -39,7 +39,8 @@ object WordCount extends AkkaApp with ArgumentsParser { // (word, count1), (word, count2) => (word, count1 + count2) groupByKey().sum.log - val appId = context.submit(app) + val application = context.submit(app) + application.waitUntilFinish() context.close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 1266337..5ace1b2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -128,7 +128,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli ActorPathUtil.executorManagerActorName) for (dag <- getDAG) { - clockService = Some(context.actorOf(Props(new ClockService(dag, store)))) + clockService = Some(context.actorOf(Props(new ClockService(dag, self, store)))) val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context) taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager, @@ -296,6 +296,9 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli System.currentTimeMillis(), null) case AppMasterActivated(id) => LOG.info(s"AppMaster for app$id is activated") + case EndingClock => + masterProxy ! ApplicationStatusChanged(appId, ApplicationStatus.SUCCEEDED, + System.currentTimeMillis(), null) } /** Error handling */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index 2085953..0a2999d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -18,11 +18,12 @@ package org.apache.gearpump.streaming.appmaster +import java.time.Instant import java.util import java.util.Date import java.util.concurrent.TimeUnit -import akka.actor.{Actor, Cancellable, Stash} +import akka.actor.{Actor, ActorRef, Cancellable, Stash} import com.google.common.primitives.Longs import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks @@ -30,6 +31,7 @@ 
import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue import org.apache.gearpump.streaming.appmaster.ClockService._ +import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.storage.AppDataStore import org.apache.gearpump.streaming.task._ import org.apache.gearpump.util.LogUtil @@ -42,7 +44,10 @@ import scala.language.implicitConversions /** * Maintains a global view of message timestamp in the application */ -class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash { +class ClockService( + private var dag: DAG, + appMaster: ActorRef, + store: AppDataStore) extends Actor with Stash { private val LOG: Logger = LogUtil.getLogger(getClass) import context.dispatcher @@ -210,14 +215,19 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with case GetUpstreamMinClock(task) => getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_)) - case update@UpdateClock(task, clock) => + case UpdateClock(task, clock) => val processorClock = clocks.get(task.processorId) if (processorClock.isDefined) { processorClock.get.updateMinClock(task.index, clock) } else { LOG.error(s"Cannot updateClock for task $task") } - getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_)) + if (Instant.ofEpochMilli(minClock).equals(Watermark.MAX)) { + healthCheckScheduler.cancel() + appMaster ! EndingClock + } else { + getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_)) + } case GetLatestMinClock => sender ! 
LatestMinClock(minClock) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index 52972b7..6378a18 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -26,7 +26,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.streaming.StreamApplication import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.source.{DataSource, Watermark} import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.Graph @@ -105,5 +105,11 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def close(): Unit = {} - override def getWatermark: Instant = Instant.now() + override def getWatermark: Instant = { + if (iterator.hasNext) { + Instant.now() + } else { + Watermark.MAX + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala index f4c87da..cea5491 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala 
@@ -70,9 +70,8 @@ trait DataSource extends java.io.Serializable { def close(): Unit /** - * Returns a watermark - * no timestamp earlier than the watermark - * should enter the system + * Returns a watermark such that no timestamp earlier than the watermark should enter the system. + * Watermark.MAX marks the end of source data. */ def getWatermark: Instant } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 912bb12..1f8d3a1 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -27,3 +27,9 @@ import org.apache.gearpump.Message case class Watermark(instant: Instant) { def toMessage: Message = Message("watermark", instant) } + +object Watermark { + + // maximum time won't overflow when converted to milliseconds + val MAX = Instant.ofEpochMilli(Long.MaxValue) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 92f6672..c814fa5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -247,6 +247,7 @@ class TaskActor( updateUpstreamMinClock(instant.toEpochMilli) minClockReported = false } + receiveMessage(watermark.toMessage, sender) case
upstream@UpstreamMinClock(upstreamClock) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index a915e7f..73cd5af 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -62,6 +62,8 @@ case object GetStartClock case class StartClock(clock: TimeStamp) +case object EndingClock + /** Probe the latency between two upstream to downstream tasks. */ case class LatencyProbe(timestamp: Long) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index 46175a4..4b824e0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -41,6 +41,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1) val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) val dag = DAG(Graph(task1 ~ hash ~> task2)) + private val appMaster = TestProbe().ref override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) @@ -51,7 +52,7 @@ 
class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val store = new Store() val startClock = 100L store.put(ClockService.START_CLOCK, startClock) - val clockService = system.actorOf(Props(new ClockService(dag, store))) + val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store))) clockService ! GetLatestMinClock expectMsg(LatestMinClock(startClock)) @@ -77,7 +78,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val store = new Store() val startClock = 100L store.put(ClockService.START_CLOCK, startClock) - val clockService = system.actorOf(Props(new ClockService(dag, store))) + val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store))) val task = TestProbe() clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref) @@ -116,7 +117,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val store = new Store() val startClock = 100L store.put(ClockService.START_CLOCK, startClock) - val clockService = system.actorOf(Props(new ClockService(dag, store))) + val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store))) clockService ! UpdateClock(TaskId(0, 0), 200L) clockService ! UpdateClock(TaskId(1, 0), 200L) expectMsgType[UpstreamMinClock]
