Abacn commented on code in PR #33221:
URL: https://github.com/apache/beam/pull/33221#discussion_r1873541877


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -305,45 +339,83 @@ private static Object convertRequiredField(String name, 
Schema schema, Object v)
     LogicalType logicalType = schema.getLogicalType();
     switch (type) {
       case BOOLEAN:
-        // SQL types BOOL, BOOLEAN
+        // SQL type BOOL (BOOLEAN)
         return v;
       case INT:
         if (logicalType instanceof LogicalTypes.Date) {
-          // SQL types DATE
+          // SQL type DATE
+          // ideally LocalDate but TableRowJsonCoder encodes as String
           return formatDate((Integer) v);
+        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+          // Write only: SQL type TIME
+          // ideally LocalTime but TableRowJsonCoder encodes as String
+          return formatTime(((Integer) v) * 1000L);
         } else {
-          throw new UnsupportedOperationException(
-              String.format("Unexpected Avro field schema type %s for field 
named %s", type, name));
+          // Write only: SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, 
TINYINT, BYTEINT)
+          // ideally Integer but keep consistency with BQ JSON export that 
uses String
+          return ((Integer) v).toString();
         }
       case LONG:
         if (logicalType instanceof LogicalTypes.TimeMicros) {
-          // SQL types TIME
+          // SQL type TIME
+          // ideally LocalTime but TableRowJsonCoder encodes as String
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+          // Write only: SQL type TIMESTAMP
+          // ideally Instant but TableRowJsonCoder encodes as String
+          return formatTimestamp((Long) v * 1000L);
         } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-          // SQL types TIMESTAMP
+          // SQL type TIMESTAMP
+          // ideally Instant but TableRowJsonCoder encodes as String
           return formatTimestamp((Long) v);
+        } else if (!(VERSION_AVRO.startsWith("1.8") || 
VERSION_AVRO.startsWith("1.9"))
+            && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+          // Write only: SQL type DATETIME
+          // ideally LocalDateTime but TableRowJsonCoder encodes as String
+          return formatDatetime(((Long) v) * 1000);
+        } else if (!(VERSION_AVRO.startsWith("1.8") || 
VERSION_AVRO.startsWith("1.9"))
+            && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+          // Write only: SQL type DATETIME
+          // ideally LocalDateTime but TableRowJsonCoder encodes as String
+          return formatDatetime((Long) v);
         } else {
-          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ 
JSON export that uses
+          // String
           return ((Long) v).toString();
         }
+      case FLOAT:

Review Comment:
   Is it correct that this piece resolves #33197 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to