Repository: beam-site Updated Branches: refs/heads/asf-site 934a55f18 -> 47ad18557
Migrate hadoop inputformat to website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/69ee8d5d
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/69ee8d5d
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/69ee8d5d

Branch: refs/heads/asf-site
Commit: 69ee8d5d8157aa9fea5825d9995a664dbc157b1e
Parents: 934a55f
Author: Ahmet Altay <[email protected]>
Authored: Tue May 9 10:25:33 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Tue May 9 14:22:01 2017 -0700

----------------------------------------------------------------------
 src/documentation/io/built-in-hadoop.md | 197 +++++++++++++++++++++++++++
 src/documentation/io/built-in.md        |   2 +-
 2 files changed, 198 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam-site/blob/69ee8d5d/src/documentation/io/built-in-hadoop.md
----------------------------------------------------------------------
diff --git a/src/documentation/io/built-in-hadoop.md b/src/documentation/io/built-in-hadoop.md
new file mode 100644
index 0000000..dda5843
--- /dev/null
+++ b/src/documentation/io/built-in-hadoop.md
@@ -0,0 +1,197 @@

---
layout: default
title: "Apache Hadoop InputFormat IO"
permalink: /documentation/io/built-in/hadoop/
---

[Pipeline I/O Table of Contents]({{site.baseurl}}/documentation/io/io-toc/)

# Hadoop InputFormat IO

`HadoopInputFormatIO` is a transform for reading data from any source that implements Hadoop's `InputFormat`, such as Cassandra, Elasticsearch, HBase, Redis, or Postgres.

`HadoopInputFormatIO` lets you connect to many data sources that do not yet have a dedicated Beam IO transform. However, it has to make several performance trade-offs to work through `InputFormat`, so if a Beam IO transform exists specifically for your data source of choice, we recommend you use that one.

You will need to pass a Hadoop `Configuration` with parameters specifying how the read will occur. Many properties of the `Configuration` are optional, and some are required only for certain `InputFormat` classes, but the following properties must be set for all `InputFormat` classes:

- `mapreduce.job.inputformat.class` - The `InputFormat` class used to connect to your data source of choice.
- `key.class` - The `Key` class returned by the `InputFormat` in `mapreduce.job.inputformat.class`.
- `value.class` - The `Value` class returned by the `InputFormat` in `mapreduce.job.inputformat.class`.

For example:
```java
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
    InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

You will need to check whether the `Key` and `Value` classes output by the `InputFormat` have a Beam `Coder` available. If not, you can use `withKeyTranslation` or `withValueTranslation` to specify a function that transforms instances of those classes into a class that is supported by a Beam `Coder`. Both settings are optional, and you do not need to specify a translation for both the key and the value.
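One way to make that check is to ask Beam's default `CoderRegistry` whether it can provide a coder. This is only a sketch, assuming a recent Beam Java SDK; `InputFormatKeyClass` is the placeholder key class from the configuration above, and the classes used come from `org.apache.beam.sdk.coders`:

```java
// Sketch: check whether Beam can infer a Coder for the InputFormat's key class.
// Repeat the same check for the value class.
CoderRegistry registry = CoderRegistry.createDefault();
try {
  registry.getCoder(InputFormatKeyClass.class);
  // A Coder is available, so no key translation is needed.
} catch (CannotProvideCoderException e) {
  // No Coder is registered for this class; supply a key translation instead.
}
```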
For example, the translation functions might look like this:
```java
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
    new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
      public MyKeyClass apply(InputFormatKeyClass input) {
        // ...logic to transform InputFormatKeyClass to MyKeyClass
      }
    };

SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
    new SimpleFunction<InputFormatValueClass, MyValueClass>() {
      public MyValueClass apply(InputFormatValueClass input) {
        // ...logic to transform InputFormatValueClass to MyValueClass
      }
    };
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

### Reading using Hadoop InputFormat IO

#### Read data only with Hadoop configuration

```java
p.apply("read",
    HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
        .withConfiguration(myHadoopConfiguration));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

#### Read data with configuration and key translation

Example scenario: a Beam `Coder` is not available for the key class, so key translation is required.

```java
p.apply("read",
    HadoopInputFormatIO.<MyKeyClass, InputFormatValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withKeyTranslation(myOutputKeyType));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

#### Read data with configuration and value translation

Example scenario: a Beam `Coder` is not available for the value class, so value translation is required.

```java
p.apply("read",
    HadoopInputFormatIO.<InputFormatKeyClass, MyValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withValueTranslation(myOutputValueType));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

#### Read data with configuration, key translation and value translation

Example scenario: Beam `Coder`s are not available for either the `Key` class or the `Value` class of the `InputFormat`, so both key and value translation are required.

```java
p.apply("read",
    HadoopInputFormatIO.<MyKeyClass, MyValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withKeyTranslation(myOutputKeyType)
        .withValueTranslation(myOutputValueType));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

# Examples for specific InputFormats

### Cassandra - CqlInputFormat

To read data from Cassandra, use `org.apache.cassandra.hadoop.cql3.CqlInputFormat`, which needs the following properties to be set:

```java
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", Long.class, Object.class);
cassandraConf.setClass("value.class", Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", CqlInputFormat.class, InputFormat.class);
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```
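Before handing this configuration to the read transform, it can be useful to fail fast if one of the three required properties is missing. This is purely an optional sketch, not part of `HadoopInputFormatIO` itself:

```java
// Optional sanity check for the three properties that must always be set.
for (String property : new String[] {
    "mapreduce.job.inputformat.class", "key.class", "value.class"}) {
  if (cassandraConf.get(property) == null) {
    throw new IllegalStateException("Missing required Hadoop property: " + property);
  }
}
```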
Call the Read transform as follows:

```java
PCollection<KV<Long, String>> cassandraData =
    p.apply("read",
        HadoopInputFormatIO.<Long, String>read()
            .withConfiguration(cassandraConf)
            .withValueTranslation(cassandraOutputValueType));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

The `CqlInputFormat` key class is `java.lang.Long`, which has a Beam `Coder`. The `CqlInputFormat` value class is `com.datastax.driver.core.Row`, which does not have a Beam `Coder`. Rather than write a new coder, you can provide your own translation method, as follows:

```java
SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>() {
  public String apply(Row row) {
    return row.getString("myColName");
  }
};
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

### Elasticsearch - EsInputFormat

To read data from Elasticsearch, use `EsInputFormat`, which needs the following properties to be set:

```java
Configuration elasticSearchConf = new Configuration();
elasticSearchConf.set("es.nodes", ElasticsearchHostIp);
elasticSearchConf.set("es.port", "9200");
elasticSearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticSearchConf.setClass("key.class", Text.class, Object.class);
elasticSearchConf.setClass("value.class", LinkedMapWritable.class, Object.class);
elasticSearchConf.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

Call the Read transform as follows:

```java
PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
    HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticSearchConf));
```

```py
 # The Beam SDK for Python does not support Hadoop InputFormat IO.
```

The `EsInputFormat` key class is `org.apache.hadoop.io.Text`, and its value class is `org.elasticsearch.hadoop.mr.LinkedMapWritable`. Both key and value classes have Beam Coders.
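Because coders are available for both types, the result can be consumed directly. As a quick illustration only, with a hypothetical field name `"myFieldName"` and using `MapElements` and `TypeDescriptors` from the core Beam Java SDK, the `LinkedMapWritable` values might be unpacked like this:

```java
// Sketch: pull one field out of each Elasticsearch document.
PCollection<String> fieldValues = elasticData.apply("ExtractField",
    MapElements.into(TypeDescriptors.strings())
        .via((KV<Text, LinkedMapWritable> doc) ->
            String.valueOf(doc.getValue().get(new Text("myFieldName")))));
```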
http://git-wip-us.apache.org/repos/asf/beam-site/blob/69ee8d5d/src/documentation/io/built-in.md
----------------------------------------------------------------------
diff --git a/src/documentation/io/built-in.md b/src/documentation/io/built-in.md
index 6a73f6b..3bb5343
--- a/src/documentation/io/built-in.md
+++ b/src/documentation/io/built-in.md
@@ -35,7 +35,7 @@ Consult the [Programming Guide I/O section]({{site.baseurl }}/documentation/prog
       <p><a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io">Google Cloud PubSub</a></p>
     </td>
     <td>
-      <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/hadoop">Apache Hadoop InputFormat</a></p>
+      <p><a href="{{site.baseurl}}/documentation/io/built-in/hadoop/">Apache Hadoop InputFormat</a></p>
       <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/hbase">Apache HBase</a></p>
       <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/mongodb">MongoDB</a></p>
       <p><a href="https://github.com/apache/beam/tree/master/sdks/java/io/jdbc">JDBC</a></p>