Repository: incubator-beam
Updated Branches:
  refs/heads/master 267136fb6 -> d02d2de09


[flink] improve example section in README

- updates the README
- repairs broken exec configuration


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe38770
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe38770
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe38770

Branch: refs/heads/master
Commit: 2fe387707d1e115b578f5ee643bb99c0e4667ee0
Parents: cf14644
Author: Maximilian Michels <[email protected]>
Authored: Wed Jul 20 16:06:06 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon Jul 25 17:30:19 2016 +0200

----------------------------------------------------------------------
 runners/flink/README.md                         | 25 ++++++++++++--------
 runners/flink/examples/pom.xml                  | 11 ++++-----
 .../beam/runners/flink/examples/WordCount.java  |  4 ++--
 3 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/README.md
----------------------------------------------------------------------
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 3348119..aeb1692 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's semantically identically to
 the example provided with Apache Beam. Only this time, we chose the
 `FlinkRunner` to execute the WordCount on top of Flink.
 
-Here's an excerpt from the WordCount class file:
+Here's an excerpt from the [WordCount class file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java):
 
 ```java
-Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
+Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
 // yes, we want to run WordCount with Flink
 options.setRunner(FlinkRunner.class);
 
 Pipeline p = Pipeline.create(options);
 
-p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
-               .apply(new CountWords())
-               .apply(TextIO.Write.named("WriteCounts")
-                               .to(options.getOutput())
-                               .withNumShards(options.getNumShards()));
+p.apply("ReadLines", TextIO.Read.from(options.getInput()))
+    .apply(new CountWords())
+    .apply(MapElements.via(new FormatAsTextFn()))
+    .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
 p.run();
 ```
 
 To execute the example, let's first get some sample data:
 
-    curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt
+    cd runners/flink/examples
+    curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt
 
 Then let's run the included WordCount locally on your machine:
 
-    cd examples
-    mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt
+    cd runners/flink/examples
+    mvn exec:java -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \
+                  -Dinput=kinglear.txt -Doutput=wordcounts.txt
 
 Congratulations, you have run your first Apache Beam program on top of Apache Flink!
 
+Note that you will find a number of `wordcounts*` output files because Flink parallelizes the
+WordCount computation. You may pass an additional `-Dparallelism=1` to disable parallelization and
+get a single `wordcounts.txt` file.
 
 # Running Beam programs on a Flink cluster
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index b0ee2ed..355a6be 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -33,11 +33,10 @@
   <packaging>jar</packaging>
 
   <properties>
-    <!-- Default parameters for mvn exec:exec -->
-    <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
+    <!-- Default parameters for mvn exec:java -->
     <input>kinglear.txt</input>
     <output>wordcounts.txt</output>
-    <parallelism>1</parallelism>
+    <parallelism>-1</parallelism>
   </properties>
 
   <profiles>
@@ -131,12 +130,10 @@
         <configuration>
           <executable>java</executable>
           <arguments>
-            <argument>-classpath</argument>
-            <classpath />
-            <argument>${clazz}</argument>
+            <argument>--runner=org.apache.beam.runners.flink.FlinkRunner</argument>
+            <argument>--parallelism=${parallelism}</argument>
             <argument>--input=${input}</argument>
             <argument>--output=${output}</argument>
-            <argument>--parallelism=${parallelism}</argument>
           </arguments>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 2d95c97..c54229d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -21,10 +21,10 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -92,11 +92,11 @@ public class WordCount {
    */
   public interface Options extends PipelineOptions, FlinkPipelineOptions {
     @Description("Path of the file to read from")
-    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
     String getInput();
     void setInput(String value);
 
     @Description("Path of the file to write to")
+    @Validation.Required
     String getOutput();
     void setOutput(String value);
   }

Reply via email to