[ 
https://issues.apache.org/jira/browse/BEAM-4417?focusedWorklogId=124239&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-124239
 ]

ASF GitHub Bot logged work on BEAM-4417:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jul/18 22:34
            Start Date: 17/Jul/18 22:34
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5948: [BEAM-4417] Fix the 
expected encoding of BigQuery's NUMERIC type when reading from Avro
URL: https://github.com/apache/beam/pull/5948
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index bf2cb63e55c..4874f887176 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -33,6 +33,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -54,7 +57,7 @@
           .put("BYTES", Type.BYTES)
           .put("INTEGER", Type.LONG)
           .put("FLOAT", Type.DOUBLE)
-          .put("NUMERIC", Type.STRING)
+          .put("NUMERIC", Type.BYTES)
           .put("BOOLEAN", Type.BOOLEAN)
           .put("TIMESTAMP", Type.LONG)
           .put("RECORD", Type.RECORD)
@@ -131,7 +134,7 @@ private static Object getTypedCellValue(Schema schema, 
TableFieldSchema fieldSch
     String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
     switch (mode) {
       case "REQUIRED":
-        return convertRequiredField(schema.getType(), fieldSchema, v);
+        return convertRequiredField(schema.getType(), schema.getLogicalType(), 
fieldSchema, v);
       case "REPEATED":
         return convertRepeatedField(schema, fieldSchema, v);
       case "NULLABLE":
@@ -159,14 +162,15 @@ private static Object getTypedCellValue(Schema schema, 
TableFieldSchema fieldSch
     List<Object> elements = (List<Object>) v;
     ImmutableList.Builder<Object> values = ImmutableList.builder();
     Type elementType = schema.getElementType().getType();
+    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, fieldSchema, element));
+      values.add(convertRequiredField(elementType, elementLogicalType, 
fieldSchema, element));
     }
     return values.build();
   }
 
   private static Object convertRequiredField(
-      Type avroType, TableFieldSchema fieldSchema, Object v) {
+      Type avroType, LogicalType avroLogicalType, TableFieldSchema 
fieldSchema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For 
example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
     checkNotNull(v, "REQUIRED field %s should not be null", 
fieldSchema.getName());
@@ -182,6 +186,8 @@ private static Object convertRequiredField(
         avroType,
         bqType,
         fieldSchema.getName());
+    // For historical reasons, don't validate avroLogicalType except for with 
NUMERIC.
+    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL 
logical type.
     switch (fieldSchema.getType()) {
       case "STRING":
       case "DATE":
@@ -198,11 +204,15 @@ private static Object convertRequiredField(
         verify(v instanceof Double, "Expected Double, got %s", v.getClass());
         return v;
       case "NUMERIC":
-        verify(
-            v instanceof CharSequence || v instanceof BigDecimal,
-            "Expected CharSequence (String) or BigDecimal, got %s",
-            v.getClass());
-        return v.toString();
+        // NUMERIC data types are represented as BYTES with the DECIMAL 
logical type. They are
+        // converted back to Strings with precision and scale determined by 
the logical type.
+        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", 
v.getClass());
+        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
+        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected 
Decimal logical type");
+        BigDecimal numericValue =
+            new Conversions.DecimalConversion()
+                .fromBytes((ByteBuffer) v, Schema.create(avroType), 
avroLogicalType);
+        return numericValue.toString();
       case "BOOLEAN":
         verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
         return v;
@@ -252,9 +262,10 @@ private static Object convertNullableField(
 
     Type firstType = unionTypes.get(0).getType();
     if (!firstType.equals(Type.NULL)) {
-      return convertRequiredField(firstType, fieldSchema, v);
+      return convertRequiredField(firstType, 
unionTypes.get(0).getLogicalType(), fieldSchema, v);
     }
-    return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
+    return convertRequiredField(
+        unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), 
fieldSchema, v);
   }
 
   static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> 
fieldSchemas) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index e000b561b1b..236f22ebc08 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -32,6 +32,9 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -90,7 +93,39 @@
   public void testConvertGenericRecordToTableRow() throws Exception {
     TableSchema tableSchema = new TableSchema();
     tableSchema.setFields(fields);
-    Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
+
+    // BigQuery encodes NUMERIC values to Avro using the BYTES type with the 
DECIMAL logical
+    // type. AvroCoder can't apply logical types to Schemas directly, so we 
need to get the
+    // Schema for the Bird class defined below, then replace the field used to 
test NUMERIC with
+    // a field that has the appropriate Schema.
+    BigDecimal birthdayMoney = new BigDecimal("123456789.123456789");
+    Schema birthdayMoneySchema = Schema.create(Type.BYTES);
+    LogicalType birthdayMoneyLogicalType =
+        LogicalTypes.decimal(birthdayMoney.precision(), birthdayMoney.scale());
+    // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by 
callees if passed
+    // to other methods. We wrap the byte array as a ByteBuffer when adding it 
to the
+    // GenericRecords below.
+    byte[] birthdayMoneyBytes =
+        new Conversions.DecimalConversion()
+            .toBytes(birthdayMoney, birthdayMoneySchema, 
birthdayMoneyLogicalType)
+            .array();
+
+    // In order to update the Schema for birthdayMoney, we need to recreate 
all of the Fields.
+    List<Schema.Field> avroFields = new ArrayList<>();
+    for (Schema.Field field : 
AvroCoder.of(Bird.class).getSchema().getFields()) {
+      Schema schema = field.schema();
+      if (field.name().equals("birthdayMoney")) {
+        // birthdayMoney is a nullable field with type BYTES/DECIMAL.
+        schema =
+            Schema.createUnion(
+                Schema.create(Type.NULL),
+                birthdayMoneyLogicalType.addToSchema(birthdayMoneySchema));
+      }
+      // After a Field is added to a Schema, it is assigned a position, so we 
can't simply reuse
+      // the existing Field.
+      avroFields.add(new Schema.Field(field.name(), schema, field.doc(), 
field.defaultValue()));
+    }
+    Schema avroSchema = Schema.createRecord(avroFields);
 
     {
       // Test nullable fields.
@@ -110,7 +145,7 @@ public void testConvertGenericRecordToTableRow() throws 
Exception {
       record.put("number", 5L);
       record.put("quality", 5.0);
       record.put("birthday", 5L);
-      record.put("birthdayMoney", new String("123456789.123456789"));
+      record.put("birthdayMoney", ByteBuffer.wrap(birthdayMoneyBytes));
       record.put("flighted", Boolean.TRUE);
       record.put("sound", soundByteBuffer);
       record.put("anniversaryDate", new Utf8("2000-01-01"));
@@ -121,7 +156,7 @@ public void testConvertGenericRecordToTableRow() throws 
Exception {
           new TableRow()
               .set("number", "5")
               .set("birthday", "1970-01-01 00:00:00.000005 UTC")
-              .set("birthdayMoney", "123456789.123456789")
+              .set("birthdayMoney", birthdayMoney.toString())
               .set("quality", 5.0)
               .set("associates", new ArrayList<TableRow>())
               .set("flighted", Boolean.TRUE)
@@ -139,13 +174,13 @@ public void testConvertGenericRecordToTableRow() throws 
Exception {
       GenericRecord record = new GenericData.Record(avroSchema);
       record.put("number", 5L);
       record.put("associates", Lists.newArrayList(nestedRecord));
-      record.put("birthdayMoney", new BigDecimal("987654321.987654321"));
+      record.put("birthdayMoney", ByteBuffer.wrap(birthdayMoneyBytes));
       TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
       TableRow row =
           new TableRow()
               .set("associates", Lists.newArrayList(new 
TableRow().set("species", "other")))
               .set("number", "5")
-              .set("birthdayMoney", "987654321.987654321");
+              .set("birthdayMoney", birthdayMoney.toString());
       assertEquals(row, convertedRow);
     }
   }
@@ -172,7 +207,7 @@ public void testConvertBigQuerySchemaToAvroSchema() {
         equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.LONG))));
     assertThat(
         avroSchema.getField("birthdayMoney").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.STRING))));
