ahmedabu98 commented on code in PR #32482: URL: https://github.com/apache/beam/pull/32482#discussion_r1768754113
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -239,32 +304,25 @@ private static TableRow convertGenericRecordToTableRow( return row; } - private static @Nullable Object getTypedCellValue( - Schema schema, TableFieldSchema fieldSchema, Object v) { + private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) { // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field // is optional (and so it may be null), but defaults to "NULLABLE". Review Comment: Consider removing this comment block if it's no longer relevant ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -273,145 +331,102 @@ private static List<Object> convertRepeatedField( @SuppressWarnings("unchecked") List<Object> elements = (List<Object>) v; ArrayList<Object> values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. 
- String bqType = fieldSchema.getType(); - ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); - // For historical reasons, don't validate avroLogicalType except for with NUMERIC. - // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. - switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + checkNotNull(v, "REQUIRED field %s should not be null", name); + + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( + String.format( + "Unexpected BigQuery field schema type %s for field named %s", type, name)); } - case "TIME": - if (avroType == Type.LONG) { - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected TimeMicros logical 
type"); - verify( - avroLogicalType instanceof LogicalTypes.TimeMicros, - "Expected TimeMicros logical type"); + case LONG: + if (logicalType instanceof LogicalTypes.TimeMicros) { + // SQL types TIME return formatTime((Long) v); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + // SQL types TIMESTAMP + return formatTimestamp((Long) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT) + return ((Long) v).toString(); } - case "INTEGER": - case "INT64": - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - return ((Long) v).toString(); - case "FLOAT": - case "FLOAT64": - verify(v instanceof Double, "Expected Double, got %s", v.getClass()); + case DOUBLE: + // SQL types FLOAT64 return v; - case "NUMERIC": - case "BIGNUMERIC": - // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are - // converted back to Strings with precision and scale determined by the logical type. - verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Decimal logical type"); - verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type"); - BigDecimal numericValue = - new Conversions.DecimalConversion() - .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType); - return numericValue.toString(); - case "BOOL": - case "BOOLEAN": - verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass()); - return v; - case "TIMESTAMP": - // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch. - // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC. 
- verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - return formatTimestamp((Long) v); - case "RECORD": - case "STRUCT": - verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass()); - return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields()); - case "BYTES": - verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass()); - ByteBuffer byteBuffer = (ByteBuffer) v; - byte[] bytes = new byte[byteBuffer.limit()]; - byteBuffer.get(bytes); - return BaseEncoding.base64().encode(bytes); + case BYTES: + if (logicalType instanceof LogicalTypes.Decimal) { + // SQL tpe NUMERIC, BIGNUMERIC + return new Conversions.DecimalConversion() + .fromBytes((ByteBuffer) v, schema, logicalType) + .toString(); + } else { + // SQL types BYTES + return BaseEncoding.base64().encode(((ByteBuffer) v).array()); + } + case STRING: + // SQL types STRING, DATETIME, GEOGRAPHY, JSON + // when not using logical type DATE, TIME too + return v.toString(); + case RECORD: + return convertGenericRecordToTableRow((GenericRecord) v); default: throw new UnsupportedOperationException( String.format( - "Unexpected BigQuery field schema type %s for field named %s", - fieldSchema.getType(), fieldSchema.getName())); + "Unexpected BigQuery field schema type %s for field named %s", type, name)); Review Comment: ```suggestion "Unexpected Avro field schema type %s for field named %s", type, name)); ``` Similarly ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField( @SuppressWarnings("unchecked") List<Object> elements = (List<Object>) v; ArrayList<Object> values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, 
elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. - String bqType = fieldSchema.getType(); - ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); + checkNotNull(v, "REQUIRED field %s should not be null", name); + // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. - switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( + String.format( + "Unexpected BigQuery field schema type %s for field named %s", type, name)); } - case "TIME": - if (avroType == Type.LONG) { - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected TimeMicros logical type"); - verify( - avroLogicalType instanceof LogicalTypes.TimeMicros, - "Expected TimeMicros logical type"); + case LONG: + if (logicalType instanceof LogicalTypes.TimeMicros) { + // SQL types TIME return formatTime((Long) v); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + // SQL types TIMESTAMP + return formatTimestamp((Long) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT) + return ((Long) v).toString(); Review Comment: What happens if we return the value as a Long here instead of converting it to a String? Would that be a breaking change? 
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -273,145 +331,102 @@ private static List<Object> convertRepeatedField( @SuppressWarnings("unchecked") List<Object> elements = (List<Object>) v; ArrayList<Object> values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. - String bqType = fieldSchema.getType(); - ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); - // For historical reasons, don't validate avroLogicalType except for with NUMERIC. - // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. 
- switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + checkNotNull(v, "REQUIRED field %s should not be null", name); + + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( + String.format( + "Unexpected BigQuery field schema type %s for field named %s", type, name)); Review Comment: ```suggestion "Unexpected Avro field schema type %s for field named %s", type, name)); ``` Implementation switched to looking at Avro types, error should reflect that too. P.S. 
consider also mentioning the logical type here, since it caused this error ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -239,32 +304,25 @@ private static TableRow convertGenericRecordToTableRow( return row; } - private static @Nullable Object getTypedCellValue( - Schema schema, TableFieldSchema fieldSchema, Object v) { + private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) { // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field // is optional (and so it may be null), but defaults to "NULLABLE". - String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE"); - switch (mode) { - case "REQUIRED": - return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v); - case "REPEATED": - return convertRepeatedField(schema, fieldSchema, v); - case "NULLABLE": - return convertNullableField(schema, fieldSchema, v); - default: + Type type = schema.getType(); + switch (type) { + case ARRAY: + return convertRepeatedField(name, schema.getElementType(), v); + case UNION: + return convertNullableField(name, schema, v); + case MAP: throw new UnsupportedOperationException( - "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode()); + String.format( + "Unexpected BigQuery field schema type %s for field named %s", type, name)); Review Comment: ```suggestion "Unexpected Avro field schema type %s for field named %s", type, name)); ``` The implementation switched to looking at Avro types here, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org