This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 389f649 [BEAM-7755] adding support for nested rows and arrays in
BigQuery to Beam row conversion (#9089)
389f649 is described below
commit 389f649691344f5b86e1f7092db157dfe004c2bb
Author: Sahith Nallapareddy <[email protected]>
AuthorDate: Fri Jul 26 16:21:34 2019 -0400
[BEAM-7755] adding support for nested rows and arrays in BigQuery to Beam
row conversion (#9089)
---
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 39 +++++++----
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 79 ++++++++++++++++++++--
2 files changed, 100 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 286420e..1316a18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -34,6 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
@@ -313,7 +314,15 @@ public class BigQueryUtils {
public static Row toBeamRow(GenericRecord record, Schema schema,
ConversionOptions options) {
List<Object> valuesInOrder =
schema.getFields().stream()
- .map(field -> convertAvroFormat(field,
record.get(field.getName()), options))
+ .map(
+ field -> {
+ try {
+ return convertAvroFormat(field.getType(),
record.get(field.getName()), options);
+ } catch (Exception cause) {
+ throw new IllegalArgumentException(
+ "Error converting field " + field + ": " +
cause.getMessage(), cause);
+ }
+ })
.collect(toList());
return Row.withSchema(schema).addValues(valuesInOrder).build();
@@ -471,14 +480,13 @@ public class BigQueryUtils {
* Beam field.
*/
public static Object convertAvroFormat(
- Field beamField, Object avroValue, BigQueryUtils.ConversionOptions
options) {
- TypeName beamFieldTypeName = beamField.getType().getTypeName();
+ FieldType beamFieldType, Object avroValue,
BigQueryUtils.ConversionOptions options) {
+ TypeName beamFieldTypeName = beamFieldType.getTypeName();
if (avroValue == null) {
- if (beamField.getType().getNullable()) {
+ if (beamFieldType.getNullable()) {
return null;
} else {
- throw new IllegalArgumentException(
- String.format("Field %s not nullable", beamField.getName()));
+ throw new IllegalArgumentException(String.format("Field %s not
nullable", beamFieldType));
}
}
switch (beamFieldTypeName) {
@@ -505,9 +513,9 @@ public class BigQueryUtils {
case STRING:
return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue);
case ARRAY:
- return convertAvroArray(beamField, avroValue);
+ return convertAvroArray(beamFieldType, avroValue, options);
case LOGICAL_TYPE:
- String identifier =
beamField.getType().getLogicalType().getIdentifier();
+ String identifier = beamFieldType.getLogicalType().getIdentifier();
if (SQL_DATE_TIME_TYPES.contains(identifier)) {
switch (options.getTruncateTimestamps()) {
case TRUNCATE:
@@ -524,6 +532,13 @@ public class BigQueryUtils {
} else {
throw new RuntimeException("Unknown logical type " + identifier);
}
+ case ROW:
+ Schema rowSchema = beamFieldType.getRowSchema();
+ if (rowSchema == null) {
+ throw new IllegalArgumentException("Nested ROW missing row schema");
+ }
+ GenericData.Record record = (GenericData.Record) avroValue;
+ return toBeamRow(record, rowSchema, options);
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type
value");
case MAP:
@@ -553,14 +568,14 @@ public class BigQueryUtils {
return new Instant((long) value / 1000);
}
- private static Object convertAvroArray(Field beamField, Object value) {
+ private static Object convertAvroArray(
+ FieldType beamField, Object value, BigQueryUtils.ConversionOptions
options) {
// Check whether the type of array element is equal.
List<Object> values = (List<Object>) value;
List<Object> ret = new ArrayList();
+ FieldType collectionElement = beamField.getCollectionElementType();
for (Object v : values) {
- ret.add(
- convertAvroPrimitiveTypes(
- beamField.getType().getCollectionElementType().getTypeName(),
v));
+ ret.add(convertAvroFormat(collectionElement, v, options));
}
return (Object) ret;
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 86b2d27..67d997e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -35,8 +35,10 @@ import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
+import org.apache.avro.generic.GenericData;
import
org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -172,6 +174,20 @@ public class BigQueryUtilsTest {
private static final TableSchema BQ_ARRAY_ROW_TYPE =
new TableSchema().setFields(Arrays.asList(ROWS));
+ private static final Schema AVRO_FLAT_TYPE =
+ Schema.builder()
+ .addNullableField("id", Schema.FieldType.INT64)
+ .addNullableField("value", Schema.FieldType.DOUBLE)
+ .addNullableField("name", Schema.FieldType.STRING)
+ .addNullableField("valid", Schema.FieldType.BOOLEAN)
+ .build();
+
+ private static final Schema AVRO_ARRAY_TYPE =
+ Schema.builder().addArrayField("rows",
Schema.FieldType.row(AVRO_FLAT_TYPE)).build();
+
+ private static final Schema AVRO_ARRAY_ARRAY_TYPE =
+ Schema.builder().addArrayField("array_rows",
Schema.FieldType.row(AVRO_ARRAY_TYPE)).build();
+
@Test
public void testToTableSchema_flat() {
TableSchema schema = toTableSchema(FLAT_TYPE);
@@ -289,7 +305,9 @@ public class BigQueryUtilsTest {
IllegalArgumentException.class,
() ->
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.DATETIME),
1000000001L, REJECT_OPTIONS));
+ Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
+ 1000000001L,
+ REJECT_OPTIONS));
}
@Test
@@ -297,7 +315,9 @@ public class BigQueryUtilsTest {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.DATETIME), millis *
1000, REJECT_OPTIONS),
+ Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
+ millis * 1000,
+ REJECT_OPTIONS),
equalTo(new Instant(millis)));
}
@@ -306,7 +326,7 @@ public class BigQueryUtilsTest {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.DATETIME),
+ Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
millis * 1000 + 123,
TRUNCATE_OPTIONS),
equalTo(new Instant(millis)));
@@ -319,7 +339,8 @@ public class BigQueryUtilsTest {
IllegalArgumentException.class,
() ->
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType())),
+ Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType()))
+ .getType(),
1000000001L,
REJECT_OPTIONS));
}
@@ -329,7 +350,7 @@ public class BigQueryUtilsTest {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType())),
+ Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType())).getType(),
millis * 1000,
REJECT_OPTIONS),
equalTo(new Instant(millis)));
@@ -340,7 +361,7 @@ public class BigQueryUtilsTest {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
- Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType())),
+ Schema.Field.of("dummy", Schema.FieldType.logicalType(new
FakeSqlTimeType())).getType(),
millis * 1000 + 123,
TRUNCATE_OPTIONS),
equalTo(new Instant(millis)));
@@ -422,4 +443,50 @@ public class BigQueryUtilsTest {
Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
assertEquals(ARRAY_ROW_ROW, beamRow);
}
+
+ @Test
+ public void testToBeamRow_avro_array_row() {
+ Row flatRowExpected =
+ Row.withSchema(AVRO_FLAT_TYPE).addValues(123L, 123.456, "test",
false).build();
+ Row expected =
+ Row.withSchema(AVRO_ARRAY_TYPE).addValues((Object)
Arrays.asList(flatRowExpected)).build();
+ GenericData.Record record = new
GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
+ GenericData.Record flat = new
GenericData.Record(AvroUtils.toAvroSchema(AVRO_FLAT_TYPE));
+ flat.put("id", 123L);
+ flat.put("value", 123.456);
+ flat.put("name", "test");
+ flat.put("valid", false);
+ record.put("rows", Arrays.asList(flat));
+ Row beamRow =
+ BigQueryUtils.toBeamRow(
+ record, AVRO_ARRAY_TYPE,
BigQueryUtils.ConversionOptions.builder().build());
+ assertEquals(expected, beamRow);
+ }
+
+ @Test
+ public void testToBeamRow_avro_array_array_row() {
+ Row flatRowExpected =
+ Row.withSchema(AVRO_FLAT_TYPE).addValues(123L, 123.456, "test",
false).build();
+ Row arrayRowExpected =
+ Row.withSchema(AVRO_ARRAY_TYPE).addValues((Object)
Arrays.asList(flatRowExpected)).build();
+ Row expected =
+ Row.withSchema(AVRO_ARRAY_ARRAY_TYPE)
+ .addValues((Object) Arrays.asList(arrayRowExpected))
+ .build();
+ GenericData.Record arrayRecord =
+ new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
+ GenericData.Record flat = new
GenericData.Record(AvroUtils.toAvroSchema(AVRO_FLAT_TYPE));
+ GenericData.Record record =
+ new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_ARRAY_TYPE));
+ flat.put("id", 123L);
+ flat.put("value", 123.456);
+ flat.put("name", "test");
+ flat.put("valid", false);
+ arrayRecord.put("rows", Arrays.asList(flat));
+ record.put("array_rows", Arrays.asList(arrayRecord));
+ Row beamRow =
+ BigQueryUtils.toBeamRow(
+ record, AVRO_ARRAY_ARRAY_TYPE,
BigQueryUtils.ConversionOptions.builder().build());
+ assertEquals(expected, beamRow);
+ }
}