ahmedabu98 commented on code in PR #32482:
URL: https://github.com/apache/beam/pull/32482#discussion_r1768754113


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -239,32 +304,25 @@ private static TableRow convertGenericRecordToTableRow(
     return row;
   }
 
-  private static @Nullable Object getTypedCellValue(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
+  private static @Nullable Object getTypedCellValue(String name, Schema 
schema, Object v) {
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the mode field
     // is optional (and so it may be null), but defaults to "NULLABLE".

Review Comment:
   Consider removing this comment block if it's no longer relevant



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +331,102 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, 
fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema 
fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, 
Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For 
example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", 
fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = 
BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received 
%s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
-    // For historical reasons, don't validate avroLogicalType except for with 
NUMERIC.
-    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL 
logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it 
may not always use
-        // java.lang.String; for example, it may prefer 
org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got 
%s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", 
v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date 
logical type");
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named 
%s", type, name));
         }
-      case "TIME":
-        if (avroType == Type.LONG) {
-          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
-          verify(
-              avroLogicalType instanceof LogicalTypes.TimeMicros,
-              "Expected TimeMicros logical type");
+      case LONG:
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+          // SQL types TIME
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          // SQL types TIMESTAMP
+          return formatTimestamp((Long) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
-          return v.toString();
+          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          return ((Long) v).toString();
         }
-      case "INTEGER":
-      case "INT64":
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return ((Long) v).toString();
-      case "FLOAT":
-      case "FLOAT64":
-        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+      case DOUBLE:
+        // SQL types FLOAT64
         return v;
-      case "NUMERIC":
-      case "BIGNUMERIC":
-        // NUMERIC data types are represented as BYTES with the DECIMAL 
logical type. They are
-        // converted back to Strings with precision and scale determined by 
the logical type.
-        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", 
v.getClass());
-        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
-        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected 
Decimal logical type");
-        BigDecimal numericValue =
-            new Conversions.DecimalConversion()
-                .fromBytes((ByteBuffer) v, Schema.create(avroType), 
avroLogicalType);
-        return numericValue.toString();
-      case "BOOL":
-      case "BOOLEAN":
-        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
-        return v;
-      case "TIMESTAMP":
-        // TIMESTAMP data types are represented as Avro LONG types, 
microseconds since the epoch.
-        // Values may be negative since BigQuery timestamps start at 
0001-01-01 00:00:00 UTC.
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return formatTimestamp((Long) v);
-      case "RECORD":
-      case "STRUCT":
-        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", 
v.getClass());
-        return convertGenericRecordToTableRow((GenericRecord) v, 
fieldSchema.getFields());
-      case "BYTES":
-        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", 
v.getClass());
-        ByteBuffer byteBuffer = (ByteBuffer) v;
-        byte[] bytes = new byte[byteBuffer.limit()];
-        byteBuffer.get(bytes);
-        return BaseEncoding.base64().encode(bytes);
+      case BYTES:
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          // SQL tpe NUMERIC, BIGNUMERIC
+          return new Conversions.DecimalConversion()
+              .fromBytes((ByteBuffer) v, schema, logicalType)
+              .toString();
+        } else {
+          // SQL types BYTES
+          return BaseEncoding.base64().encode(((ByteBuffer) v).array());
+        }
+      case STRING:
+        // SQL types STRING, DATETIME, GEOGRAPHY, JSON
+        // when not using logical type DATE, TIME too
+        return v.toString();
+      case RECORD:
+        return convertGenericRecordToTableRow((GenericRecord) v);
       default:
         throw new UnsupportedOperationException(
             String.format(
-                "Unexpected BigQuery field schema type %s for field named %s",
-                fieldSchema.getType(), fieldSchema.getName()));
+                "Unexpected BigQuery field schema type %s for field named %s", 
type, name));

Review Comment:
   ```suggestion
                   "Unexpected Avro field schema type %s for field named %s", 
type, name));
   ```
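   Similarly, the implementation now switches on Avro types here, so the error message should refer to the Avro type.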



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, 
fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema 
fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, 
Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For 
example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", 
fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = 
BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received 
%s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
     // For historical reasons, don't validate avroLogicalType except for with 
NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL 
logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it 
may not always use
-        // java.lang.String; for example, it may prefer 
org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got 
%s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", 
v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date 
logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named 
%s", type, name));
         }
-      case "TIME":
-        if (avroType == Type.LONG) {
-          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
-          verify(
-              avroLogicalType instanceof LogicalTypes.TimeMicros,
-              "Expected TimeMicros logical type");
+      case LONG:
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+          // SQL types TIME
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          // SQL types TIMESTAMP
+          return formatTimestamp((Long) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
-          return v.toString();
+          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          return ((Long) v).toString();

Review Comment:
   What happens if we keep the long? Would it be a breaking change?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +331,102 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, 
fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema 
fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, 
Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For 
example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", 
fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = 
BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received 
%s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
-    // For historical reasons, don't validate avroLogicalType except for with 
NUMERIC.
-    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL 
logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it 
may not always use
-        // java.lang.String; for example, it may prefer 
org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got 
%s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", 
v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date 
logical type");
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), 
got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named 
%s", type, name));

Review Comment:
   ```suggestion
                     "Unexpected Avro field schema type %s for field named %s", 
type, name));
   ```
   
   The implementation switched to looking at Avro types, so the error should reflect that too.
   
   P.S. consider also mentioning the logical type here, since it caused this 
error



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -239,32 +304,25 @@ private static TableRow convertGenericRecordToTableRow(
     return row;
   }
 
-  private static @Nullable Object getTypedCellValue(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
+  private static @Nullable Object getTypedCellValue(String name, Schema 
schema, Object v) {
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, 
the mode field
     // is optional (and so it may be null), but defaults to "NULLABLE".
-    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
-    switch (mode) {
-      case "REQUIRED":
-        return convertRequiredField(schema.getType(), schema.getLogicalType(), 
fieldSchema, v);
-      case "REPEATED":
-        return convertRepeatedField(schema, fieldSchema, v);
-      case "NULLABLE":
-        return convertNullableField(schema, fieldSchema, v);
-      default:
+    Type type = schema.getType();
+    switch (type) {
+      case ARRAY:
+        return convertRepeatedField(name, schema.getElementType(), v);
+      case UNION:
+        return convertNullableField(name, schema, v);
+      case MAP:
         throw new UnsupportedOperationException(
-            "Parsing a field with BigQuery field schema mode " + 
fieldSchema.getMode());
+            String.format(
+                "Unexpected BigQuery field schema type %s for field named %s", 
type, name));

Review Comment:
   ```suggestion
                   "Unexpected Avro field schema type %s for field named %s", 
type, name));
   ```
   The implementation switched to looking at Avro types here, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to