RustedBones commented on code in PR #32482:
URL: https://github.com/apache/beam/pull/32482#discussion_r1768170812


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
     // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(

Review Comment:
   We are never supposed to get an Avro `INT` from BQ except for a SQL `DATE` when logical types are enabled. IMHO we should treat that as an error.
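
   For illustration, a minimal sketch of the stricter handling suggested above, assuming the Avro `Schema`/`LogicalTypes` API; the class and method names and the date rendering are mine, not the PR's:

   ```java
   import java.time.LocalDate;
   import org.apache.avro.LogicalType;
   import org.apache.avro.LogicalTypes;
   import org.apache.avro.Schema;

   class StrictIntConversionSketch {
     // Sketch only: treat any Avro INT that does not carry a date logical type
     // as an error, since BigQuery export is not expected to produce a bare INT.
     static Object convertIntField(String name, Schema schema, Object v) {
       LogicalType logicalType = schema.getLogicalType();
       if (logicalType instanceof LogicalTypes.Date) {
         // BigQuery DATE is encoded in Avro as days since the epoch.
         return LocalDate.ofEpochDay((Integer) v).toString();
       }
       throw new UnsupportedOperationException(
           String.format("Unexpected Avro INT for field %s without a DATE logical type", name));
     }
   }
   ```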



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -322,15 +322,21 @@ private static FieldType fromTableFieldSchemaType(
       case "BYTES":
         return FieldType.BYTES;
       case "INT64":
+      case "INT":
+      case "SMALLINT":
       case "INTEGER":
+      case "BIGINT":
+      case "TINYINT":
+      case "BYTEINT":

Review Comment:
   Those are the BQ types as described [here](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_list), but it looks like the schema uses another set of values. See [getType](https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--).
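
   For reference, a small sketch (not from the PR) of how the standard-SQL aliases could be normalized to the canonical names that `TableFieldSchema.getType()` is documented to return; the alias list is my reading of the linked docs:

   ```java
   class BigQueryTypeNames {
     // Hypothetical helper: map standard-SQL aliases onto the canonical type
     // names used by the TableFieldSchema API; unknown names pass through.
     static String canonicalTypeName(String bqType) {
       switch (bqType) {
         case "INT":
         case "SMALLINT":
         case "BIGINT":
         case "TINYINT":
         case "BYTEINT":
         case "INT64":
           return "INTEGER";
         case "FLOAT64":
           return "FLOAT";
         case "BOOL":
           return "BOOLEAN";
         case "STRUCT":
           return "RECORD";
         default:
           return bqType;
       }
     }
   }
   ```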



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
     // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named %s", type, name));
         }
-      case "TIME":
-        if (avroType == Type.LONG) {
-          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
-          verify(
-              avroLogicalType instanceof LogicalTypes.TimeMicros,
-              "Expected TimeMicros logical type");
+      case LONG:
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+          // SQL types TIME
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          // SQL types TIMESTAMP
+          return formatTimestamp((Long) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          return ((Long) v).toString();

Review Comment:
   It doesn't feel natural, but that's the old behavior.
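
   To make the preserved behavior concrete, a tiny illustration (mine, not from the PR): a BigQuery `INT64` value ends up in the converted `TableRow` as a `String`, so callers must re-parse it when they need a number.

   ```java
   import com.google.api.services.bigquery.model.TableRow;

   class Int64AsStringExample {
     public static void main(String[] args) {
       // Legacy contract being kept: an Avro LONG such as 42L is stored in the
       // TableRow as the String "42", not as a Long.
       TableRow row = new TableRow().set("count", Long.toString(42L));
       String raw = (String) row.get("count");
       long parsed = Long.parseLong(raw); // consumers re-parse to get a number back
       System.out.println(raw + " -> " + parsed);
     }
   }
   ```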



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1275,8 +1272,12 @@ public PCollection<T> expand(PBegin input) {
 
       Schema beamSchema = null;
       if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
-        beamSchema = sourceDef.getBeamSchema(bqOptions);
-        beamSchema = getFinalSchema(beamSchema, getSelectedFields());
+        TableSchema tableSchema = sourceDef.getTableSchema(bqOptions);
+        ValueProvider<List<String>> selectedFields = getSelectedFields();
+        if (selectedFields != null) {
+          tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());

Review Comment:
   After reflection, this change is unrelated to the PR description. I will move it to a separate PR.
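
   For context only, a rough sketch of what trimming a `TableSchema` to the selected top-level fields could look like; this is my illustration, not the actual `BigQueryUtils.trimSchema` implementation, and it ignores nested field selection.

   ```java
   import com.google.api.services.bigquery.model.TableFieldSchema;
   import com.google.api.services.bigquery.model.TableSchema;
   import java.util.List;
   import java.util.stream.Collectors;

   class SchemaTrimSketch {
     // Keep only the named top-level fields of the schema.
     static TableSchema trimToSelectedFields(TableSchema schema, List<String> selectedFields) {
       List<TableFieldSchema> kept =
           schema.getFields().stream()
               .filter(f -> selectedFields.contains(f.getName()))
               .collect(Collectors.toList());
       return new TableSchema().setFields(kept);
     }
   }
   ```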


