Repository: incubator-gearpump
Updated Branches:
  refs/heads/master e1c2a9275 -> 23f365c3f


[GEARPUMP-103] Support finite stream

Author: manuzhang <[email protected]>

Closes #143 from manuzhang/finite_stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23f365c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23f365c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23f365c3

Branch: refs/heads/master
Commit: 23f365c3f248da06f1b81587741c1e94b930844c
Parents: e1c2a92
Author: manuzhang <[email protected]>
Authored: Fri Feb 10 10:46:39 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Feb 10 10:46:47 2017 +0800

----------------------------------------------------------------------
 .../cluster/client/RunningApplication.scala       |  2 +-
 .../examples/wordcount/dsl/WordCount.scala        |  3 ++-
 .../gearpump/streaming/appmaster/AppMaster.scala  |  5 ++++-
 .../streaming/appmaster/ClockService.scala        | 18 ++++++++++++++----
 .../streaming/dsl/scalaapi/StreamApp.scala        | 10 ++++++++--
 .../gearpump/streaming/source/DataSource.scala    |  5 ++---
 .../gearpump/streaming/source/Watermark.scala     |  6 ++++++
 .../gearpump/streaming/task/TaskActor.scala       |  1 +
 .../streaming/task/TaskControlMessage.scala       |  2 ++
 .../streaming/appmaster/ClockServiceSpec.scala    |  7 ++++---
 10 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
index 1c6c959..d62356a 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -44,7 +44,7 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
   }
 
   /**
-   * This funtion will block until the application finished or failed.
+   * This function will block until the application finished or failed.
    * If failed, an exception will be thrown out
    */
   def waitUntilFinish(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 1cbfb22..bcc68cf 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -39,7 +39,8 @@ object WordCount extends AkkaApp with ArgumentsParser {
       // (word, count1), (word, count2) => (word, count1 + count2)
       groupByKey().sum.log
 
-    val appId = context.submit(app)
+    val application = context.submit(app)
+    application.waitUntilFinish()
     context.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 1266337..5ace1b2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -128,7 +128,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       ActorPathUtil.executorManagerActorName)
 
   for (dag <- getDAG) {
-    clockService = Some(context.actorOf(Props(new ClockService(dag, store))))
+    clockService = Some(context.actorOf(Props(new ClockService(dag, self, store))))
     val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
 
     taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager,
@@ -296,6 +296,9 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
         System.currentTimeMillis(), null)
     case AppMasterActivated(id) =>
       LOG.info(s"AppMaster for app$id is activated")
+    case EndingClock =>
+      masterProxy ! ApplicationStatusChanged(appId, ApplicationStatus.SUCCEEDED,
+        System.currentTimeMillis(), null)
   }
 
   /** Error handling */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 2085953..0a2999d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -18,11 +18,12 @@
 
 package org.apache.gearpump.streaming.appmaster
 
+import java.time.Instant
 import java.util
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
-import akka.actor.{Actor, Cancellable, Stash}
+import akka.actor.{Actor, ActorRef, Cancellable, Stash}
 import com.google.common.primitives.Longs
 import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
@@ -30,6 +31,7 @@ import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
 import org.apache.gearpump.streaming.appmaster.ClockService._
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.storage.AppDataStore
 import org.apache.gearpump.streaming.task._
 import org.apache.gearpump.util.LogUtil
@@ -42,7 +44,10 @@ import scala.language.implicitConversions
 /**
  * Maintains a global view of message timestamp in the application
  */
-class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash {
+class ClockService(
+    private var dag: DAG,
+    appMaster: ActorRef,
+    store: AppDataStore) extends Actor with Stash {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   import context.dispatcher
@@ -210,14 +215,19 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
     case GetUpstreamMinClock(task) =>
       getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
 
-    case update@UpdateClock(task, clock) =>
+    case UpdateClock(task, clock) =>
       val processorClock = clocks.get(task.processorId)
       if (processorClock.isDefined) {
         processorClock.get.updateMinClock(task.index, clock)
       } else {
         LOG.error(s"Cannot updateClock for task $task")
       }
-      getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+      if (Instant.ofEpochMilli(minClock).equals(Watermark.MAX)) {
+        healthCheckScheduler.cancel()
+        appMaster ! EndingClock
+      } else {
+        getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+      }
 
     case GetLatestMinClock =>
       sender ! LatestMinClock(minClock)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index 52972b7..6378a18 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -26,7 +26,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.StreamApplication
 import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.source.{DataSource, Watermark}
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.Graph
 
@@ -105,5 +105,11 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def close(): Unit = {}
 
-  override def getWatermark: Instant = Instant.now()
+  override def getWatermark: Instant = {
+    if (iterator.hasNext) {
+      Instant.now()
+    } else {
+      Watermark.MAX
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
index f4c87da..cea5491 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -70,9 +70,8 @@ trait DataSource extends java.io.Serializable {
   def close(): Unit
 
   /**
-   * Returns a watermark
-   * no timestamp earlier than the watermark
-   * should enter the system
+   * Returns a watermark such that no timestamp earlier than the watermark should enter the system
+   * Watermark.MAX mark the end of source data
    */
   def getWatermark: Instant
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 912bb12..1f8d3a1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -27,3 +27,9 @@ import org.apache.gearpump.Message
 case class Watermark(instant: Instant) {
   def toMessage: Message = Message("watermark", instant)
 }
+
+object Watermark {
+
+  // maximum time won't overflow when converted to milli-seconds
+  val MAX = Instant.ofEpochMilli(Long.MaxValue)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 92f6672..c814fa5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -247,6 +247,7 @@ class TaskActor(
         updateUpstreamMinClock(instant.toEpochMilli)
         minClockReported = false
       }
+
       receiveMessage(watermark.toMessage, sender)
 
     case upstream@UpstreamMinClock(upstreamClock) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index a915e7f..73cd5af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -62,6 +62,8 @@ case object GetStartClock
 
 case class StartClock(clock: TimeStamp)
 
+case object EndingClock
+
 /** Probe the latency between two upstream to downstream tasks. */
 case class LatencyProbe(timestamp: Long)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23f365c3/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index 46175a4..4b824e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -41,6 +41,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
   val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1)
   val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
   val dag = DAG(Graph(task1 ~ hash ~> task2))
+  private val appMaster = TestProbe().ref
 
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
@@ -51,7 +52,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val store = new Store()
       val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store)))
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(startClock))
 
@@ -77,7 +78,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val store = new Store()
       val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store)))
       val task = TestProbe()
       clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
 
@@ -116,7 +117,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val store = new Store()
       val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store)))
       clockService ! UpdateClock(TaskId(0, 0), 200L)
       clockService ! UpdateClock(TaskId(1, 0), 200L)
       expectMsgType[UpstreamMinClock]

Reply via email to