[
https://issues.apache.org/jira/browse/BEAM-4417?focusedWorklogId=124239&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-124239
]
ASF GitHub Bot logged work on BEAM-4417:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Jul/18 22:34
Start Date: 17/Jul/18 22:34
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #5948: [BEAM-4417] Fix the
expected encoding of BigQuery's NUMERIC type when reading from Avro
URL: https://github.com/apache/beam/pull/5948
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index bf2cb63e55c..4874f887176 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -33,6 +33,9 @@
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -54,7 +57,7 @@
.put("BYTES", Type.BYTES)
.put("INTEGER", Type.LONG)
.put("FLOAT", Type.DOUBLE)
- .put("NUMERIC", Type.STRING)
+ .put("NUMERIC", Type.BYTES)
.put("BOOLEAN", Type.BOOLEAN)
.put("TIMESTAMP", Type.LONG)
.put("RECORD", Type.RECORD)
@@ -131,7 +134,7 @@ private static Object getTypedCellValue(Schema schema,
TableFieldSchema fieldSch
String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
switch (mode) {
case "REQUIRED":
- return convertRequiredField(schema.getType(), fieldSchema, v);
+ return convertRequiredField(schema.getType(), schema.getLogicalType(),
fieldSchema, v);
case "REPEATED":
return convertRepeatedField(schema, fieldSchema, v);
case "NULLABLE":
@@ -159,14 +162,15 @@ private static Object getTypedCellValue(Schema schema,
TableFieldSchema fieldSch
List<Object> elements = (List<Object>) v;
ImmutableList.Builder<Object> values = ImmutableList.builder();
Type elementType = schema.getElementType().getType();
+ LogicalType elementLogicalType = schema.getElementType().getLogicalType();
for (Object element : elements) {
- values.add(convertRequiredField(elementType, fieldSchema, element));
+ values.add(convertRequiredField(elementType, elementLogicalType,
fieldSchema, element));
}
return values.build();
}
private static Object convertRequiredField(
- Type avroType, TableFieldSchema fieldSchema, Object v) {
+ Type avroType, LogicalType avroLogicalType, TableFieldSchema
fieldSchema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For
example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null",
fieldSchema.getName());
@@ -182,6 +186,8 @@ private static Object convertRequiredField(
avroType,
bqType,
fieldSchema.getName());
+ // For historical reasons, don't validate avroLogicalType except for with
NUMERIC.
+ // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL
logical type.
switch (fieldSchema.getType()) {
case "STRING":
case "DATE":
@@ -198,11 +204,15 @@ private static Object convertRequiredField(
verify(v instanceof Double, "Expected Double, got %s", v.getClass());
return v;
case "NUMERIC":
- verify(
- v instanceof CharSequence || v instanceof BigDecimal,
- "Expected CharSequence (String) or BigDecimal, got %s",
- v.getClass());
- return v.toString();
+ // NUMERIC data types are represented as BYTES with the DECIMAL
logical type. They are
+ // converted back to Strings with precision and scale determined by
the logical type.
+ verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s",
v.getClass());
+ verifyNotNull(avroLogicalType, "Expected Decimal logical type");
+ verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected
Decimal logical type");
+ BigDecimal numericValue =
+ new Conversions.DecimalConversion()
+ .fromBytes((ByteBuffer) v, Schema.create(avroType),
avroLogicalType);
+ return numericValue.toString();
case "BOOLEAN":
verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
return v;
@@ -252,9 +262,10 @@ private static Object convertNullableField(
Type firstType = unionTypes.get(0).getType();
if (!firstType.equals(Type.NULL)) {
- return convertRequiredField(firstType, fieldSchema, v);
+ return convertRequiredField(firstType,
unionTypes.get(0).getLogicalType(), fieldSchema, v);
}
- return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
+ return convertRequiredField(
+ unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(),
fieldSchema, v);
}
static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema>
fieldSchemas) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index e000b561b1b..236f22ebc08 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -32,6 +32,9 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -90,7 +93,39 @@
public void testConvertGenericRecordToTableRow() throws Exception {
TableSchema tableSchema = new TableSchema();
tableSchema.setFields(fields);
- Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
+
+ // BigQuery encodes NUMERIC values to Avro using the BYTES type with the
DECIMAL logical
+ // type. AvroCoder can't apply logical types to Schemas directly, so we
need to get the
+ // Schema for the Bird class defined below, then replace the field used to
test NUMERIC with
+ // a field that has the appropriate Schema.
+ BigDecimal birthdayMoney = new BigDecimal("123456789.123456789");
+ Schema birthdayMoneySchema = Schema.create(Type.BYTES);
+ LogicalType birthdayMoneyLogicalType =
+ LogicalTypes.decimal(birthdayMoney.precision(), birthdayMoney.scale());
+ // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by
callees if passed
+ // to other methods. We wrap the byte array as a ByteBuffer when adding it
to the
+ // GenericRecords below.
+ byte[] birthdayMoneyBytes =
+ new Conversions.DecimalConversion()
+ .toBytes(birthdayMoney, birthdayMoneySchema,
birthdayMoneyLogicalType)
+ .array();
+
+ // In order to update the Schema for birthdayMoney, we need to recreate
all of the Fields.
+ List<Schema.Field> avroFields = new ArrayList<>();
+ for (Schema.Field field :
AvroCoder.of(Bird.class).getSchema().getFields()) {
+ Schema schema = field.schema();
+ if (field.name().equals("birthdayMoney")) {
+ // birthdayMoney is a nullable field with type BYTES/DECIMAL.
+ schema =
+ Schema.createUnion(
+ Schema.create(Type.NULL),
+ birthdayMoneyLogicalType.addToSchema(birthdayMoneySchema));
+ }
+ // After a Field is added to a Schema, it is assigned a position, so we
can't simply reuse
+ // the existing Field.
+ avroFields.add(new Schema.Field(field.name(), schema, field.doc(),
field.defaultValue()));
+ }
+ Schema avroSchema = Schema.createRecord(avroFields);
{
// Test nullable fields.
@@ -110,7 +145,7 @@ public void testConvertGenericRecordToTableRow() throws
Exception {
record.put("number", 5L);
record.put("quality", 5.0);
record.put("birthday", 5L);
- record.put("birthdayMoney", new String("123456789.123456789"));
+ record.put("birthdayMoney", ByteBuffer.wrap(birthdayMoneyBytes));
record.put("flighted", Boolean.TRUE);
record.put("sound", soundByteBuffer);
record.put("anniversaryDate", new Utf8("2000-01-01"));
@@ -121,7 +156,7 @@ public void testConvertGenericRecordToTableRow() throws
Exception {
new TableRow()
.set("number", "5")
.set("birthday", "1970-01-01 00:00:00.000005 UTC")
- .set("birthdayMoney", "123456789.123456789")
+ .set("birthdayMoney", birthdayMoney.toString())
.set("quality", 5.0)
.set("associates", new ArrayList<TableRow>())
.set("flighted", Boolean.TRUE)
@@ -139,13 +174,13 @@ public void testConvertGenericRecordToTableRow() throws
Exception {
GenericRecord record = new GenericData.Record(avroSchema);
record.put("number", 5L);
record.put("associates", Lists.newArrayList(nestedRecord));
- record.put("birthdayMoney", new BigDecimal("987654321.987654321"));
+ record.put("birthdayMoney", ByteBuffer.wrap(birthdayMoneyBytes));
TableRow convertedRow =
BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
TableRow row =
new TableRow()
.set("associates", Lists.newArrayList(new
TableRow().set("species", "other")))
.set("number", "5")
- .set("birthdayMoney", "987654321.987654321");
+ .set("birthdayMoney", birthdayMoney.toString());
assertEquals(row, convertedRow);
}
}
@@ -172,7 +207,7 @@ public void testConvertBigQuerySchemaToAvroSchema() {
equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.LONG))));
assertThat(
avroSchema.getField("birthdayMoney").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.BYTES))));
assertThat(
avroSchema.getField("flighted").schema(),
equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.BOOLEAN))));
@@ -233,7 +268,7 @@ public void testConvertBigQuerySchemaToAvroSchema() {
@Nullable Double quality;
@Nullable Long quantity;
@Nullable Long birthday; // Exercises TIMESTAMP.
- @Nullable String birthdayMoney; // Exercises NUMERIC.
+ @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC.
@Nullable Boolean flighted;
@Nullable ByteBuffer sound;
@Nullable Utf8 anniversaryDate;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 124239)
Time Spent: 6h 40m (was: 6.5h)
> BigqueryIO Numeric datatype Support
> -----------------------------------
>
> Key: BEAM-4417
> URL: https://issues.apache.org/jira/browse/BEAM-4417
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.4.0
> Reporter: Kishan Kumar
> Assignee: Chamikara Jayalath
> Priority: Critical
> Labels: newbie, patch
> Fix For: 2.7.0
>
> Time Spent: 6h 40m
> Remaining Estimate: 0h
>
> BigQueryIO.read fails while parsing the data from the Avro file generated
> when reading from a table that has columns of the *Numeric* data type.
> We went through the source code on GitHub and noticed that the *Numeric
> data type is not yet supported.*
>
> Caused by: com.google.common.base.VerifyException: Unsupported BigQuery type:
> NUMERIC
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)