Repository: beam-site Updated Branches: refs/heads/asf-site 6a85cdf69 -> 6df417f99
[BEAM-278] Add I/O section to programming guide Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/884971d0 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/884971d0 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/884971d0 Branch: refs/heads/asf-site Commit: 884971d0e0c0b365146bdda2260a55580442eb96 Parents: 6a85cdf Author: melissa <[email protected]> Authored: Mon Nov 28 17:05:32 2016 -0800 Committer: Dan Halperin <[email protected]> Committed: Fri Jan 27 13:39:30 2017 -0800 ---------------------------------------------------------------------- src/documentation/programming-guide.md | 188 +++++++++++++++++++++++++++- 1 file changed, 183 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/884971d0/src/documentation/programming-guide.md ---------------------------------------------------------------------- diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index 869d9db..3851bb5 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -9,7 +9,7 @@ redirect_from: # Apache Beam Programming Guide -The **Beam Programming Guide** is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your programs. +The **Beam Programming Guide** is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines. 
<nav class="language-switcher"> @@ -39,7 +39,7 @@ The **Beam Programming Guide** is intended for Beam users who want to use the Be * [Using Flatten and Partition](#transforms-flatten-partition) * [General Requirements for Writing User Code for Beam Transforms](#transforms-usercodereqs) * [Side Inputs and Side Outputs](#transforms-sideio) -* [I/O](#io) +* [Pipeline I/O](#io) * [Running the Pipeline](#running) * [Data Encoding and Type Safety](#coders) * [Working with Windowing](#windowing) @@ -124,7 +124,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); PCollection<String> lines = p.apply( - TextIO.Read.named("ReadMyFile").from("protocol://path/to/some/inputData.txt")); + "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt")); } ``` @@ -942,8 +942,186 @@ While `ParDo` always produces a main output `PCollection` (as the return value f {% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_undeclared %}``` -<a name="io"></a> -<a name="running"></a> +## <a name="io"></a>Pipeline I/O + +When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may want your pipeline to output its result data to a similar external data sink. Beam provides read and write transforms for a number of common data storage types. If you want your pipeline to read from or write to a data storage format that isn't supported by the built-in transforms, you can implement your own read and write transforms. + +> A guide that covers how to implement your own Beam IO transforms is in progress ([BEAM-1025](https://issues.apache.org/jira/browse/BEAM-1025)). + +### Reading Input Data + +Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new `PCollection`, though it will be most common at the start of your pipeline. Here are examples of two common ways to read data. + +#### Reading from a `Source`: + +```java +// A fully-specified Read from a GCS file: +PCollection<Integer> numbers = + p.apply("ReadNumbers", TextIO.Read + .from("gs://my_bucket/path/to/numbers-*.txt") + .withCoder(TextualIntegerCoder.of())); +``` + +```python +pipeline | beam.io.ReadFromText('protocol://path/to/some/inputData.txt') +``` + +Note that many sources use the builder pattern for setting options. For additional examples, see the language-specific documentation (such as Javadoc) for each of the sources. + +#### Using a read transform: + +```java +// This example uses JmsIO. +PCollection<JmsRecord> output = + pipeline.apply(JmsIO.read() + .withConnectionFactory(myConnectionFactory) + .withQueue("my-queue")); +``` + +```python +pipeline | beam.io.textio.ReadFromText('my_file_name') +``` + +### Writing Output Data + +Write transforms write the data in a `PCollection` to an external data sink. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a `PCollection`'s data at any point in your pipeline. Here are examples of two common ways to write data. + +#### Writing to a `Sink`: + +```java +// This example uses XmlSink. +pipeline.apply(Write.to( + XmlSink.ofRecordClass(Type.class) + .withRootElementName(root_element) + .toFilenamePrefix(output_filename))); +``` + +```python +output | beam.io.WriteToText('my_file_name') +``` + +#### Using a write transform: + +```java +// This example uses JmsIO. +pipeline.apply(...) // returns PCollection<String> + .apply(JmsIO.write() + .withConnectionFactory(myConnectionFactory) + .withQueue("my-queue")); +``` + +```python +output | beam.io.textio.WriteToText('my_file_name') +``` + +### File-based input and output data + +#### Reading From Multiple Locations: + +Many read transforms support reading from multiple input files matching a glob operator you provide. The following TextIO example uses a glob operator (\*) to read all matching input files that have the prefix "input-" and the suffix ".csv" in the given location: + +```java +p.apply("ReadFromText", + TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv")); +``` + +```python +lines = p | beam.io.Read( + 'ReadFromText', + beam.io.TextFileSource('protocol://my_bucket/path/to/input-*.csv')) +``` + +To read data from disparate sources into a single `PCollection`, read each one independently and then use the `Flatten` transform to create a single `PCollection`. +
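+For example, the following sketch (the bucket paths and step names here are illustrative, not taken from a Beam sample) reads two file patterns independently and then merges the resulting `PCollection`s with `Flatten`: + +```java +// Illustrative sketch: read each file pattern on its own, then merge the +// resulting PCollections into one with the Flatten transform. +PCollection<String> inputs2015 = + p.apply("Read2015", TextIO.Read.from("protocol://my_bucket/path/to/2015/input-*.csv")); +PCollection<String> inputs2016 = + p.apply("Read2016", TextIO.Read.from("protocol://my_bucket/path/to/2016/input-*.csv")); +PCollection<String> allInputs = + PCollectionList.of(inputs2015).and(inputs2016) + .apply(Flatten.<String>pCollections()); +``` + 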
+#### Writing To Multiple Output Files: + +For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can also specify a suffix to be appended to each output file. + +The following write transform example writes multiple output files to a location. Each file has the prefix "numbers", a numeric tag, and the suffix ".csv". + +```java +records.apply("WriteToText", TextIO.Write + .to("protocol://my_bucket/path/to/numbers") + .withSuffix(".csv")); +``` + +```python +filtered_words | beam.io.WriteToText( +'protocol://my_bucket/path/to/numbers', file_name_suffix='.csv') +``` + +### Beam-provided I/O APIs + +See the language-specific source code directories for the Beam-supported I/O APIs. Specific documentation for each of these I/O sources will be added in the future. 
([BEAM-1054](https://issues.apache.org/jira/browse/BEAM-1054)) + +<table class="table table-bordered"> +<tr> + <th>Language</th> + <th>File-based</th> + <th>Messaging</th> + <th>Database</th> +</tr> +<tr> + <td>Java</td> + <td> + <p><a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java">AvroIO</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/hdfs">HDFS</a></p> + <p><a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java">TextIO</a></p> + <p><a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/">XML</a></p> + </td> + <td> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/jms">JMS</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/kafka">Kafka</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/kinesis">Kinesis</a></p> + <p><a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io">Google Cloud PubSub</a></p> + </td> + <td> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/mongodb">MongoDB</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/jdbc">JDBC</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery">Google BigQuery</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable">Google Cloud Bigtable</a></p> + <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore">Google Cloud Datastore</a></p> + </td> +</tr> +<tr> + <td>Python</td> + <td> + <p><a href="https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/io/avroio.py">avroio</a></p> + <p><a href="https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/io/textio.py">textio</a></p> + </td> + <td> + </td> + <td> + <p><a href="https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/io/bigquery.py">Google BigQuery</a></p> + <p><a href="https://github.com/apache/beam/tree/python-sdk/sdks/python/apache_beam/io/datastore">Google Cloud Datastore</a></p> + </td> + +</tr> +</table> + + +## <a name="running"></a>Running the Pipeline + +To run your pipeline, use the `run` method. The program you create sends a specification for your pipeline to a pipeline runner, which then constructs and runs the actual series of pipeline operations. Pipelines are executed asynchronously by default. + +```java +pipeline.run(); +``` + +```python +pipeline.run() +``` + +For blocking execution, append the `waitUntilFinish` method: + +```java +pipeline.run().waitUntilFinish(); +``` + +```python +# Not currently supported. +``` + <a name="transforms-composite"></a> <a name="coders"></a> <a name="windowing"></a>
