Update according to reviewer comments

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bc4d4947
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bc4d4947
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bc4d4947

Branch: refs/heads/asf-site
Commit: bc4d4947461244eee83d7c0f31e76cda9971b5cb
Parents: 884971d
Author: Hadar Hod <[email protected]>
Authored: Tue Jan 24 23:07:48 2017 -0800
Committer: Dan Halperin <[email protected]>
Committed: Fri Jan 27 13:42:01 2017 -0800

----------------------------------------------------------------------
 src/documentation/programming-guide.md | 135 ++++++++++------------------
 1 file changed, 48 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/bc4d4947/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md 
b/src/documentation/programming-guide.md
index 3851bb5..c0e947f 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -69,7 +69,7 @@ A typical Beam driver program works as follows:
 
 When you run your Beam driver program, the Pipeline Runner that you designate 
constructs a **workflow graph** of your pipeline based on the `PCollection` 
objects you've created and transforms that you've applied. That graph is then 
executed using the appropriate distributed processing back-end, becoming an 
asynchronous "job" (or equivalent) on that back-end.
 
-## <a name="pipeline"></a>Creating the Pipeline
+## <a name="pipeline"></a>Creating the pipeline
 
 The `Pipeline` abstraction encapsulates all the data and steps in your data 
processing task. Your Beam driver program typically starts by constructing a 
<span class="language-java">[Pipeline]({{ site.baseurl 
}}/documentation/sdks/javadoc/{{ site.release_latest 
}}/index.html?org/apache/beam/sdk/Pipeline.html)</span><span 
class="language-py">[Pipeline](https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/pipeline.py)</span>
 object, and then using that object as the basis for creating the pipeline's 
data sets as `PCollection`s and its operations as `Transform`s.
 
@@ -110,7 +110,7 @@ After you've created your `Pipeline`, you'll need to begin 
by creating at least
 
 You create a `PCollection` by either reading data from an external source 
using Beam's [Source API](#io), or you can create a `PCollection` of data 
stored in an in-memory collection class in your driver program. The former is 
typically how a production pipeline would ingest data; Beam's Source APIs 
contain adapters to help you read from external sources like large cloud-based 
files, databases, or subscription services. The latter is primarily useful for 
testing and debugging purposes.
 
-#### Reading from an External Source
+#### Reading from an external source
 
 To read from an external source, you use one of the [Beam-provided I/O 
adapters](#io). The adapters vary in their exact usage, but all of them read from 
some external data source and return a `PCollection` whose elements represent 
the data records in that source. 
 
@@ -141,7 +141,7 @@ lines = p | 'ReadMyFile' >> 
beam.io.Read(beam.io.TextFileSource("protocol://path
 
 See the [section on I/O](#io) to learn more about how to read from the various 
data sources supported by the Beam SDK.
 
-#### Creating a PCollection from In-Memory Data
+#### Creating a PCollection from in-memory data
 
 {:.language-java}
 To create a `PCollection` from an in-memory Java `Collection`, you use the 
Beam-provided `Create` transform. Much like a data adapter's `Read`, you apply 
`Create` directly to your `Pipeline` object itself.
@@ -190,11 +190,11 @@ p = beam.Pipeline()
 collection = p | 'ReadMyLines' >> beam.Create(lines)
 ```
 
-### <a name="pccharacteristics"></a>PCollection Characteristics
+### <a name="pccharacteristics"></a>PCollection characteristics
 
 A `PCollection` is owned by the specific `Pipeline` object for which it is 
created; multiple pipelines cannot share a `PCollection`. In some respects, a 
`PCollection` functions like a collection class. However, a `PCollection` can 
differ in a few key ways:
 
-#### <a name="pcelementtype"></a>Element Type
+#### <a name="pcelementtype"></a>Element type
 
 The elements of a `PCollection` may be of any type, but must all be of the 
same type. However, to support distributed processing, Beam needs to be able to 
encode each individual element as a byte string (so elements can be passed 
around to distributed workers). The Beam SDKs provide a data encoding mechanism 
that includes built-in encoding for commonly-used types as well as support for 
specifying custom encodings as needed.
 
@@ -202,11 +202,11 @@ The elements of a `PCollection` may be of any type, but 
must all be of the same
 
 A `PCollection` is immutable. Once created, you cannot add, remove, or change 
individual elements. A Beam Transform might process each element of a 
`PCollection` and generate new pipeline data (as a new `PCollection`), *but it 
does not consume or modify the original input collection*.
 
-#### <a name="pcrandomaccess"></a>Random Access
+#### <a name="pcrandomaccess"></a>Random access
 
 A `PCollection` does not support random access to individual elements. 
Instead, Beam Transforms consider every element in a `PCollection` individually.
 
-#### <a name="pcsizebound"></a>Size and Boundedness
+#### <a name="pcsizebound"></a>Size and boundedness
 
 A `PCollection` is a large, immutable "bag" of elements. There is no upper 
limit on how many elements a `PCollection` can contain; any given `PCollection` 
might fit in memory on a single machine, or it might represent a very large 
distributed data set backed by a persistent data store.
 
@@ -216,7 +216,7 @@ The bounded (or unbounded) nature of your `PCollection` 
affects how Beam process
 
 When performing an operation that groups elements in an unbounded 
`PCollection`, Beam requires a concept called **Windowing** to divide a 
continuously updating data set into logical windows of finite size.  Beam 
processes each window as a bundle, and processing continues as the data set is 
generated. These logical windows are determined by some characteristic 
associated with a data element, such as a **timestamp**.
 
-#### <a name="pctimestamps"></a>Element Timestamps
+#### <a name="pctimestamps"></a>Element timestamps
 
 Each element in a `PCollection` has an associated intrinsic **timestamp**. The 
timestamp for each element is initially assigned by the [Source](#io) that 
creates the `PCollection`. Sources that create an unbounded `PCollection` often 
assign each new element a timestamp that corresponds to when the element was 
read or added.
 
@@ -226,7 +226,7 @@ Timestamps are useful for a `PCollection` that contains 
elements with an inheren
 
 You can manually assign timestamps to the elements of a `PCollection` if the 
source doesn't do it for you. You'll want to do this if the elements have an 
inherent timestamp, but the timestamp is somewhere in the structure of the 
element itself (such as a "time" field in a server log entry). Beam has 
[Transforms](#transforms) that take a `PCollection` as input and output an 
identical `PCollection` with timestamps attached; see [Assigning 
Timestamps](#windowing) for more information on how to do so.
 
-## <a name="transforms"></a>Applying Transforms
+## <a name="transforms"></a>Applying transforms
 
 In the Beam SDKs, **transforms** are the operations in your pipeline. A 
transform takes a `PCollection` (or more than one `PCollection`) as input, 
performs an operation that you specify on each element in that collection, and 
produces a new output `PCollection`. To invoke a transform, you must **apply** 
it to the input `PCollection`.
 
@@ -277,7 +277,7 @@ You can also build your own [composite 
transforms](#transforms-composite) that n
 
 The transforms in the Beam SDKs provide a generic **processing framework**, 
where you provide processing logic in the form of a function object 
(colloquially referred to as "user code"). The user code gets applied to the 
elements of the input `PCollection`. Instances of your user code might then be 
executed in parallel by many different workers across a cluster, depending on 
the pipeline runner and back-end that you choose to execute your Beam pipeline. 
The user code running on each worker generates the output elements that are 
ultimately added to the final output `PCollection` that the transform produces.
 
-### Core Beam Transforms
+### Core Beam transforms
 
 Beam provides the following transforms, each of which represents a different 
processing paradigm:
 
@@ -390,7 +390,7 @@ In your processing method, you'll also need to meet some 
immutability requiremen
 * Once you output a value using `ProcessContext.output()` or 
`ProcessContext.sideOutput()`, you should not modify that value in any way.
 
 
-##### Lightweight DoFns and Other Abstractions
+##### Lightweight DoFns and other abstractions
 
 If your function is relatively straightforward, you can simplify your use of 
`ParDo` by providing a lightweight `DoFn` in-line, as <span 
class="language-java">an anonymous inner class instance</span><span 
class="language-py">a lambda function</span>.
 
@@ -403,9 +403,8 @@ PCollection<String> words = ...;
 // Apply a ParDo with an anonymous DoFn to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  ParDo
-    .named("ComputeWordLengths")            // the transform name
-    .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner 
class instance
+  "ComputeWordLengths",                     // the transform name
+  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner 
class instance
       @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element().length());
@@ -497,7 +496,7 @@ When you apply a `Combine` transform, you must provide the 
function that contain
 
 Simple combine operations, such as sums, can usually be implemented as a 
simple function. More complex combination operations might require you to 
create a subclass of `CombineFn` that has an accumulation type distinct from 
the input/output type.
 
-##### **Simple Combinations Using Simple Functions**
+##### **Simple combinations using simple functions**
 
 The following example code shows a simple combine function.
 
@@ -519,7 +518,7 @@ public static class SumInts implements 
SerializableFunction<Iterable<Integer>, I
 {% github_sample 
/apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:combine_bounded_sum
 %}```
 
-##### **Advanced Combinations using CombineFn**
+##### **Advanced combinations using CombineFn**
 
 For more complex combine functions, you can define a subclass of `CombineFn`. 
You should use `CombineFn` if the combine function requires a more 
sophisticated accumulator, must perform additional pre- or post-processing, 
might change the output type, or takes the key into account.
 
@@ -576,7 +575,7 @@ pc = ...
 
 If you are combining a `PCollection` of key-value pairs, [per-key 
combining](#transforms-combine-per-key) is often enough. If you need the 
combining strategy to change based on the key (for example, MIN for some users 
and MAX for other users), you can define a `KeyedCombineFn` to access the key 
within the combining strategy.
 
-##### **Combining a PCollection into a Single Value**
+##### **Combining a PCollection into a single value**
 
 Use the global combine to transform all of the elements in a given 
`PCollection` into a single value, represented in your pipeline as a new 
`PCollection` containing one element. The following example code shows how to 
apply the Beam provided sum combine function to produce a single sum value for 
a `PCollection` of integers.
 
@@ -595,7 +594,7 @@ pc = ...
 result = pc | beam.CombineGlobally(sum)
 ```
 
-##### Global Windowing:
+##### Global windowing:
 
 If your input `PCollection` uses the default global windowing, the default 
behavior is to return a `PCollection` containing one item. That item's value 
comes from the accumulator in the combine function that you specified when 
applying `Combine`. For example, the Beam provided sum combine function returns 
a zero value (the sum of an empty input), while the min combine function 
returns a maximal or infinite value.
 
@@ -613,7 +612,7 @@ sum = pc | beam.CombineGlobally(sum).without_defaults()
 
 ```
 
-##### Non-Global Windowing:
+##### Non-global windowing:
 
 If your `PCollection` uses any non-global windowing function, Beam does not 
provide the default behavior. You must specify one of the following options 
when applying `Combine`:
 
@@ -621,7 +620,7 @@ If your `PCollection` uses any non-global windowing 
function, Beam does not prov
 * Specify `.asSingletonView`, in which the output is immediately converted to 
a `PCollectionView`, which will provide a default value for each empty window 
when used as a side input. You'll generally only need to use this option if the 
result of your pipeline's `Combine` is to be used as a side input later in the 
pipeline.
 
 
-##### <a name="transforms-combine-per-key"></a>**Combining Values in a 
Key-Grouped Collection**
+##### <a name="transforms-combine-per-key"></a>**Combining values in a 
key-grouped collection**
 
 After creating a key-grouped collection (for example, by using a `GroupByKey` 
transform) a common pattern is to combine the collection of values associated 
with each key into a single, merged value. Drawing on the previous example from 
`GroupByKey`, a key-grouped `PCollection` called `groupedWords` looks like this:
 
@@ -688,11 +687,11 @@ merged = (
     | beam.Flatten())
 ```
 
-##### Data Encoding in Merged Collections:
+##### Data encoding in merged collections:
 
 By default, the coder for the output `PCollection` is the same as the coder 
for the first `PCollection` in the input `PCollectionList`. However, the input 
`PCollection` objects can each use different coders, as long as they all 
contain the same data type in your chosen language.
 
-##### Merging Windowed Collections:
+##### Merging windowed collections:
 
 When using `Flatten` to merge `PCollection` objects that have a windowing 
strategy applied, all of the `PCollection` objects you want to merge must use a 
compatible windowing strategy and window sizing. For example, all the 
collections you're merging must all use (hypothetically) identical 5-minute 
fixed windows or 4-minute sliding windows starting every 30 seconds.
 
@@ -733,7 +732,7 @@ by_decile = students | beam.Partition(partition_fn, 10)
 fortieth_percentile = by_decile[4]
 ```
 
-#### <a name="transforms-usercodereqs"></a>General Requirements for Writing 
User Code for Beam Transforms
+#### <a name="transforms-usercodereqs"></a>General requirements for writing 
user code for Beam transforms
 
 When you build user code for a Beam transform, you should keep in mind the 
distributed nature of execution. For example, there might be many copies of 
your function running on a lot of different machines in parallel, and those 
copies function independently, without communicating or sharing state with any 
of the other copies. Depending on the Pipeline Runner and processing back-end 
you choose for your pipeline, each copy of your user code function may be 
retried or run multiple times. As such, you should be cautious about including 
things like state dependency in your user code.
 
@@ -758,7 +757,7 @@ Some other serializability factors you should keep in mind 
are:
 * Mutating a function object after it gets applied will have no effect.
 * Take care when declaring your function object inline by using an anonymous 
inner class instance. In a non-static context, your inner class instance will 
implicitly contain a pointer to the enclosing class and that class' state. That 
enclosing class will also be serialized, and thus the same considerations that 
apply to the function object itself also apply to this outer class.
 
-##### Thread-Compatibility
+##### Thread-compatibility
 
 Your function object should be thread-compatible. Each instance of your 
function object is accessed by a single thread on a worker instance, unless you 
explicitly create your own threads. Note, however, that **the Beam SDKs are not 
thread-safe**. If you create your own threads in your user code, you must 
provide your own synchronization. Note that static members in your function 
object are not passed to worker instances and that multiple instances of your 
function may be accessed from different threads.
 
@@ -768,14 +767,14 @@ It's recommended that you make your function object 
idempotent--that is, that it
 
 #### <a name="transforms-sideio"></a>Side Inputs and Side Outputs
 
-##### **Side Inputs**
+##### **Side inputs**
 
 In addition to the main input `PCollection`, you can provide additional inputs 
to a `ParDo` transform in the form of side inputs. A side input is an 
additional input that your `DoFn` can access each time it processes an element 
in the input `PCollection`. When you specify a side input, you create a view of 
some other data that can be read from within the `ParDo` transform's `DoFn` 
while processing each element.
 
 Side inputs are useful if your `ParDo` needs to inject additional data when 
processing each element in the input `PCollection`, but the additional data 
needs to be determined at runtime (and not hard-coded). Such values might be 
determined by the input data, or depend on a different branch of your pipeline.
 
 
-##### Passing Side Inputs to ParDo:
+##### Passing side inputs to ParDo:
 
 ```java
   // Pass side inputs to your ParDo transform by invoking .withSideInputs.
@@ -824,7 +823,7 @@ Side inputs are useful if your `ParDo` needs to inject 
additional data when proc
 
 ```
 
-##### Side Inputs and Windowing:
+##### Side inputs and windowing:
 
 A windowed `PCollection` may be infinite and thus cannot be compressed into a 
single value (or single collection class). When you create a `PCollectionView` 
of a windowed `PCollection`, the `PCollectionView` represents a single entity 
per window (one singleton per window, one list per window, etc.).
 
@@ -836,11 +835,11 @@ If the main input element exists in more than one window, 
then `processElement`
 
 If the side input has multiple trigger firings, Beam uses the value from the 
latest trigger firing. This is particularly useful if you use a side input with 
a single global window and specify a trigger.
 
-##### **Side Outputs**
+##### **Side outputs**
 
 While `ParDo` always produces a main output `PCollection` (as the return value 
from apply), you can also have your `ParDo` produce any number of additional 
output `PCollection`s. If you choose to have multiple outputs, your `ParDo` 
returns all of the output `PCollection`s (including the main output) bundled 
together.
 
-##### Tags for Side Outputs:
+##### Tags for side outputs:
 
 ```java
 // To emit elements to a side output PCollection, create a TupleTag object to 
identify each collection that your ParDo produces.
@@ -902,7 +901,7 @@ While `ParDo` always produces a main output `PCollection` 
(as the return value f
 {% github_sample 
/apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:model_pardo_with_side_outputs_iter
 %}```
 
-##### Emitting to Side Outputs in your DoFn:
+##### Emitting to side outputs in your DoFn:
 
 ```java
 // Inside your ParDo's DoFn, you can emit an element to a side output by using 
the method ProcessContext.sideOutput.
@@ -948,77 +947,39 @@ When you create a pipeline, you often need to read data 
from some external sourc
 
 > A guide that covers how to implement your own Beam IO transforms is in 
 > progress ([BEAM-1025](https://issues.apache.org/jira/browse/BEAM-1025)).
 
-### Reading Input Data
+### Reading input data
 
-Read transforms read data from an external source and return a `PCollection` 
representation of the data for use by your pipeline. You can use a read 
transform at any point while constructing your pipeline to create a new 
`PCollection`, though it will be most common at the start of your pipeline. 
Here are examples of two common ways to read data.
-
-#### Reading from a `Source`:
-
-```java
-// A fully-specified Read from a GCS file:
-PCollection<Integer> numbers =
-  p.apply("ReadNumbers", TextIO.Read
-   .from("gs://my_bucket/path/to/numbers-*.txt")
-   .withCoder(TextualIntegerCoder.of()));
-```
-
-```python
-pipeline | beam.io.ReadFromText('protocol://path/to/some/inputData.txt')
-```
-
-Note that many sources use the builder pattern for setting options. For 
additional examples, see the language-specific documentation (such as Javadoc) 
for each of the sources.
+Read transforms read data from an external source and return a `PCollection` 
representation of the data for use by your pipeline. You can use a read 
transform at any point while constructing your pipeline to create a new 
`PCollection`, though it will be most common at the start of your pipeline.
 
 #### Using a read transform:
 
 ```java
-// This example uses JmsIO.
-PCollection<JmsRecord> output =
-    pipeline.apply(JmsIO.read()
-        .withConnectionFactory(myConnectionFactory)
-        .withQueue("my-queue"))
+PCollection<String> lines = 
p.apply(TextIO.Read.from("gs://some/inputData.txt"));   
 ```
 
 ```python
-pipeline | beam.io.textio.ReadFromText('my_file_name')
+lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
 ```
 
-### Writing Output Data
+### Writing output data
 
-Write transforms write the data in a `PCollection` to an external data source. 
You will most often use write transforms at the end of your pipeline to output 
your pipeline's final results. However, you can use a write transform to output 
a `PCollection`'s data at any point in your pipeline. Here are examples of two 
common ways to write data.
-
-#### Writing to a `Sink`:
-
-```java
-// This example uses XmlSink.
-pipeline.apply(Write.to(
-          XmlSink.ofRecordClass(Type.class)
-              .withRootElementName(root_element)
-              .toFilenamePrefix(output_filename)));
-```
-
-```python
-output | beam.io.WriteToText('my_file_name')
-```
+Write transforms write the data in a `PCollection` to an external data source. 
You will most often use write transforms at the end of your pipeline to output 
your pipeline's final results. However, you can use a write transform to output 
a `PCollection`'s data at any point in your pipeline. 
 
-#### Using a write transform:
+#### Using a write transform:
 
 ```java
-// This example uses JmsIO.
-pipeline.apply(...) // returns PCollection<String>
-        .apply(JmsIO.write()
-            .withConnectionFactory(myConnectionFactory)
-            .withQueue("my-queue")
+output.apply(TextIO.Write.to("gs://some/outputData"));
 ```
 
 ```python
-output | beam.io.textio.WriteToText('my_file_name')
+output | beam.io.WriteToText('gs://some/outputData')
 ```
 
 ### File-based input and output data
 
-#### Reading From Multiple Locations:
+#### Reading from multiple locations:
 
-Many read transforms support reading from multiple input files matching a glob 
operator you provide. The following TextIO example uses a glob operator (\*) to 
read all matching input files that have prefix "input-" and the suffix ".csv" 
in the given location:
+Many read transforms support reading from multiple input files matching a glob 
operator you provide. Note that glob operators are filesystem-specific and obey 
filesystem-specific consistency models. The following TextIO example uses a 
glob operator (\*) to read all matching input files that have prefix "input-" 
and the suffix ".csv" in the given location:
 
 ```java
 p.apply("ReadFromText",
@@ -1031,18 +992,18 @@ lines = p | beam.io.Read(
     beam.io.TextFileSource('protocol://my_bucket/path/to/input-*.csv'))
 ```
 
-To read data from disparate sources into a single `PCollection`, read each one 
independently and then use the `Flatten` transform to create a single 
`PCollection`.
+To read data from disparate sources into a single `PCollection`, read each one 
independently and then use the [Flatten](#transforms-flatten-partition) 
transform to create a single `PCollection`.
 
-#### Writing To Multiple Output Files:
+#### Writing to multiple output files:
 
 For file-based output data, write transforms write to multiple output files by 
default. When you pass an output file name to a write transform, the file name 
is used as the prefix for all output files that the write transform produces. 
You can append a suffix to each output file by specifying a suffix.
 
 The following write transform example writes multiple output files to a 
location. Each file has the prefix "numbers", a numeric tag, and the suffix 
".csv".
 
 ```java
-records.apply(TextIO.Write.named("WriteToText")
-                          .to("protocol://my_bucket/path/to/numbers")
-                          .withSuffix(".csv"));
+records.apply("WriteToText",
+    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+                .withSuffix(".csv"));
 ```
 
 ```python
@@ -1050,7 +1011,7 @@ filtered_words | beam.io.WriteToText(
 'protocol://my_bucket/path/to/numbers', file_name_suffix='.csv')
 ```
 
-### Beam provided I/O APIs
+### Beam-provided I/O APIs
 
 See the language specific source code directories for the Beam supported I/O 
APIs. Specific documentation for each of these I/O sources will be added in the 
future. ([BEAM-1054](https://issues.apache.org/jira/browse/BEAM-1054))
 
@@ -1100,7 +1061,7 @@ See the language specific source code directories for the 
Beam supported I/O API
 </table>
 
 
-## <a name="running"></a>Running the Pipeline
+## <a name="running"></a>Running the pipeline
 
 To run your pipeline, use the `run` method. The program you create sends a 
specification for your pipeline to a pipeline runner, which then constructs and 
runs the actual series of pipeline operations. Pipelines are executed 
asynchronously by default.
 
@@ -1119,7 +1080,7 @@ pipeline.run().waitUntilFinish();
 ```
 
 ```python
-# Not currently supported.
+pipeline.run().wait_until_finish()
 ```
 
 <a name="transforms-composite"></a>
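
The `CombineFn` passage in this diff says a subclass is warranted when the accumulator type differs from the input/output type. A minimal plain-Python sketch of that idea follows; it does not import Beam. The class name `MeanAccumulatorSketch` and the driver `combine_in_shards` are hypothetical, and the four method names are assumed to mirror those of the Python SDK's `CombineFn`.

```python
from functools import reduce


class MeanAccumulatorSketch:
    """Hypothetical stand-in for a CombineFn subclass; not a Beam class."""

    def create_accumulator(self):
        # The accumulator is a (sum, count) pair, distinct from the float output.
        return (0.0, 0)

    def add_input(self, accumulator, element):
        # Fold one input element into a partial (sum, count).
        total, count = accumulator
        return (total + element, count + 1)

    def merge_accumulators(self, accumulators):
        # Combine partial results that were built independently.
        accumulators = list(accumulators)
        return (sum(a[0] for a in accumulators), sum(a[1] for a in accumulators))

    def extract_output(self, accumulator):
        # Turn the final accumulator into the output value (the mean).
        total, count = accumulator
        return total / count if count else float("nan")


def combine_in_shards(fn, shards):
    """Hypothetical driver: fold each shard separately, then merge the partials."""
    partials = [reduce(fn.add_input, shard, fn.create_accumulator()) for shard in shards]
    return fn.extract_output(fn.merge_accumulators(partials))
```

Splitting the input into shards imitates partial results being built on separate workers and merged afterwards; the sketch makes no claim about how a particular runner schedules that work.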
