Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c1801595d -> 38fe7ec00
[GEARPUMP-341] Update processing watermark in DataSinkTask Author: manuzhang <[email protected]> Closes #214 from manuzhang/fix_datasinktask. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/38fe7ec0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/38fe7ec0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/38fe7ec0 Branch: refs/heads/master Commit: 38fe7ec0040299c5669a5e1c654bd5c61eb5c5bb Parents: c180159 Author: manuzhang <[email protected]> Authored: Thu Aug 17 06:34:10 2017 +0800 Committer: manuzhang <[email protected]> Committed: Thu Aug 17 06:34:19 2017 +0800 ---------------------------------------------------------------------- .travis.yml | 2 +- .../streaming/examples/wordcount/dsl/WindowedWordCount.scala | 6 +++--- .../org/apache/gearpump/streaming/sink/DataSinkTask.scala | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 95c1427..8148c32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: - scala -sudo: false +sudo: required jdk: - oraclejdk8 addons: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 2aa1bb4..379c7b6 100644 --- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows} -import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.source.{DataSource, Watermark} import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.AkkaApp @@ -45,7 +45,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { groupBy(_._1). sum.sink(new LoggerSink) - context.submit(app) + context.submit(app).waitUntilFinish() context.close() } @@ -79,7 +79,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { override def getWatermark: Instant = { if (data.isEmpty) { - watermark = watermark.plusMillis(1) + watermark = Watermark.MAX } watermark } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/38fe7ec0/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala index 0db44f2..932c750 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: D LOG.info("closing data sink...") sink.close() } + + override def onWatermarkProgress(watermark: Instant): Unit 
= { + context.updateWatermark(watermark) + } }