+        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BYTES))));
     assertThat(
         avroSchema.getField("flighted").schema(),
         equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BOOLEAN))));
@@ -233,7 +268,7 @@ public void testConvertBigQuerySchemaToAvroSchema() {
     @Nullable Double quality;
     @Nullable Long quantity;
     @Nullable Long birthday; // Exercises TIMESTAMP.
-    @Nullable String birthdayMoney; // Exercises NUMERIC.
+    @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC.
     @Nullable Boolean flighted;
     @Nullable ByteBuffer sound;
     @Nullable Utf8 anniversaryDate;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 124239)
    Time Spent: 6h 40m  (was: 6.5h)

> BigqueryIO Numeric datatype Support
> -----------------------------------
>
>                 Key: BEAM-4417
>                 URL: https://issues.apache.org/jira/browse/BEAM-4417
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.4.0
>            Reporter: Kishan Kumar
>            Assignee: Chamikara Jayalath
>            Priority: Critical
>              Labels: newbie, patch
>             Fix For: 2.7.0
>
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> BigQueryIO.read fails while parsing the Avro file that is generated when 
> reading from a table that has columns with the *Numeric* data type. 
> We have gone through the source code on GitHub and noticed that the *Numeric 
> data type is not yet supported.* 
>  
> Caused by: com.google.common.base.VerifyException: Unsupported BigQuery type: 
> NUMERIC
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to