[
https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185055#comment-15185055
]
ASF GitHub Bot commented on FLINK-3591:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/1774#discussion_r55372164
--- Diff: docs/quickstart/run_example_quickstart.md ---
@@ -27,116 +27,360 @@ under the License.
* This will be replaced by the TOC
{:toc}
-This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink.
-On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
+In this guide we will start from scratch and go from setting up a Flink project to running
+a streaming analysis program on a Flink cluster.
+
+Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
+read this channel in Flink and count the number of bytes that each user edits within
+a given window of time. This is easy enough to implement in a few minutes using Flink, but it will
+give you a good foundation from which to start building more complex analysis programs on your own.
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure. Please
+see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
+about this. For our purposes, the command to run is this:
+
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
+    -DarchetypeVersion=1.0.0 \
+    -DgroupId=wiki-edits \
+    -DartifactId=wiki-edits \
+    -Dversion=0.1 \
+    -Dpackage=wikiedits \
+    -DinteractiveMode=false
+{% endhighlight %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a project structure that looks like this:
+
+{% highlight bash %}
+$ tree wiki-edits
+wiki-edits/
+├── pom.xml
+└── src
+    └── main
+        ├── java
+        │   └── wikiedits
+        │       ├── Job.java
+        │       ├── SocketTextStreamWordCount.java
+        │       └── WordCount.java
+        └── resources
+            └── log4j.properties
+{% endhighlight %}
+
+In the root directory there is our `pom.xml` file, which already has the Flink dependencies added, and
+several example Flink programs in `src/main/java`. We can delete the example programs, since
+we are going to start from scratch:
+
+{% highlight bash %}
+$ rm wiki-edits/src/main/java/wikiedits/*.java
+{% endhighlight %}
+
+As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
+use it in our program. Edit the `dependencies` section so that it looks like this:
+
+{% highlight xml %}
+<dependencies>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-java</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-streaming-java_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-clients_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-wikiedits_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+</dependencies>
+{% endhighlight %}
+
+Notice the `flink-connector-wikiedits_2.10` dependency that was added.
+
+## Writing a Flink Program
+
+It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
+create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
+
+{% highlight java %}
+package wikiedits;
+
+public class WikipediaAnalysis {
+
+  public static void main(String[] args) throws Exception {
+
+  }
+}
+{% endhighlight %}
+
+I admit it's very bare bones now but we will fill it in as we go. Note that I won't give
+import statements here since IDEs can add them automatically. At the end of this section I'll show
+the complete code with import statements if you simply want to skip ahead and enter that in your
+editor.
+
+The first step in a Flink program is to create a `StreamExecutionEnvironment`
+(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
+parameters and create sources for reading from external systems. So let's go ahead and add
+this to the main method:
+
+{% highlight java %}
+StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+
+Next, we will create a source that reads from the Wikipedia IRC log:
+
+{% highlight java %}
+DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
+{% endhighlight %}
+
+This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
+the purposes of this example we are interested in determining the number of added or removed
+bytes that each user causes in a certain time window, let's say five seconds. For this we first
+have to specify that we want to key the stream on the user name, that is to say that operations
+on this stream should take the key into account. In our case the summation of edited bytes in the windows
+should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
+
+{% highlight java %}
+KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
+  .keyBy(new KeySelector<WikipediaEditEvent, String>() {
+    @Override
+    public String getKey(WikipediaEditEvent event) {
+      return event.getUser();
+    }
+  });
+{% endhighlight %}
+
+This gives us the same Stream of `WikipediaEditEvent` that now has a `String` key, the user name.
+We can now specify that we want to have windows imposed on this stream and compute a
+result based on elements in these windows. We need to specify a window here because we are
+dealing with an infinite stream of events. If you want to compute an aggregation on such an
+infinite stream you never know when you are finished. That's where windows come into play:
+they specify a time slice in which we should perform our computation. In our example we will say
+that we want to aggregate the sum of edited bytes in windows of five seconds:
+
+{% highlight java %}
+DataStream<Tuple2<String, Long>> result = keyedEdits
+  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+  .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+    @Override
+    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
+      acc.f0 = event.getUser();
+      acc.f1 += event.getByteDiff();
+      return acc;
+    }
+  });
+{% endhighlight %}
+
+The first call, `.window()`, specifies that we want to have tumbling (non-overlapping) windows
+of five seconds. The second call specifies a *Fold transformation* on each window slice for
+each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
+difference of every edit in that time window for a user. The resulting Stream now contains
+a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
+
+The only thing left to do is print the stream to the console and start execution:
+
+{% highlight java %}
+result.print();
+
+see.execute();
+{% endhighlight %}
+
+That last call is necessary to start the actual Flink job. All operations, such as creating
+sources, transformations and sinks, only build up a graph of internal operations. Only when
+`execute()` is called is this graph of operations submitted to a cluster or executed on your local
+machine.
+
+The complete code so far is this:
+
+{% highlight java %}
+package wikiedits;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
+import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
+
+public class WikipediaAnalysis {
+
+  public static void main(String[] args) throws Exception {
+
+    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
+
+    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
+      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
+        @Override
+        public String getKey(WikipediaEditEvent event) {
+          return event.getUser();
+        }
+      });
+
+    DataStream<Tuple2<String, Long>> result = keyedEdits
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+        @Override
+        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
+          acc.f0 = event.getUser();
+          acc.f1 += event.getByteDiff();
+          return acc;
+        }
+      });
+
+    result.print();
+
+    see.execute();
+  }
+}
+{% endhighlight %}
+
+You can run this example in your IDE or from the command line, using Maven:
+
+{% highlight bash %}
+$ mvn clean package
+$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
+{% endhighlight %}
+
+The first command builds our project and the second executes our main class. The output should be
+similar to this:
+
+{% highlight bash %}
+1> (Fenix down,114)
+6> (AnomieBOT,155)
+8> (BD2412bot,-3690)
+7> (IgnorantArmies,49)
+3> (Ckh3111,69)
+5> (Slade360,0)
+7> (Narutolovehinata5,2195)
+6> (Vuyisa2001,79)
+4> (Ms Sarah Welch,269)
+4> (KasparBot,-245)
+{% endhighlight %}
+
+The number in front of each line tells you on which parallel instance of the print sink the output
+was produced.
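+
+If you would rather have all results printed by a single instance, one option is to set the
+parallelism of the print sink to one. This is just an optional sketch, not something the example
+needs:
+
+{% highlight java %}
+// Optional: run the print sink with parallelism 1 so that every result
+// line is produced by the same sink instance.
+result.print().setParallelism(1);
+{% endhighlight %}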
+
+This should get you started with writing your own Flink programs. You can check out our guides
+about [basic concepts]({{ site.baseurl }}/apis/common/index.html) and the
+[DataStream API]({{ site.baseurl }}/apis/streaming/index.html) if you want to learn more. Stick
+around for the bonus exercise if you want to learn about setting up a Flink cluster on
+your own machine and writing results to [Kafka](http://kafka.apache.org).
+
+## Bonus Exercise: Running on a Cluster and Writing to Kafka
+
+Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
+on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
+for setting up a Kafka installation before we proceed.
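+
+As a rough sketch (assuming an unpacked Kafka distribution and the default configuration files
+that ship with it), starting ZooKeeper and a Kafka broker from the Kafka installation directory
+typically looks like this:
+
+{% highlight bash %}
+# Run from the Kafka installation directory; the config file names are the
+# defaults shipped with Kafka and may differ in your setup.
+$ bin/zookeeper-server-start.sh config/zookeeper.properties &
+$ bin/kafka-server-start.sh config/server.properties &
+{% endhighlight %}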
+
+As a first step, we have to add the Flink Kafka connector as a dependency so that we can
+use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+    <version>${flink.version}</version>
+</dependency>
+{% endhighlight %}
+
+Next, we need to modify our program. We'll remove the `print()` sink and instead use a
+Kafka sink. The new code looks like this:
+
+{% highlight java %}
+
+result
+  .map(new MapFunction<Tuple2<String, Long>, String>() {
+    @Override
+    public String map(Tuple2<String, Long> tuple) {
+      return tuple.toString();
+    }
+  })
+  .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
+{% endhighlight %}
+
+Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
+a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
+we create a Kafka sink. You might have to adapt the hostname and port to your setup; `"wiki-result"`
+is the name of the Kafka topic that we are going to create next, before running our program.
+Build the project using Maven because we need the jar file for running on the cluster:
+
+{% highlight bash %}
+$ mvn clean package
+{% endhighlight %}
+
+The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
+this later.
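+
+As a sketch of how the jar will eventually be used (the concrete steps follow below), such a
+packaged job can be handed to Flink's command line client together with the name of its main class:
+
+{% highlight bash %}
+# Sketch: submit the packaged job to a running Flink cluster.
+# -c names the entry class; adjust the path to wherever you built the jar.
+$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wiki-edits-0.1.jar
+{% endhighlight %}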
+
+Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
+to the location where you installed Flink and start a local cluster:
+
+{% highlight bash %}
+$ cd my/flink/directory
+$ bin/start-local.sh
+{% endhighlight %}
+
+We also have to create the Kafka Topic, so that we our program can write to it:
--- End diff --
remove "we"
> Replace Quickstart K-Means Example by Streaming Example
> -------------------------------------------------------
>
> Key: FLINK-3591
> URL: https://issues.apache.org/jira/browse/FLINK-3591
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)