Repository: incubator-gearpump
Updated Branches:
  refs/heads/master e970d8b54 -> 3206770e7


fix GEARPUMP-109: add wordcount Java/Scala and DSL/low level API examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/3206770e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/3206770e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/3206770e

Branch: refs/heads/master
Commit: 3206770e709e8b49da845a0aaf0e43ff149ab604
Parents: e970d8b
Author: Weihua Jiang <[email protected]>
Authored: Wed Apr 27 17:08:04 2016 +0800
Committer: Weihua Jiang <[email protected]>
Committed: Fri Apr 29 09:55:00 2016 +0800

----------------------------------------------------------------------
 docs/dev-write-1st-app.md  | 273 +++++++++++++++++++++++++++++++++++++---
 docs/gearpump-internals.md |   2 +-
 2 files changed, 258 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3206770e/docs/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/dev-write-1st-app.md b/docs/dev-write-1st-app.md
index 1dd47b6..e768397 100644
--- a/docs/dev-write-1st-app.md
+++ b/docs/dev-write-1st-app.md
@@ -14,23 +14,131 @@ Repository and library dependencies can be found at [Maven Setting](maven-settin
 ### IDE Setup (Optional)
 You can get your preferred IDE ready for Gearpump by following [this guide](dev-ide-setup.html).
 
-### Define Processor(Task) class and Partitioner class
+### Decide which language and API to use for writing
+Gearpump supports two level APIs:
 
-An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, We will firstly define two processors `Split` and `Sum`, and then weave them together.
+1. Low level API, which is more similar to Akka programming, operating on each event. The API document can be found at [Low Level API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.package).
+
+2. High level API (aka DSL), which is operating on streaming instead of individual event. The API document can be found at [DSL API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.dsl.package).
+
+And both APIs have their Java version and Scala version.
+
+So, before you writing your first Gearpump application, you need to decide which API to use and which language to use.
+
+## DSL version for Wordcount
+
+The easiest way to write your streaming application is to write it with Gearpump DSL.
+Below will demostrate how to write WordCount application via Gearpump DSL.
+
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
+
+
+```scala
+/** WordCount with High level DSL */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
 
-#### About message type
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+    val data = "This is a good start, bingo!! bingo!!"
 
-User are allowed to send message of type Any(Accept any type except Null, Nothing and Unit).
+    //count for each word and output to log
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMap(line => line.split("[\\s]+")).map((_, 1)).
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
 
+    val appId = context.submit(app)
+    context.close()
+  }
+}
 ```
-case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
+
+</div>
+
+<div data-lang="java" markdown="1">
+
+```java
+
+/** Java version of WordCount with high level DSL API */
+public class WordCount {
+
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+    JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+    List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
+
+    //create a stream from the string list.
+    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+    //tokenize the strings and create a new stream
+    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> apply(String s) {
+        return Lists.newArrayList(s.split("\\s+")).iterator();
+      }
+    }, "flatMap");
+
+    //map each string as (string, 1) pair
+    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+      @Override
+      public Tuple2<String, Integer> apply(String s) {
+        return new Tuple2<String, Integer>(s, 1);
+      }
+    }, "map");
+
+    //group by according to string
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
+      @Override
+      public String apply(Tuple2<String, Integer> tuple) {
+        return tuple._1();
+      }
+    }, 1, "groupBy");
+
+    //for each group, make the sum
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+      @Override
+      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+      }
+    }, "reduce");
+
+    //output result using log
+    wordcount.log();
+
+    app.run();
+    context.close();
+  }
+}
 ```
+</div>
+
+</div>
+
+## Low level API based Wordcount
+
+### Define Processor(Task) class and Partitioner class
+
+An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, We will firstly define two processors `Split` and `Sum`, and then weave them together.
+
+
 
 #### Split processor
 
-In the Split processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to Sum.
+In the `Split` processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to `Sum`.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
 
-Scala:
 ```scala
 class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
 
@@ -55,13 +163,58 @@ object Split {
 }
 ```
 
-Like Split, every processor extends a `TaskActor`. The `onStart` method is called once before any message comes in; `onNext` method is called to process every incoming message. Note that Gearpump employs the message-driven model and that's why Split sends itself a message at the end of `onStart` and `onNext` to trigger next message processing.
+</div>
+
+<div data-lang="java" markdown="1">
+```java
+public class Split extends Task {
+
+  public static String TEXT = "This is a good start for java! bingo! bingo! ";
+
+  public Split(TaskContext taskContext, UserConfig userConf) {
+    super(taskContext, userConf);
+  }
+
+  private Long now() {
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  public void onStart(StartTime startTime) {
+    self().tell(new Message("start", now()), self());
+  }
+
+  @Override
+  public void onNext(Message msg) {
+
+    // Split the TEXT to words
+    String[] words = TEXT.split(" ");
+    for (int i = 0; i < words.length; i++) {
+      context.output(new Message(words[i], now()));
+    }
+    self().tell(new Message("next", now()), self());
+  }
+}
+```
+
+</div>
+
+</div>
+
+Essentially, each processor consists of two descriptions:
+
+1. A `Task` to define the operation.
+
+2. A parallelism level to define the number of tasks of this processor in parallel.
+
+Just like `Split`, every processor extends `Task`. The `onStart` method is called once before any message comes in; `onNext` method is called to process every incoming message. Note that Gearpump employs the message-driven model and that's why Split sends itself a message at the end of `onStart` and `onNext` to trigger next message processing.
 
 #### Sum Processor
 
-The structure of Sum processor looks much alike. Sum does not need to send messages to itself since it receives messages from Split.
+The structure of `Sum` processor looks much alike. `Sum` does not need to send messages to itself since it receives messages from `Split`.
 
-Scala:
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
 
 ```scala
 class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
@@ -102,7 +255,42 @@ class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext
 }
 ```
 
-Besides counting the sum, we also define a scheduler to report throughput every 5 seconds. The scheduler should be cancelled when the computation completes, which could be accomplished overriding the `onStop` method. The default implementation of `onStop` is a no-op.
+</div>
+<div data-lang="java" markdown="1">
+
+```java
+public class Sum extends Task {
+
+  private Logger LOG = super.LOG();
+  private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
+
+  public Sum(TaskContext taskContext, UserConfig userConf) {
+    super(taskContext, userConf);
+  }
+
+  @Override
+  public void onStart(StartTime startTime) {
+    //skip
+  }
+
+  @Override
+  public void onNext(Message messagePayLoad) {
+    String word = (String) (messagePayLoad.msg());
+    Integer current = wordCount.get(word);
+    if (current == null) {
+      current = 0;
+    }
+    Integer newCount = current + 1;
+    wordCount.put(word, newCount);
+  }
+}
+```
+
+</div>
+
+</div>
+
+Besides counting the sum, in Scala version, we also define a scheduler to report throughput every 5 seconds. The scheduler should be cancelled when the computation completes, which could be accomplished overriding the `onStop` method. The default implementation of `onStop` is a no-op.
 
 #### Partitioner
 
@@ -111,7 +299,8 @@ A processor could be parallelized to a list of tasks. A `Partitioner` defines ho
 * `HashPartitioner`: partitions data based on the message's hashcode
 * `ShufflePartitioner`: partitions data in a round-robin way.
 
-You could define your own partitioner by extending the `Partitioner` trait and overriding the `getPartition` method.
+You could define your own partitioner by extending the `Partitioner` trait/interface and overriding the `getPartition` method.
+
 
 ```scala
 trait Partitioner extends Serializable {
@@ -119,12 +308,15 @@ trait Partitioner extends Serializable {
 }
 ```
 
-### Define TaskDescription and AppDescription
+### Wrap up as an application
 
 Now, we are able to write our application class, weaving the above components together.
 
 The application class extends `App` and `ArgumentsParser which make it easier to parse arguments and run main functions.
 
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
+
 ```scala
 object WordCount extends App with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -155,13 +347,62 @@ object WordCount extends App with ArgumentsParser {
 
 We override `options` value and define an array of command line arguments to parse. We want application users to pass in masters' hosts and ports, the parallelism of split and sum tasks, and how long to run the example. We also specify whether an option is `required` and provide `defaultValue` for some arguments.
 
-Given the `ParseResult` of command line arguments, we create `TaskDescription`s for Split and Sum processors, and connect them with `HashPartitioner` using DAG API. The graph is wrapped in an `AppDescription` , which is finally submit to master.
+</div>
+
+<div data-lang="java" markdown="1">
+
+```java
+
+/** Java version of WordCount with Processor Graph API */
+public class WordCount {
+
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+
+    // For split task, we config to create two tasks
+    int splitTaskNumber = 2;
+    Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
+
+    // For sum task, we have two summer.
+    int sumTaskNumber = 2;
+    Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
+
+    // construct the graph
+    Graph graph = new Graph();
+    graph.addVertex(split);
+    graph.addVertex(sum);
+
+    Partitioner partitioner = new HashPartitioner();
+    graph.addEdge(split, partitioner, sum);
+
+    UserConfig conf = UserConfig.empty();
+    StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
+
+    // create master client
+    // It will read the master settings under gearpump.cluster.masters
+    ClientContext masterClient = new ClientContext(akkaConf);
+
+    masterClient.submit(app);
+
+    masterClient.close();
+  }
+}
+```
+
+</div>
+
+</div>
+
+
+
 
-### Submit application
+## Submit application
 
 After all these, you need to package everything into a uber jar and submit the jar to Gearpump Cluster. Please check [Application submission tool](commandline.html) to command line tool syntax.
 
-### Advanced topic
+## Advanced topic
 
 For a real application, you definitely need to define your own customized message passing between processors. Customized message needs customized serializer to help message passing over wire. Check [this guide](dev-custom-serializer.html) for how to customize serializer.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3206770e/docs/gearpump-internals.md
----------------------------------------------------------------------
diff --git a/docs/gearpump-internals.md b/docs/gearpump-internals.md
index 8b189fa..bdfecbe 100644
--- a/docs/gearpump-internals.md
+++ b/docs/gearpump-internals.md
@@ -119,7 +119,7 @@ In case of a master crash, other standby masters will be notified, they will res
 
 #### What happens when a worker crashes?
 
-In case of a worker crash, the Master will get notified and stop scheduling new computation to this worker. All supervised executors on current worker will be killed, AppMaster can treat it as recovery of executor crash like [What happen when executor crash?](#what-happen-when-an-executor-crashes)
+In case of a worker crash, the Master will get notified and stop scheduling new computation to this worker. All supervised executors on current worker will be killed, AppMaster can treat it as recovery of executor crash like [What happen when an executor crashes?](#what-happen-when-an-executor-crashes)
 
 #### What happens when the AppMaster crashes?
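
Reviewer note on the Partitioner section of docs/dev-write-1st-app.md: the two strategies it names (hash-based and round-robin) can be illustrated with a small, self-contained Java sketch. This is not Gearpump code. `SimplePartitioner`, `HashStyle`, `RoundRobinStyle` and `route` are hypothetical names invented for this illustration, and the two-argument `getPartition(msg, partitionNum)` shape is an assumption, not a confirmed Gearpump signature. The sketch only shows why a hash-style partitioner fits wordcount: equal words always reach the same downstream task, so each `Sum` task can keep a correct per-word count.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionerSketch {

  /** Hypothetical stand-in for the partitioner idea; not Gearpump's interface. */
  interface SimplePartitioner {
    int getPartition(Object msg, int partitionNum);
  }

  /** Hash style: equal messages always map to the same downstream task. */
  static class HashStyle implements SimplePartitioner {
    @Override
    public int getPartition(Object msg, int partitionNum) {
      // floorMod keeps the index non-negative even for negative hash codes
      return Math.floorMod(msg.hashCode(), partitionNum);
    }
  }

  /** Round-robin style: messages are spread evenly, regardless of content. */
  static class RoundRobinStyle implements SimplePartitioner {
    private int next = 0;

    @Override
    public int getPartition(Object msg, int partitionNum) {
      int target = next;
      next = (next + 1) % partitionNum;
      return target;
    }
  }

  /** Returns the target task index chosen for each word, in order. */
  static List<Integer> route(SimplePartitioner p, String[] words, int partitionNum) {
    List<Integer> targets = new ArrayList<>();
    for (String w : words) {
      targets.add(p.getPartition(w, partitionNum));
    }
    return targets;
  }

  public static void main(String[] args) {
    String[] words = {"bingo!", "good", "bingo!", "start"};
    System.out.println("hash:        " + route(new HashStyle(), words, 2));
    System.out.println("round-robin: " + route(new RoundRobinStyle(), words, 2));
  }
}
```

With the round-robin variant the two occurrences of "bingo!" may land on different tasks, which would split the count for that word across tasks; this is presumably why the Java example in the diff wires `Split` to `Sum` with `HashPartitioner`.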
