Repository: beam-site
Updated Branches:
  refs/heads/asf-site 87b0fb5e4 -> 261e39777


Revise WordCount Example Walkthrough

Fiddle with monospace markers.

Update code snippets.

Reorder logging example and update "[Google ]Cloud Logging" ->
Stackdriver Logging


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/9fb214fe
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/9fb214fe
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/9fb214fe

Branch: refs/heads/asf-site
Commit: 9fb214fe7ee5c662248a448d06ef30e3aa8eb82e
Parents: 87b0fb5
Author: Thomas Groh <[email protected]>
Authored: Fri May 12 14:01:06 2017 -0700
Committer: Davor Bonaci <[email protected]>
Committed: Fri May 12 16:17:27 2017 -0700

----------------------------------------------------------------------
 src/documentation/sdks/python-custom-io.md |  2 +-
 src/get-started/wordcount-example.md       | 86 ++++++++++++++-----------
 2 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/9fb214fe/src/documentation/sdks/python-custom-io.md
----------------------------------------------------------------------
diff --git a/src/documentation/sdks/python-custom-io.md 
b/src/documentation/sdks/python-custom-io.md
index ee87e4e..8ce174e 100644
--- a/src/documentation/sdks/python-custom-io.md
+++ b/src/documentation/sdks/python-custom-io.md
@@ -228,7 +228,7 @@ The Beam SDK for Python contains some convenient abstract 
base classes to help y
 
 #### FileSink
 
-If your data source uses files, you can derive your `Sink` and `Writer` 
classes from the `FileSink` and `FileSinkWriter` classes, which can be found in 
the 
[fileio.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/fileio.py)
 module. These classes implement code common sinks that interact with files, 
including:
+If your data source uses files, you can derive your `Sink` and `Writer` 
classes from the `FileBasedSink` and `FileBasedSinkWriter` classes, which can 
be found in the 
[filebasedsink.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsink.py)
 module. These classes implement code common to sinks that interact with 
files, including:
 
 * Setting file headers and footers
 * Sequential record writing

http://git-wip-us.apache.org/repos/asf/beam-site/blob/9fb214fe/src/get-started/wordcount-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/wordcount-example.md 
b/src/get-started/wordcount-example.md
index 3572e76..19a82d7 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -47,7 +47,7 @@ The following sections explain these concepts in detail along 
with excerpts of t
 
 ### Creating the Pipeline
 
-The first step in creating a Beam pipeline is to create a `PipelineOptions 
object`. This object lets us set various options for our pipeline, such as the 
pipeline runner that will execute our pipeline and any runner-specific 
configuration required by the chosen runner. In this example we set these 
options programmatically, but more often command-line arguments are used to set 
`PipelineOptions`. 
+The first step in creating a Beam pipeline is to create a `PipelineOptions` 
object. This object lets us set various options for our pipeline, such as the 
pipeline runner that will execute our pipeline and any runner-specific 
configuration required by the chosen runner. In this example we set these 
options programmatically, but more often command-line arguments are used to set 
`PipelineOptions`. 
 
 You can specify a runner for executing your pipeline, such as the 
`DataflowRunner` or `SparkRunner`. If you omit specifying a runner, as in this 
example, your pipeline will be executed locally using the `DirectRunner`. In 
the next sections, we will specify the pipeline's runner.
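
As a hedged sketch (assuming the Beam Java SDK's `PipelineOptionsFactory`, which the full example uses), creating the options and the pipeline looks roughly like this:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CreatePipelineSketch {
  public static void main(String[] args) {
    // Set options programmatically; parsing command-line arguments with
    // PipelineOptionsFactory.fromArgs(args).create() is the more common pattern.
    PipelineOptions options = PipelineOptionsFactory.create();

    // No runner is specified, so the pipeline executes locally on the DirectRunner.
    Pipeline p = Pipeline.create(options);
    p.run().waitUntilFinish();
  }
}
```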
 
@@ -86,14 +86,14 @@ Pipeline p = Pipeline.create(options);
 
 The Minimal WordCount pipeline contains several transforms to read data into 
the pipeline, manipulate or otherwise transform the data, and write out the 
results. Each transform represents an operation in the pipeline.
 
-Each transform takes some kind of input (data or otherwise), and produces some 
output data. The input and output data is represented by the SDK class 
`PCollection`. `PCollection` is a special class, provided by the Beam SDK, that 
you can use to represent a data set of virtually any size, including infinite 
data sets.
+Each transform takes some kind of input (data or otherwise), and produces some 
output data. The input and output data is represented by the SDK class 
`PCollection`. `PCollection` is a special class, provided by the Beam SDK, that 
you can use to represent a data set of virtually any size, including unbounded 
data sets.
 
 <img src="{{ "/images/wordcount-pipeline.png" | prepend: site.baseurl }}" 
alt="Word Count pipeline diagram">
 Figure 1: The pipeline data flow.
 
 The Minimal WordCount pipeline contains five transforms:
 
-1.  A text file `Read` transform is applied to the Pipeline object itself, and 
produces a `PCollection` as output. Each element in the output PCollection 
represents one line of text from the input file. This example happens to use 
input data stored in a publicly accessible Google Cloud Storage bucket 
("gs://").
+1.  A text file `Read` transform is applied to the Pipeline object itself, and 
produces a `PCollection` as output. Each element in the output PCollection 
represents one line of text from the input file. This example uses input data 
stored in a publicly accessible Google Cloud Storage bucket ("gs://").
 
     ```java
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
@@ -109,7 +109,9 @@ The Minimal WordCount pipeline contains five transforms:
     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
-            for (String word : c.element().split("[^a-zA-Z']+")) {
+            // \p{L} denotes the category of Unicode letters,
+            // so this pattern matches everything that is not a letter.
+            for (String word : c.element().split("[^\\p{L}]+")) {
                 if (!word.isEmpty()) {
                     c.output(word);
                 }
@@ -137,7 +139,7 @@ The Minimal WordCount pipeline contains five transforms:
 
 4.  The next transform formats each of the key/value pairs of unique words and 
occurrence counts into a printable string suitable for writing to an output 
file.
 
-       The map transform is a higher-level composite transform that 
encapsulates a simple `ParDo`; for each element in the input `PCollection`, the 
map transform applies a function that produces exactly one output element.
+       The map transform is a higher-level composite transform that 
encapsulates a simple `ParDo`. For each element in the input `PCollection`, the 
map transform applies a function that produces exactly one output element.
 
     ```java
     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, 
Long>, String>() {
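        // The SimpleFunction body is truncated by this hunk; a hedged,
        // illustrative sketch of what such a formatter overrides:
        //   @Override
        //   public String apply(KV<String, Long> input) {
        //     return input.getKey() + ": " + input.getValue();
        //   }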
@@ -260,7 +262,7 @@ public static void main(String[] args) throws IOException {
 
 You can hard-code various execution options when you run your pipeline. 
However, the more common way is to define your own configuration options via 
command-line argument parsing. Defining your configuration options via the 
command-line makes the code more easily portable across different runners.
 
-Add arguments to be processed by the command-line parser, and specify default 
values for them. You can then access the options values in your pipeline code. 
+Add arguments to be processed by the command-line parser, and specify default 
values for them. You can then access the option values in your pipeline code.
 
 ```java
 public static interface WordCountOptions extends PipelineOptions {
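    // The interface body is truncated by this hunk; a hedged sketch of the
    // typical continuation, using the SDK's @Description/@Default annotations
    // (the path shown is illustrative):
    //   @Description("Path of the file to read from")
    //   @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    //   String getInputFile();
    //   void setInputFile(String value);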
@@ -298,15 +300,7 @@ The following sections explain these key concepts in 
detail, and break down the
 
 ### Logging
 
-Each runner may choose to handle logs in its own way. 
-
-#### Direct Runner
-
-If you execute your pipeline using `DirectRunner`, it will print the log 
messages directly to your local console.
-
-#### Dataflow Runner
-
-If you execute your pipeline using `DataflowRunner`, you can use Google Cloud 
Logging. Google Cloud Logging (currently in beta) aggregates the logs from all 
of your Dataflow job's workers to a single location in the Google Cloud 
Platform Console. You can use Cloud Logging to search and access the logs from 
all of the Compute Engine instances that Dataflow has spun up to complete your 
Dataflow job. You can add logging statements into your pipeline's `DoFn` 
instances that will appear in Cloud Logging as your pipeline runs.
+Each runner may choose to handle logs in its own way.
 
 ```java
 // This example uses .trace and .debug:
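// The snippet is pulled in via github_sample and truncated here; a hedged,
// illustrative sketch of the SLF4J idiom it demonstrates:
//   private static final Logger LOG =
//       LoggerFactory.getLogger(DebuggingWordCount.class);
//   LOG.debug("Matched: {}", c.element().getKey());
//   LOG.trace("Did not match: {}", c.element().getKey());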
@@ -333,7 +327,16 @@ public class DebuggingWordCount {
 {% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py 
tag:example_wordcount_debugging_logging
 %}```
 
-If you execute your pipeline using `DataflowRunner`, you can control the 
worker log levels. Dataflow workers that execute user code are configured to 
log to Cloud Logging by default at "INFO" log level and higher. You can 
override log levels for specific logging namespaces by specifying: 
`--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For 
example, by specifying 
`--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing 
this pipeline using the Dataflow service, Cloud Logging would contain only 
"DEBUG" or higher level logs for the package in addition to the default "INFO" 
or higher level logs. 
+
+#### Direct Runner
+
+If you execute your pipeline using `DirectRunner`, it will print the log 
messages directly to your local console.
+
+#### Dataflow Runner
+
+If you execute your pipeline using `DataflowRunner`, you can use Stackdriver 
Logging. Stackdriver Logging aggregates the logs from all of your Dataflow 
job's workers to a single location in the Google Cloud Platform Console. You 
can use Stackdriver Logging to search and access the logs from all of the 
workers that Dataflow has spun up to complete your Dataflow job. Logging 
statements in your pipeline's `DoFn` instances will appear in Stackdriver 
Logging as your pipeline runs.
+
+If you execute your pipeline using `DataflowRunner`, you can control the 
worker log levels. Dataflow workers that execute user code are configured to 
log to Stackdriver Logging by default at "INFO" log level and higher. You can 
override log levels for specific logging namespaces by specifying: 
`--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For 
example, by specifying 
`--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing 
this pipeline using the Dataflow service, Stackdriver Logging would contain 
only "DEBUG" or higher level logs for the package in addition to the default 
"INFO" or higher level logs.
 
 The default Dataflow worker logging configuration can be overridden by 
specifying `--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>`. 
For example, by specifying `--defaultWorkerLogLevel=DEBUG` when executing this 
pipeline with the Dataflow service, Stackdriver Logging would contain all 
"DEBUG" or 
higher level logs. Note that changing the default worker log level to TRACE or 
DEBUG will significantly increase the amount of logs output.
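
The same configuration can also be set programmatically. A minimal sketch, assuming the Dataflow runner's `DataflowWorkerLoggingOptions` interface is on the classpath (this mirrors the flags above and is not part of this diff):

```java
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;

DataflowWorkerLoggingOptions loggingOptions =
    options.as(DataflowWorkerLoggingOptions.class);

// Equivalent to --defaultWorkerLogLevel=DEBUG.
loggingOptions.setDefaultWorkerLogLevel(Level.DEBUG);

// Equivalent to --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}.
loggingOptions.setWorkerLogLevelOverrides(
    new WorkerLogLevelOverrides()
        .addOverrideForName("org.apache.beam.examples", Level.DEBUG));
```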
 
@@ -342,14 +345,18 @@ The default Dataflow worker logging configuration can be 
overridden by specifyin
 > **Note:** This section is yet to be added. There is an open issue for this 
 > ([BEAM-792](https://issues.apache.org/jira/browse/BEAM-792)).
 
 #### Apache Flink Runner
- 
+
 > **Note:** This section is yet to be added. There is an open issue for this 
 > ([BEAM-791](https://issues.apache.org/jira/browse/BEAM-791)).
 
+#### Apache Apex Runner
+
+> **Note:** This section is yet to be added. There is an open issue for this 
([BEAM-2285](https://issues.apache.org/jira/browse/BEAM-2285)).
+
 ### Testing your Pipeline via PAssert
 
-`PAssert` is a set of convenient `PTransform`s in the style of Hamcrest's 
collection matchers that can be used when writing Pipeline level tests to 
validate the contents of PCollections. `PAssert` is best used in unit tests 
with small data sets, but is demonstrated here as a teaching tool.
+`PAssert` is a set of convenient PTransforms in the style of Hamcrest's 
collection matchers that can be used when writing Pipeline level tests to 
validate the contents of PCollections. `PAssert` is best used in unit tests 
with small data sets, but is demonstrated here as a teaching tool.
 
-Below, we verify that the set of filtered words matches our expected counts. 
Note that `PAssert` does not provide any output, and that successful completion 
of the pipeline implies that the expectations were met. See 
[DebuggingWordCountTest](https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java)
 for an example unit test.
+Below, we verify that the set of filtered words matches our expected counts. 
Note that `PAssert` does not produce any output, and the pipeline will only 
succeed 
if all of the expectations are met. See 
[DebuggingWordCountTest](https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java)
 for an example unit test.
 
 ```java
 public static void main(String[] args) {
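  // The test body is truncated by this hunk; a hedged, illustrative sketch
  // of the assertion idiom from the DebuggingWordCount example:
  //   List<KV<String, Long>> expectedResults =
  //       Arrays.asList(KV.of("Flourish", 3L), KV.of("stomach", 1L));
  //   PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  //   p.run().waitUntilFinish();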
@@ -381,7 +388,7 @@ The following sections explain these key concepts in 
detail, and break down the
 
 ### Unbounded and bounded pipeline input modes
 
-Beam allows you to create a single pipeline that can handle both bounded and 
unbounded types of input. If the input is unbounded, then all `PCollections` of 
the pipeline will be unbounded as well. The same goes for bounded input. If 
your input has a fixed number of elements, it's considered a 'bounded' data 
set. If your input is continuously updating, then it's considered 'unbounded'.
+Beam allows you to create a single pipeline that can handle both bounded and 
unbounded types of input. If the input is unbounded, then all PCollections of 
the pipeline will be unbounded as well. The same goes for bounded input. If 
your input has a fixed number of elements, it's considered a 'bounded' data 
set. If your input is continuously updating, then it's considered 'unbounded'.
 
 Recall that the input for this example is a set of Shakespeare's texts, a 
finite data set. Therefore, this example reads bounded data from a text file:
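
The read itself falls outside this hunk; a hedged sketch of a bounded `TextIO` read, mirroring the earlier Minimal WordCount snippet:

```java
PCollection<String> lines =
    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"));
```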
 
@@ -415,20 +422,24 @@ Below is the code for `AddTimestampFn`, a `DoFn` invoked 
by `ParDo`, that sets t
 
 ```java
 static class AddTimestampFn extends DoFn<String, String> {
-  private static final Duration RAND_RANGE = Duration.standardHours(2);
   private final Instant minTimestamp;
+  private final Instant maxTimestamp;
 
-  AddTimestampFn() {
-    this.minTimestamp = new Instant(System.currentTimeMillis());
+  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
+    this.minTimestamp = minTimestamp;
+    this.maxTimestamp = maxTimestamp;
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-    // Generate a timestamp that falls somewhere in the past two hours.
-    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
-    Instant randomTimestamp = minTimestamp.plus(randMillis);
-
-    // Set the data element with that timestamp.
+    Instant randomTimestamp =
+      new Instant(
+          ThreadLocalRandom.current()
+          .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
+
+    /**
+     * Concept #2: Set the data element with that timestamp.
+     */
     c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
   }
 }
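// A hedged usage sketch (illustrative, not part of this diff): the DoFn
// above is applied with ParDo over a caller-chosen timestamp range, e.g.
//   Instant minTimestamp = new Instant(System.currentTimeMillis());
//   Instant maxTimestamp = minTimestamp.plus(Duration.standardHours(2));
//   PCollection<String> timestamped =
//       input.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));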
@@ -440,9 +451,9 @@ This feature is not yet available in the Beam SDK for 
Python.
 
 ### Windowing
 
-Beam uses a concept called **Windowing** to subdivide a `PCollection` 
according to the timestamps of its individual elements. `PTransforms` that 
aggregate multiple elements, process each `PCollection` as a succession of 
multiple, finite windows, even though the entire collection itself may be of 
infinite size (unbounded).
+Beam uses a concept called **Windowing** to subdivide a `PCollection` 
according to the timestamps of its individual elements. PTransforms that 
aggregate multiple elements process each `PCollection` as a succession of 
multiple, finite windows, even though the entire collection itself may be of 
infinite size (unbounded).
 
-The `WindowingWordCount` example applies fixed-time windowing, wherein each 
window represents a fixed time interval. The fixed window size for this example 
defaults to 1 minute (you can change this with a command-line option). 
+The `WindowedWordCount` example applies fixed-time windowing, wherein each 
window represents a fixed time interval. The fixed window size for this example 
defaults to 1 minute (you can change this with a command-line option). 
 
 ```java
 PCollection<String> windowedWords = input
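// The windowing apply is truncated by this hunk; a hedged, illustrative
// sketch (assumes FixedWindows from the Beam Java SDK and a
// WindowedWordCount-style window-size option):
//   .apply(Window.<String>into(
//       FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));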
@@ -456,7 +467,7 @@ This feature is not yet available in the Beam SDK for 
Python.
 
 ### Reusing PTransforms over windowed PCollections
 
-You can reuse existing `PTransform`s, that were created for manipulating 
simple `PCollection`s, over windowed `PCollection`s as well.
+You can reuse existing PTransforms that were created for manipulating simple 
PCollections over windowed PCollections as well.
 
 ```java
 PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new 
WordCount.CountWords());
@@ -468,17 +479,14 @@ This feature is not yet available in the Beam SDK for 
Python.
 
 ### Write Results to an Unbounded Sink
 
-Since our input is unbounded, the same is true of our output `PCollection`. We 
need to make sure that we choose an appropriate, unbounded sink. Some output 
sinks support only bounded output, such as a text file. Google Cloud BigQuery 
is an output source that supports both bounded and unbounded input.
+When our input is unbounded, the same is true of our output `PCollection`. We 
need to make sure that we choose an appropriate, unbounded sink. Some output 
sinks support only bounded output, while others support both bounded and 
unbounded outputs. By using a `FilenamePolicy`, we can use `TextIO` to write 
files that are partitioned by windows. We use a composite `PTransform` that 
uses such a policy internally to write a single sharded file per window.
 
-In this example, we stream the results to a BigQuery table. The results are 
then formatted for a BigQuery table, and then written to BigQuery using 
BigQueryIO.Write. 
+In this example, we format the word counts as text and use a composite 
transform, `WriteOneFilePerWindow`, to write one sharded text file per 
window.
 
 ```java
-wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
-    .apply(BigQueryIO.Write
-      .to(getTableReference(options))
-      .withSchema(getSchema())
-      
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+  wordCounts
+      .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+      .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
 ```
 
 ```py
