Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c1801595d -> 38fe7ec00


[GEARPUMP-341] Update processing watermark in DataSinkTask

Author: manuzhang <[email protected]>

Closes #214 from manuzhang/fix_datasinktask.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/38fe7ec0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/38fe7ec0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/38fe7ec0

Branch: refs/heads/master
Commit: 38fe7ec0040299c5669a5e1c654bd5c61eb5c5bb
Parents: c180159
Author: manuzhang <[email protected]>
Authored: Thu Aug 17 06:34:10 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Aug 17 06:34:19 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                                    | 2 +-
 .../streaming/examples/wordcount/dsl/WindowedWordCount.scala   | 6 +++---
 .../org/apache/gearpump/streaming/sink/DataSinkTask.scala      | 4 ++++
 3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 95c1427..8148c32 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 language:
 - scala
-sudo: false
+sudo: required
 jdk:
 - oraclejdk8
 addons:

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 2aa1bb4..379c7b6 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
 import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
FixedWindows}
-import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.source.{DataSource, Watermark}
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.AkkaApp
 
@@ -45,7 +45,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser 
{
       groupBy(_._1).
       sum.sink(new LoggerSink)
 
-    context.submit(app)
+    context.submit(app).waitUntilFinish()
     context.close()
   }
 
@@ -79,7 +79,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser 
{
 
     override def getWatermark: Instant = {
       if (data.isEmpty) {
-        watermark = watermark.plusMillis(1)
+        watermark = Watermark.MAX
       }
       watermark
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index 0db44f2..932c750 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: 
UserConfig, sink: D
     LOG.info("closing data sink...")
     sink.close()
   }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    context.updateWatermark(watermark)
+  }
 }

Reply via email to