This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a679d98 [BEAM-6749] Update BQ tornadoes example to use the storage API. (#7958)
a679d98 is described below
commit a679d98cbcc49b01528c168cce8b578338a5bcdd
Author: Kenneth Jung <[email protected]>
AuthorDate: Wed Feb 27 13:10:05 2019 -0800
[BEAM-6749] Update BQ tornadoes example to use the storage API. (#7958)
---
.../beam/examples/cookbook/BigQueryTornadoes.java | 35 +++++++++++++++++++++-
1 file changed, 34 insertions(+), 1 deletion(-)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 1d557a3..a4356c7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -20,10 +20,13 @@ package org.apache.beam.examples.cookbook;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -35,6 +38,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
/**
* An example that reads the public samples of weather data from BigQuery, counts the number of
@@ -136,6 +140,12 @@ public class BigQueryTornadoes {
void setInput(String value);
+ @Description("Mode to use when reading from BigQuery")
+ @Default.Enum("EXPORT")
+ TypedRead.Method getReadMethod();
+
+ void setReadMethod(TypedRead.Method value);
+
@Description(
"BigQuery table to write to, specified as "
+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
@@ -154,7 +164,30 @@ public class BigQueryTornadoes {
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.readTableRows().from(options.getInput()))
+ PCollection<TableRow> rowsFromBigQuery;
+
+ if (options.getReadMethod() == Method.DIRECT_READ) {
+ // Build the read options proto for the read operation.
+ TableReadOptions tableReadOptions =
+ TableReadOptions.newBuilder()
+ .addAllSelectedFields(Lists.newArrayList("month", "tornado"))
+ .build();
+
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.getInput())
+ .withMethod(Method.DIRECT_READ)
+ .withReadOptions(tableReadOptions));
+ } else {
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.getInput())
+ .withMethod(options.getReadMethod()));
+ }
+
+ rowsFromBigQuery
.apply(new CountTornadoes())
.apply(
BigQueryIO.writeTableRows()