Repository: incubator-gearpump Updated Branches: refs/heads/master 3c0ebb13f -> a743a9ca6
[GEARPUMP-192] refactor example sources task to use DataSourceAPI [GEARPUMP-192] Refactor example sources task to use DataSourceAPI. Author: Roshanson <[email protected]> Author: [email protected] <doyouta123> Closes #78 from Roshanson/fix-192. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a743a9ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a743a9ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a743a9ca Branch: refs/heads/master Commit: a743a9ca6e9bd074c35097cd9602e606a351f46a Parents: 3c0ebb1 Author: Roshanson <[email protected]> Authored: Fri Sep 2 23:19:20 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri Sep 2 23:19:20 2016 +0800 ---------------------------------------------------------------------- .../streaming/examples/wordcount/Split.scala | 48 +++++++++++++------- .../examples/wordcount/WordCount.scala | 23 ++++++---- .../examples/wordcount/SplitSpec.scala | 30 ++++-------- .../examples/wordcount/WordCountSpec.scala | 2 +- 4 files changed, 58 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala index 44cf211..ad6f41a 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala @@ -19,33 +19,48 @@ package org.apache.gearpump.streaming.examples.wordcount import java.time.Instant -import java.util.concurrent.TimeUnit import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.source.Watermark -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output +import scala.collection.mutable.ArrayBuffer + + +class Split extends DataSource { + + val result = ArrayBuffer[Message]() + var item = -1 + Split.TEXT_TO_SPLIT.lines.foreach { line => + line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => + result.append(new Message(msg, System.currentTimeMillis())) + + } - override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) } - override def onNext(msg: Message): Unit = { - Split.TEXT_TO_SPLIT.lines.foreach { line => - line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => - output(new Message(msg, System.currentTimeMillis())) - } + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = { + + if (item < result.size - 1) { + item += 1 + result(item) + } else { + item = 0 + result(item) } - import scala.concurrent.duration._ - taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! - Watermark(Instant.now)) } + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + + } + object Split { val TEXT_TO_SPLIT = """ @@ -66,3 +81,4 @@ object Split { | limitations under the License. """.stripMargin } + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala index 9917d9f..99b83ad 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -18,16 +18,17 @@ package org.apache.gearpump.streaming.examples.wordcount -import org.slf4j.Logger - +import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.source.DataSourceProcessor import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph.Node import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} +import org.slf4j.Logger /** Same WordCount with low level Processor Graph syntax */ object WordCount extends AkkaApp with ArgumentsParser { @@ -35,21 +36,27 @@ object WordCount extends AkkaApp with ArgumentsParser { val RUN_FOR_EVER = -1 override val options: Array[(String, CLIOption[Any])] = Array( - "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), + "source" -> CLIOption[Int]("<how many source tasks>", required = false, + defaultValue = Some(1)), "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, defaultValue = Some(30)) ) - def application(config: ParseResult): StreamApplication = { - val splitNum = config.getInt("split") + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + val sumNum = config.getInt("sum") - val split = Processor[Split](splitNum) + val sourceNum = config.getInt("source") + val source = new Split + val sourceProcessor = DataSourceProcessor(source, sourceNum) val sum = Processor[Sum](sumNum) val partitioner = new HashPartitioner + val computation = sourceProcessor ~ partitioner ~> + sum - val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty) + val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty) app } @@ -72,7 +79,7 @@ object WordCount extends AkkaApp with ArgumentsParser { case None => ClientContext(akkaConf) } - val app = application(config) + val app = application(config, context.system) context.submit(app) if (debugMode) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala index 46d9e97..7c9de35 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -19,20 +19,16 @@ package org.apache.gearpump.streaming.examples.wordcount import java.time.Instant -import org.apache.gearpump.streaming.source.Watermark - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - import akka.actor.ActorSystem +import org.apache.gearpump.Message import akka.testkit.TestProbe -import org.mockito.Matchers._ +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.streaming.MockUtil import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.MockUtil +import scala.concurrent.Await +import scala.concurrent.duration.Duration class SplitSpec extends WordSpec with Matchers { @@ -45,19 +41,13 @@ class SplitSpec extends WordSpec with Matchers { val mockTaskActor = TestProbe() - // Mock self ActorRef when(taskContext.self).thenReturn(mockTaskActor.ref) - val conf = UserConfig.empty - val split = new Split(taskContext, conf) - split.onStart(Instant.EPOCH) - mockTaskActor.expectMsgType[Watermark] - - val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty) - - split.onNext(Message("next")) - verify(taskContext, times(expectedWordCount)).output(anyObject()) - + val split = new Split + split.open(taskContext, Instant.now()) + split.read() shouldBe a[Message] + split.close() + split.getWatermark system.terminate() Await.result(system.whenTerminated, Duration.Inf) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala index f703552..5121815 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -44,7 +44,7 @@ class WordCountSpec property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] val optionalArgs = Array( - "-split", "1", + "-source", "1", "-sum", "1") val args = {
