[
https://issues.apache.org/jira/browse/BEAM-4601?focusedWorklogId=118306&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118306
]
ASF GitHub Bot logged work on BEAM-4601:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 17:58
Start Date: 02/Jul/18 17:58
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5830: [BEAM-4601][SQL]
Support BigQuery read from SQL.
URL: https://github.com/apache/beam/pull/5830
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
index 7d1e68e9667..a5cdc829bb8 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
@@ -43,7 +43,9 @@ public BeamBigQueryTable(Schema beamSchema, String tableSpec)
{
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
- throw new UnsupportedOperationException();
+ return begin
+
.apply(BigQueryIO.read(BigQueryUtils.toBeamRow(schema)).from(tableSpec))
+ .setCoder(getSchema().getRowCoder());
}
@Override
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
similarity index 79%
rename from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
rename to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index e49b84c875f..c39da40939e 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -27,6 +27,7 @@
import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableMap;
@@ -35,6 +36,8 @@
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
@@ -43,6 +46,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
@@ -57,7 +61,7 @@
/** Integration tests for writing to BigQuery with Beam SQL. */
@RunWith(JUnit4.class)
-public class BigQueryWriteIT implements Serializable {
+public class BigQueryReadWriteIT implements Serializable {
private static final Schema SOURCE_SCHEMA =
Schema.builder()
.addNullableField("id", INT64)
@@ -82,11 +86,12 @@
.build();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public transient TestPipeline readPipeline = TestPipeline.create();
@Rule public transient TestBigQuery bigQuery =
TestBigQuery.create(SOURCE_SCHEMA);
@Rule public transient TestBigQuery bigQueryTestingTypes =
TestBigQuery.create(SOURCE_SCHEMA_TWO);
@Test
- public void testSQLTypes() {
+ public void testSQLRead() {
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigQueryTableProvider());
String createTableStatement =
@@ -125,13 +130,17 @@ public void testSQLTypes() {
+ "'char', "
+ "ARRAY['123', '456']"
+ ")";
+
sqlEnv.parseQuery(insertStatement);
BeamSqlRelUtils.toPCollection(pipeline,
sqlEnv.parseQuery(insertStatement));
pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
- assertThat(
- bigQueryTestingTypes.getFlatJsonRows(SOURCE_SCHEMA_TWO),
- containsInAnyOrder(
+ String selectTableStatement = "SELECT * FROM TEST";
+ PCollection<Row> output =
+ BeamSqlRelUtils.toPCollection(readPipeline,
sqlEnv.parseQuery(selectTableStatement));
+
+ PAssert.that(output)
+ .containsInAnyOrder(
row(
SOURCE_SCHEMA_TWO,
9223372036854775807L,
@@ -145,34 +154,72 @@ public void testSQLTypes() {
new DateTime(2018, 05, 28, 20, 17, 40, 123,
ISOChronology.getInstanceUTC()),
"varchar",
"char",
- Arrays.asList("123", "456"))));
+ Arrays.asList("123", "456")));
+ PipelineResult.State state =
readPipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+ assertEquals(state, State.DONE);
}
@Test
- public void testInsertValues() throws Exception {
+ public void testSQLTypes() {
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigQueryTableProvider());
String createTableStatement =
- "CREATE TABLE ORDERS( \n"
- + " id BIGINT, \n"
- + " name VARCHAR, \n "
- + " arr ARRAY<VARCHAR> \n"
+ "CREATE TABLE TEST( \n"
+ + " c_bigint BIGINT, \n"
+ + " c_tinyint TINYINT, \n"
+ + " c_smallint SMALLINT, \n"
+ + " c_integer INTEGER, \n"
+ + " c_float FLOAT, \n"
+ + " c_double DOUBLE, \n"
+ + " c_decimal DECIMAL, \n"
+ + " c_boolean BOOLEAN, \n"
+ + " c_timestamp TIMESTAMP, \n"
+ + " c_varchar VARCHAR, \n "
+ + " c_char CHAR, \n"
+ + " c_arr ARRAY<VARCHAR> \n"
+ ") \n"
+ "TYPE 'bigquery' \n"
+ "LOCATION '"
- + bigQuery.tableSpec()
+ + bigQueryTestingTypes.tableSpec()
+ "'";
sqlEnv.executeDdl(createTableStatement);
- String insertStatement = "INSERT INTO ORDERS VALUES (1, 'foo',
ARRAY['123', '456'])";
-
+ String insertStatement =
+ "INSERT INTO TEST VALUES ("
+ + "9223372036854775807, "
+ + "127, "
+ + "32767, "
+ + "2147483647, "
+ + "1.0, "
+ + "1.0, "
+ + "123.45, "
+ + "TRUE, "
+ + "TIMESTAMP '2018-05-28 20:17:40.123', "
+ + "'varchar', "
+ + "'char', "
+ + "ARRAY['123', '456']"
+ + ")";
+ sqlEnv.parseQuery(insertStatement);
BeamSqlRelUtils.toPCollection(pipeline,
sqlEnv.parseQuery(insertStatement));
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
assertThat(
- bigQuery.getFlatJsonRows(SOURCE_SCHEMA),
- containsInAnyOrder(row(SOURCE_SCHEMA, 1L, "foo", Arrays.asList("123",
"456"))));
+ bigQueryTestingTypes.getFlatJsonRows(SOURCE_SCHEMA_TWO),
+ containsInAnyOrder(
+ row(
+ SOURCE_SCHEMA_TWO,
+ 9223372036854775807L,
+ (byte) 127,
+ (short) 32767,
+ 2147483647,
+ (float) 1.0,
+ 1.0,
+ BigDecimal.valueOf(123.45),
+ true,
+ new DateTime(2018, 05, 28, 20, 17, 40, 123,
ISOChronology.getInstanceUTC()),
+ "varchar",
+ "char",
+ Arrays.asList("123", "456"))));
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
new file mode 100644
index 00000000000..c9d9c9fe17b
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.joda.time.Instant;
+
+/** Utils to help convert Apache Avro types to Beam types. */
+public class AvroUtils {
+ public static Object convertAvroFormat(Field beamField, Object value) throws
RuntimeException {
+ Object ret;
+ TypeName beamFieldTypeName = beamField.getType().getTypeName();
+ switch (beamFieldTypeName) {
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ case BYTE:
+ case BOOLEAN:
+ ret = convertAvroPrimitiveTypes(beamFieldTypeName, value);
+ break;
+ case DATETIME:
+ // Expecting value in microseconds.
+ ret = new Instant().withMillis(((long) value) / 1000);
+ break;
+ case STRING:
+ ret = convertAvroPrimitiveTypes(beamFieldTypeName, value);
+ break;
+ case ARRAY:
+ ret = convertAvroArray(beamField, value);
+ break;
+ case MAP:
+ throw new RuntimeException("Does not support converting MAP type
value");
+ default:
+ throw new RuntimeException("Does not support converting unknown type
value");
+ }
+
+ return ret;
+ }
+
+ private static Object convertAvroArray(Field beamField, Object value) {
+ // Check whether the type of array element is equal.
+ List<Object> values = (List<Object>) value;
+ List<Object> ret = new ArrayList();
+ for (Object v : values) {
+ ret.add(
+ convertAvroPrimitiveTypes(
+ beamField.getType().getCollectionElementType().getTypeName(),
v));
+ }
+ return (Object) ret;
+ }
+
+ private static Object convertAvroString(Object value) {
+ if (value instanceof org.apache.avro.util.Utf8) {
+ return ((org.apache.avro.util.Utf8) value).toString();
+ } else if (value instanceof String) {
+ return value;
+ } else {
+ throw new RuntimeException(
+ "Does not support converting avro format: " +
value.getClass().getName());
+ }
+ }
+
+ private static Object convertAvroPrimitiveTypes(TypeName beamType, Object
value) {
+ switch (beamType) {
+ case BYTE:
+ return ((Long) value).byteValue();
+ case INT16:
+ return ((Long) value).shortValue();
+ case INT32:
+ return ((Long) value).intValue();
+ case INT64:
+ return value;
+ case FLOAT:
+ return ((Double) value).floatValue();
+ case DOUBLE:
+ return (Double) value;
+ case BOOLEAN:
+ return (Boolean) value;
+ case DECIMAL:
+ return BigDecimal.valueOf((double) value);
+ case STRING:
+ return convertAvroString(value);
+ default:
+ throw new RuntimeException(beamType + " is not primitive type.");
+ }
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 218e9c272f5..390b744a045 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
+import static com.google.common.base.Preconditions.checkState;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.values.Row.toRow;
@@ -33,6 +34,7 @@
import java.util.Map;
import java.util.function.Function;
import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -167,6 +169,11 @@ public static TableSchema toTableSchema(PCollection<Row>
rows) {
return TO_TABLE_ROW;
}
+ /** Convert {@link SchemaAndRecord} to a Beam {@link Row}. */
+ public static SerializableFunction<SchemaAndRecord, Row> toBeamRow(Schema
schema) {
+ return new ToBeamRow(schema);
+ }
+
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
private static class ToTableRow implements SerializableFunction<Row,
TableRow> {
@Override
@@ -175,6 +182,34 @@ public TableRow apply(Row input) {
}
}
+ /** Convert {@link SchemaAndRecord} to a Beam {@link Row}. */
+ private static class ToBeamRow implements
SerializableFunction<SchemaAndRecord, Row> {
+ private Schema schema;
+
+ public ToBeamRow(Schema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public Row apply(SchemaAndRecord input) {
+ GenericRecord record = input.getRecord();
+ checkState(
+ schema.getFields().size() == record.getSchema().getFields().size(),
+ "Schema sizes are different.");
+ return toBeamRow(record, schema);
+ }
+ }
+
+ public static Row toBeamRow(GenericRecord record, Schema schema) {
+ List<Object> values = new ArrayList();
+ for (int i = 0; i < record.getSchema().getFields().size(); i++) {
+ org.apache.avro.Schema.Field avroField =
record.getSchema().getFields().get(i);
+ values.add(AvroUtils.convertAvroFormat(schema.getField(i),
record.get(avroField.name())));
+ }
+
+ return Row.withSchema(schema).addValues(values).build();
+ }
+
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 118306)
Time Spent: 6h 10m (was: 6h)
> BigQuery reads from pure SQL
> ----------------------------
>
> Key: BEAM-4601
> URL: https://issues.apache.org/jira/browse/BEAM-4601
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> Right now Beam SQL can create a BigQuery table; however, reading from a BigQuery
> table is not supported yet. We want to support reading from a BigQuery table.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)