This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fe327ef [SQL] Make BigQuery schema conversion order-aware
new 120394b Merge pull request #8193 from akedin/bq-schema-order
fe327ef is described below
commit fe327ef97acc92f31711a242685710c5dfbc8249
Author: akedin <[email protected]>
AuthorDate: Mon Apr 1 15:48:51 2019 -0700
[SQL] Make BigQuery schema conversion order-aware
---
.../{BeamBigQueryTable.java => BigQueryTable.java} | 30 +++++++++++-----------
.../provider/bigquery/BigQueryTableProvider.java | 6 +----
.../bigquery/BigQueryTableProviderTest.java | 6 ++---
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 11 ++++----
4 files changed, 24 insertions(+), 29 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
similarity index 72%
rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index fdbcea4..6f3f56a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -20,25 +20,26 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
- * {@code BeamBigQueryTable} represent a BigQuery table as a target. This provider does not
- * currently support being a source.
+ * {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently
+ * support being a source.
*/
@Experimental
-public class BeamBigQueryTable extends BaseBeamTable implements Serializable {
- private String tableSpec;
+class BigQueryTable extends BaseBeamTable implements Serializable {
+ String bqLocation;
- public BeamBigQueryTable(Schema beamSchema, String tableSpec) {
- super(beamSchema);
- this.tableSpec = tableSpec;
+ BigQueryTable(Table table) {
+ super(table.getSchema());
+ this.bqLocation = table.getLocation();
}
@Override
@@ -48,9 +49,12 @@ public class BeamBigQueryTable extends BaseBeamTable implements Serializable {
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
- // TODO: make this more generic.
return begin
- .apply(BigQueryIO.read(BigQueryUtils.toBeamRow(schema)).from(tableSpec))
+ .apply(
+ "Read Input BQ Rows",
+ BigQueryIO.read(record -> BigQueryUtils.toBeamRow(record.getRecord(), getSchema()))
+ .from(bqLocation)
+ .withCoder(SchemaCoder.of(getSchema())))
.setRowSchema(getSchema());
}
@@ -60,10 +64,6 @@ public class BeamBigQueryTable extends BaseBeamTable implements Serializable {
BigQueryIO.<Row>write()
.withSchema(BigQueryUtils.toTableSchema(getSchema()))
.withFormatFunction(BigQueryUtils.toTableRow())
- .to(tableSpec));
- }
-
- String getTableSpec() {
- return tableSpec;
+ .to(bqLocation));
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
index 2eceb23..e96fe29 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
-import org.apache.beam.sdk.schemas.Schema;
/**
* BigQuery table provider.
@@ -49,9 +48,6 @@ public class BigQueryTableProvider extends InMemoryMetaTableProvider {
@Override
public BeamSqlTable buildBeamSqlTable(Table table) {
- Schema schema = table.getSchema();
- String filePattern = table.getLocation();
-
- return new BeamBigQueryTable(schema, filePattern);
+ return new BigQueryTable(table);
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
index 3aa9089..47983e2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
@@ -43,10 +43,10 @@ public class BigQueryTableProviderTest {
BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
assertNotNull(sqlTable);
- assertTrue(sqlTable instanceof BeamBigQueryTable);
+ assertTrue(sqlTable instanceof BigQueryTable);
- BeamBigQueryTable bqTable = (BeamBigQueryTable) sqlTable;
- assertEquals("project:dataset.table", bqTable.getTableSpec());
+ BigQueryTable bqTable = (BigQueryTable) sqlTable;
+ assertEquals("project:dataset.table", bqTable.bqLocation);
}
private static Table fakeTable(String name) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 6d1d15a..c51bb5c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -203,13 +203,12 @@ public class BigQueryUtils {
}
public static Row toBeamRow(GenericRecord record, Schema schema) {
- List<Object> values = new ArrayList();
- for (int i = 0; i < record.getSchema().getFields().size(); i++) {
- org.apache.avro.Schema.Field avroField = record.getSchema().getFields().get(i);
- values.add(AvroUtils.convertAvroFormat(schema.getField(i), record.get(avroField.name())));
- }
+ List<Object> valuesInOrder =
+ schema.getFields().stream()
+ .map(field -> AvroUtils.convertAvroFormat(field, record.get(field.getName())))
+ .collect(toList());
- return Row.withSchema(schema).addValues(values).build();
+ return Row.withSchema(schema).addValues(valuesInOrder).build();
}
/** Convert a BigQuery TableRow to a Beam Row. */