This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d8e02e2629 [java] BQ: add missing avro conversions to BQ TableRow 
(#33221)
6d8e02e2629 is described below

commit 6d8e02e262951f26e8a4e835df462a219155bc11
Author: Michel Davit <[email protected]>
AuthorDate: Tue Dec 10 18:36:44 2024 +0100

    [java] BQ: add missing avro conversions to BQ TableRow (#33221)
    
    * [java] BQ: add missing avro conversions to BQ TableRow
    
    Avro float fields can be used to write BQ FLOAT columns.
    Add TableRow conversion for such field.
    
    Adding conversion for aveo 1.10+ logical types local-timestamp-millis
    and local-timestam-micros.
    
    * Rework tests
    
    * Add map and fixed types conversion
    
    * Fix checkstyle
    
    * Use valid parameters
    
    * Test record nullable field
---
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java     |  112 ++-
 .../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 1037 ++++++++++++--------
 2 files changed, 746 insertions(+), 403 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index cddde05b194..1af44ba7a01 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -34,6 +34,8 @@ import java.time.format.DateTimeFormatterBuilder;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
@@ -50,14 +52,14 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the <a 
href="https://avro.apache.org/docs/1.8.1/spec.html";>Avro
- * 1.8.1</a> specification.
- */
+/** A set of utilities for working with Avro files. */
 class BigQueryAvroUtils {
 
+  private static final String VERSION_AVRO =
+      Optional.ofNullable(Schema.class.getPackage())
+          .map(Package::getImplementationVersion)
+          .orElse("");
+
   // org.apache.avro.LogicalType
   static class DateTimeLogicalType extends LogicalType {
     public DateTimeLogicalType() {
@@ -74,6 +76,8 @@ class BigQueryAvroUtils {
    *     export</a>
    * @see <a 
href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ
    *     avro storage</a>
+   * @see <a 
href=https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro>BQ 
avro
+   *     load</a>
    */
   static Schema getPrimitiveType(TableFieldSchema schema, Boolean 
useAvroLogicalTypes) {
     String bqType = schema.getType();
@@ -116,6 +120,9 @@ class BigQueryAvroUtils {
         }
       case "DATETIME":
         if (useAvroLogicalTypes) {
+          // BQ export uses a custom logical type
+          // TODO for load/storage use
+          // LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())
           return 
DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
         } else {
           return SchemaBuilder.builder().stringBuilder().prop("sqlType", 
bqType).endString();
@@ -158,6 +165,12 @@ class BigQueryAvroUtils {
 
   @VisibleForTesting
   static String formatTimestamp(Long timestampMicro) {
+    String dateTime = formatDatetime(timestampMicro);
+    return dateTime + " UTC";
+  }
+
+  @VisibleForTesting
+  static String formatDatetime(Long timestampMicro) {
     // timestampMicro is in "microseconds since epoch" format,
     // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
     // Separate into seconds and microseconds.
@@ -168,11 +181,13 @@ class BigQueryAvroUtils {
       timestampSec -= 1;
     }
     String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-
     if (micros == 0) {
-      return String.format("%s UTC", dayAndTime);
+      return dayAndTime;
+    } else if (micros % 1000 == 0) {
+      return String.format("%s.%03d", dayAndTime, micros / 1000);
+    } else {
+      return String.format("%s.%06d", dayAndTime, micros);
     }
-    return String.format("%s.%06d UTC", dayAndTime, micros);
   }
 
   /**
@@ -274,8 +289,7 @@ class BigQueryAvroUtils {
       case UNION:
         return convertNullableField(name, schema, v);
       case MAP:
-        throw new UnsupportedOperationException(
-            String.format("Unexpected Avro field schema type %s for field 
named %s", type, name));
+        return convertMapField(name, schema, v);
       default:
         return convertRequiredField(name, schema, v);
     }
@@ -296,6 +310,26 @@ class BigQueryAvroUtils {
     return values;
   }
 
+  private static List<TableRow> convertMapField(String name, Schema map, 
Object v) {
+    // Avro maps are represented as key/value RECORD.
+    if (v == null) {
+      // Handle the case of an empty map.
+      return new ArrayList<>();
+    }
+
+    Schema type = map.getValueType();
+    Map<String, Object> elements = (Map<String, Object>) v;
+    ArrayList<TableRow> values = new ArrayList<>();
+    for (Map.Entry<String, Object> element : elements.entrySet()) {
+      TableRow row =
+          new TableRow()
+              .set("key", element.getKey())
+              .set("value", convertRequiredField(name, type, 
element.getValue()));
+      values.add(row);
+    }
+    return values;
+  }
+
   private static Object convertRequiredField(String name, Schema schema, 
Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For 
example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
@@ -305,45 +339,83 @@ class BigQueryAvroUtils {
     LogicalType logicalType = schema.getLogicalType();
     switch (type) {
       case BOOLEAN:
-        // SQL types BOOL, BOOLEAN
+        // SQL type BOOL (BOOLEAN)
         return v;
       case INT:
         if (logicalType instanceof LogicalTypes.Date) {
-          // SQL types DATE
+          // SQL type DATE
+          // ideally LocalDate but TableRowJsonCoder encodes as String
           return formatDate((Integer) v);
+        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+          // Write only: SQL type TIME
+          // ideally LocalTime but TableRowJsonCoder encodes as String
+          return formatTime(((Integer) v) * 1000L);
         } else {
-          throw new UnsupportedOperationException(
-              String.format("Unexpected Avro field schema type %s for field 
named %s", type, name));
+          // Write only: SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, 
TINYINT, BYTEINT)
+          // ideally Integer but keep consistency with BQ JSON export that 
uses String
+          return ((Integer) v).toString();
         }
       case LONG:
         if (logicalType instanceof LogicalTypes.TimeMicros) {
-          // SQL types TIME
+          // SQL type TIME
+          // ideally LocalTime but TableRowJsonCoder encodes as String
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+          // Write only: SQL type TIMESTAMP
+          // ideally Instant but TableRowJsonCoder encodes as String
+          return formatTimestamp((Long) v * 1000L);
         } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-          // SQL types TIMESTAMP
+          // SQL type TIMESTAMP
+          // ideally Instant but TableRowJsonCoder encodes as String
           return formatTimestamp((Long) v);
+        } else if (!(VERSION_AVRO.startsWith("1.8") || 
VERSION_AVRO.startsWith("1.9"))
+            && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+          // Write only: SQL type DATETIME
+          // ideally LocalDateTime but TableRowJsonCoder encodes as String
+          return formatDatetime(((Long) v) * 1000);
+        } else if (!(VERSION_AVRO.startsWith("1.8") || 
VERSION_AVRO.startsWith("1.9"))
+            && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+          // Write only: SQL type DATETIME
+          // ideally LocalDateTime but TableRowJsonCoder encodes as String
+          return formatDatetime((Long) v);
         } else {
-          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ 
JSON export that uses
+          // String
           return ((Long) v).toString();
         }
+      case FLOAT:
+        // Write only: SQL type FLOAT64
+        // ideally Float but TableRowJsonCoder decodes as Double
+        return Double.valueOf(v.toString());
       case DOUBLE:
-        // SQL types FLOAT64
+        // SQL type FLOAT64
         return v;
       case BYTES:
         if (logicalType instanceof LogicalTypes.Decimal) {
           // SQL tpe NUMERIC, BIGNUMERIC
+          // ideally BigDecimal but TableRowJsonCoder encodes as String
           return new Conversions.DecimalConversion()
               .fromBytes((ByteBuffer) v, schema, logicalType)
               .toString();
         } else {
-          // SQL types BYTES
+          // SQL type BYTES
+          // ideally byte[] but TableRowJsonCoder encodes as String
           return BaseEncoding.base64().encode(((ByteBuffer) v).array());
         }
       case STRING:
         // SQL types STRING, DATETIME, GEOGRAPHY, JSON
         // when not using logical type DATE, TIME too
         return v.toString();
+      case ENUM:
+        // SQL types STRING
+        return v.toString();
+      case FIXED:
+        // SQL type BYTES
+        // ideally byte[] but TableRowJsonCoder encodes as String
+        return BaseEncoding.base64().encode(((ByteBuffer) v).array());
       case RECORD:
+        // SQL types RECORD
         return convertGenericRecordToTableRow((GenericRecord) v);
       default:
         throw new UnsupportedOperationException(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 662f2658eb6..2333278a11f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -28,23 +28,23 @@ import com.google.api.services.bigquery.model.TableSchema;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.AvroSchema;
-import org.apache.avro.reflect.Nullable;
-import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.junit.Test;
@@ -54,363 +54,678 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link BigQueryAvroUtils}. */
 @RunWith(JUnit4.class)
 public class BigQueryAvroUtilsTest {
-  private List<TableFieldSchema> subFields =
-      Lists.newArrayList(
-          new 
TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
-  /*
-   * Note that the quality and quantity fields do not have their mode set, so 
they should default
-   * to NULLABLE. This is an important test of BigQuery semantics.
-   *
-   * All the other fields we set in this function are required on the Schema 
response.
-   *
-   * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
-   */
-  private List<TableFieldSchema> fields =
-      Lists.newArrayList(
-          new 
TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
-          new 
TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
-          new TableFieldSchema().setName("quality").setType("FLOAT") /* 
default to NULLABLE */,
-          new TableFieldSchema().setName("quantity").setType("INTEGER") /* 
default to NULLABLE */,
-          new 
TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
-          new 
TableFieldSchema().setName("birthdayMoney").setType("NUMERIC").setMode("NULLABLE"),
-          new TableFieldSchema()
-              .setName("lotteryWinnings")
-              .setType("BIGNUMERIC")
-              .setMode("NULLABLE"),
-          new 
TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
-          new 
TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
-          new 
TableFieldSchema().setName("anniversaryDate").setType("DATE").setMode("NULLABLE"),
-          new TableFieldSchema()
-              .setName("anniversaryDatetime")
-              .setType("DATETIME")
-              .setMode("NULLABLE"),
-          new 
TableFieldSchema().setName("anniversaryTime").setType("TIME").setMode("NULLABLE"),
-          new TableFieldSchema()
-              .setName("scion")
-              .setType("RECORD")
-              .setMode("NULLABLE")
-              .setFields(subFields),
-          new TableFieldSchema()
-              .setName("associates")
-              .setType("RECORD")
-              .setMode("REPEATED")
-              .setFields(subFields),
-          new 
TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE"));
-
-  private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int 
scale) {
-    LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale);
-    return new Conversions.DecimalConversion().toBytes(bigDecimal, null, 
bigDecimalLogicalType);
+
+  private TableSchema tableSchema(Function<TableFieldSchema, TableFieldSchema> 
fn) {
+    TableFieldSchema column = new TableFieldSchema().setName("value");
+    TableSchema tableSchema = new TableSchema();
+    tableSchema.setFields(Lists.newArrayList(fn.apply(column)));
+    return tableSchema;
+  }
+
+  private Schema avroSchema(
+      Function<SchemaBuilder.FieldBuilder<Schema>, 
SchemaBuilder.FieldAssembler<Schema>> fn) {
+    return fn.apply(
+            SchemaBuilder.record("root")
+                .namespace("org.apache.beam.sdk.io.gcp.bigquery")
+                .doc("Translated Avro Schema for root")
+                .fields()
+                .name("value"))
+        .endRecord();
   }
 
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
   @Test
-  public void testConvertGenericRecordToTableRow() throws Exception {
-    BigDecimal numeric = new BigDecimal("123456789.123456789");
-    ByteBuffer numericBytes = convertToBytes(numeric, 38, 9);
-    BigDecimal bigNumeric =
-        new BigDecimal(
-            
"578960446186580977117854925043439539266.34992332820282019728792003956564819967");
-    ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38);
-    Schema avroSchema = ReflectData.get().getSchema(Bird.class);
-
-    {
-      // Test nullable fields.
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("number", 5L);
-      TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
-      TableRow row = new TableRow().set("number", "5").set("associates", new 
ArrayList<TableRow>());
-      assertEquals(row, convertedRow);
-      TableRow clonedRow = convertedRow.clone();
-      assertEquals(convertedRow, clonedRow);
-    }
-    {
-      // Test type conversion for:
-      // INTEGER, FLOAT, NUMERIC, TIMESTAMP, BOOLEAN, BYTES, DATE, DATETIME, 
TIME.
-      GenericRecord record = new GenericData.Record(avroSchema);
-      byte[] soundBytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8);
-      ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes);
-      soundByteBuffer.rewind();
-      record.put("number", 5L);
-      record.put("quality", 5.0);
-      record.put("birthday", 5L);
-      record.put("birthdayMoney", numericBytes);
-      record.put("lotteryWinnings", bigNumericBytes);
-      record.put("flighted", Boolean.TRUE);
-      record.put("sound", soundByteBuffer);
-      record.put("anniversaryDate", new Utf8("2000-01-01"));
-      record.put("anniversaryDatetime", new String("2000-01-01 
00:00:00.000005"));
-      record.put("anniversaryTime", new Utf8("00:00:00.000005"));
-      record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)"));
-      TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
-      TableRow row =
-          new TableRow()
-              .set("number", "5")
-              .set("birthday", "1970-01-01 00:00:00.000005 UTC")
-              .set("birthdayMoney", numeric.toString())
-              .set("lotteryWinnings", bigNumeric.toString())
-              .set("quality", 5.0)
-              .set("associates", new ArrayList<TableRow>())
-              .set("flighted", Boolean.TRUE)
-              .set("sound", BaseEncoding.base64().encode(soundBytes))
-              .set("anniversaryDate", "2000-01-01")
-              .set("anniversaryDatetime", "2000-01-01 00:00:00.000005")
-              .set("anniversaryTime", "00:00:00.000005")
-              .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)");
-      TableRow clonedRow = convertedRow.clone();
-      assertEquals(convertedRow, clonedRow);
-      assertEquals(row, convertedRow);
-    }
-    {
-      // Test repeated fields.
-      Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema();
-      GenericRecord nestedRecord = new GenericData.Record(subBirdSchema);
-      nestedRecord.put("species", "other");
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("number", 5L);
-      record.put("associates", Lists.newArrayList(nestedRecord));
-      record.put("birthdayMoney", numericBytes);
-      record.put("lotteryWinnings", bigNumericBytes);
-      TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
-      TableRow row =
+  public void testConvertGenericRecordToTableRow() {
+    {
+      // bool
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().booleanType().noDefault()))
+              .set("value", false)
+              .build();
+      TableRow expected = new TableRow().set("value", false);
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // int
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().intType().noDefault()))
+              .set("value", 5)
+              .build();
+      TableRow expected = new TableRow().set("value", "5");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // long
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().longType().noDefault()))
+              .set("value", 5L)
+              .build();
+      TableRow expected = new TableRow().set("value", "5");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // float
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().floatType().noDefault()))
+              .set("value", 5.5f)
+              .build();
+      TableRow expected = new TableRow().set("value", 5.5);
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // double
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().doubleType().noDefault()))
+              .set("value", 5.55)
+              .build();
+      TableRow expected = new TableRow().set("value", 5.55);
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // bytes
+      byte[] bytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8);
+      ByteBuffer bb = ByteBuffer.wrap(bytes);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().bytesType().noDefault()))
+              .set("value", bb)
+              .build();
+      TableRow expected = new TableRow().set("value", 
BaseEncoding.base64().encode(bytes));
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // string
+      Schema schema = avroSchema(f -> f.type().stringType().noDefault());
+      GenericRecord record = new GenericRecordBuilder(schema).set("value", 
"test").build();
+      GenericRecord utf8Record =
+          new GenericRecordBuilder(schema).set("value", new 
Utf8("test")).build();
+      TableRow expected = new TableRow().set("value", "test");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+      TableRow utf8Row = 
BigQueryAvroUtils.convertGenericRecordToTableRow(utf8Record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+      assertEquals(expected, utf8Row);
+      assertEquals(expected, utf8Row.clone());
+    }
+
+    {
+      // decimal
+      LogicalType lt = LogicalTypes.decimal(38, 9);
+      Schema decimalType = lt.addToSchema(SchemaBuilder.builder().bytesType());
+      BigDecimal bd = new BigDecimal("123456789.123456789");
+      ByteBuffer bytes = new Conversions.DecimalConversion().toBytes(bd, null, 
lt);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(decimalType).noDefault()))
+              .set("value", bytes)
+              .build();
+      TableRow expected = new TableRow().set("value", bd.toString());
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // date
+      LogicalType lt = LogicalTypes.date();
+      Schema dateType = lt.addToSchema(SchemaBuilder.builder().intType());
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      int days = (int) date.toEpochDay();
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(dateType).noDefault()))
+              .set("value", days)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // time-millis
+      LogicalType lt = LogicalTypes.timeMillis();
+      Schema timeType = lt.addToSchema(SchemaBuilder.builder().intType());
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      int millis = (int) (time.toNanoOfDay() / 1000000);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timeType).noDefault()))
+              .set("value", millis)
+              .build();
+      TableRow expected = new TableRow().set("value", "01:02:03.123");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // time-micros
+      LogicalType lt = LogicalTypes.timeMicros();
+      Schema timeType = lt.addToSchema(SchemaBuilder.builder().longType());
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      long micros = time.toNanoOfDay() / 1000;
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timeType).noDefault()))
+              .set("value", micros)
+              .build();
+      TableRow expected = new TableRow().set("value", "01:02:03.123456");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // local-timestamp-millis
+      LogicalType lt = LogicalTypes.localTimestampMillis();
+      Schema timestampType = 
lt.addToSchema(SchemaBuilder.builder().longType());
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timestampType).noDefault()))
+              .set("value", millis)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 
01:02:03.123");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // local-timestamp-micros
+      LogicalType lt = LogicalTypes.localTimestampMicros();
+      Schema timestampType = 
lt.addToSchema(SchemaBuilder.builder().longType());
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+      int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+      long micros = seconds * 1000000 + (nanos / 1000);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timestampType).noDefault()))
+              .set("value", micros)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 
01:02:03.123456");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // timestamp-micros
+      LogicalType lt = LogicalTypes.timestampMillis();
+      Schema timestampType = 
lt.addToSchema(SchemaBuilder.builder().longType());
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timestampType).noDefault()))
+              .set("value", millis)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 01:02:03.123 
UTC");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // timestamp-millis
+      LogicalType lt = LogicalTypes.timestampMicros();
+      Schema timestampType = 
lt.addToSchema(SchemaBuilder.builder().longType());
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+      int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+      long micros = seconds * 1000000 + (nanos / 1000);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(timestampType).noDefault()))
+              .set("value", micros)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 
01:02:03.123456 UTC");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // enum
+      Schema enumSchema = SchemaBuilder.enumeration("color").symbols("red", 
"green", "blue");
+      GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, 
"RED");
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(enumSchema).noDefault()))
+              .set("value", symbol)
+              .build();
+      TableRow expected = new TableRow().set("value", "RED");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // fixed
+      UUID uuid = UUID.randomUUID();
+      ByteBuffer bb = ByteBuffer.allocate(16);
+      bb.putLong(uuid.getMostSignificantBits());
+      bb.putLong(uuid.getLeastSignificantBits());
+      bb.rewind();
+      byte[] bytes = bb.array();
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().fixed("uuid").size(16).noDefault()))
+              .set("value", bb)
+              .build();
+      TableRow expected = new TableRow().set("value", 
BaseEncoding.base64().encode(bytes));
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // null
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().optional().booleanType())).build();
+      TableRow expected = new TableRow();
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // array
+      GenericRecord record =
+          new GenericRecordBuilder(
+                  avroSchema(f -> 
f.type().array().items().booleanType().noDefault()))
+              .set("value", Lists.newArrayList(true, false))
+              .build();
+      TableRow expected = new TableRow().set("value", Lists.newArrayList(true, 
false));
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // map
+      Map<String, Integer> map = new HashMap<>();
+      map.put("left", 1);
+      map.put("right", -1);
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type().map().values().intType().noDefault()))
+              .set("value", map)
+              .build();
+      TableRow expected =
           new TableRow()
-              .set("associates", Lists.newArrayList(new 
TableRow().set("species", "other")))
-              .set("number", "5")
-              .set("birthdayMoney", numeric.toString())
-              .set("lotteryWinnings", bigNumeric.toString());
-      assertEquals(row, convertedRow);
-      TableRow clonedRow = convertedRow.clone();
-      assertEquals(convertedRow, clonedRow);
+              .set(
+                  "value",
+                  Lists.newArrayList(
+                      new TableRow().set("key", "left").set("value", "1"),
+                      new TableRow().set("key", "right").set("value", "-1")));
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
+    {
+      // record
+      Schema subSchema =
+          SchemaBuilder.builder()
+              .record("record")
+              .fields()
+              .name("int")
+              .type()
+              .intType()
+              .noDefault()
+              .name("float")
+              .type()
+              .floatType()
+              .noDefault()
+              .endRecord();
+      GenericRecord subRecord =
+          new GenericRecordBuilder(subSchema).set("int", 5).set("float", 
5.5f).build();
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> 
f.type(subSchema).noDefault()))
+              .set("value", subRecord)
+              .build();
+      TableRow expected =
+          new TableRow().set("value", new TableRow().set("int", 
"5").set("float", 5.5));
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
     }
   }
 
   @Test
   public void testConvertBigQuerySchemaToAvroSchema() {
-    TableSchema tableSchema = new TableSchema();
-    tableSchema.setFields(fields);
-    Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema);
+    {
+      // REQUIRED
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("BOOLEAN").setMode("REQUIRED"));
+      Schema expected = avroSchema(f -> f.type().booleanType().noDefault());
 
-    assertThat(avroSchema.getField("number").schema(), 
equalTo(Schema.create(Type.LONG)));
-    assertThat(
-        avroSchema.getField("species").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.STRING))));
-    assertThat(
-        avroSchema.getField("quality").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.DOUBLE))));
-    assertThat(
-        avroSchema.getField("quantity").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.LONG))));
-    assertThat(
-        avroSchema.getField("birthday").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Type.NULL),
-                
LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG)))));
-    assertThat(
-        avroSchema.getField("birthdayMoney").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Type.NULL),
-                LogicalTypes.decimal(38, 
9).addToSchema(Schema.create(Type.BYTES)))));
-    assertThat(
-        avroSchema.getField("lotteryWinnings").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Type.NULL),
-                LogicalTypes.decimal(77, 
38).addToSchema(Schema.create(Type.BYTES)))));
-    assertThat(
-        avroSchema.getField("flighted").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BOOLEAN))));
-    assertThat(
-        avroSchema.getField("sound").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BYTES))));
-    Schema dateSchema = Schema.create(Type.INT);
-    LogicalTypes.date().addToSchema(dateSchema);
-    assertThat(
-        avroSchema.getField("anniversaryDate").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema)));
-    Schema dateTimeSchema = Schema.create(Type.STRING);
-    BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema);
-    assertThat(
-        avroSchema.getField("anniversaryDatetime").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema)));
-    Schema timeSchema = Schema.create(Type.LONG);
-    LogicalTypes.timeMicros().addToSchema(timeSchema);
-    assertThat(
-        avroSchema.getField("anniversaryTime").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema)));
-    Schema geoSchema = Schema.create(Type.STRING);
-    geoSchema.addProp("sqlType", "GEOGRAPHY");
-    assertThat(
-        avroSchema.getField("geoPositions").schema(),
-        equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema)));
-    assertThat(
-        avroSchema.getField("scion").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Type.NULL),
-                Schema.createRecord(
-                    "scion",
-                    "Translated Avro Schema for scion",
-                    "org.apache.beam.sdk.io.gcp.bigquery",
-                    false,
-                    ImmutableList.of(
-                        new Field(
-                            "species",
-                            Schema.createUnion(
-                                Schema.create(Type.NULL), 
Schema.create(Type.STRING)),
-                            null,
-                            (Object) null))))));
-    assertThat(
-        avroSchema.getField("associates").schema(),
-        equalTo(
-            Schema.createArray(
-                Schema.createRecord(
-                    "associates",
-                    "Translated Avro Schema for associates",
-                    "org.apache.beam.sdk.io.gcp.bigquery",
-                    false,
-                    ImmutableList.of(
-                        new Field(
-                            "species",
-                            Schema.createUnion(
-                                Schema.create(Type.NULL), 
Schema.create(Type.STRING)),
-                            null,
-                            (Object) null))))));
-  }
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+    }
 
-  @Test
-  public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() {
-    TableSchema tableSchema = new TableSchema();
-    tableSchema.setFields(fields);
-    Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, 
false);
+    {
+      // NULLABLE
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("BOOLEAN").setMode("NULLABLE"));
+      Schema expected =
+          avroSchema(f -> 
f.type().unionOf().nullType().and().booleanType().endUnion().noDefault());
 
-    assertThat(avroSchema.getField("number").schema(), 
equalTo(Schema.create(Schema.Type.LONG)));
-    assertThat(
-        avroSchema.getField("species").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING))));
-    assertThat(
-        avroSchema.getField("quality").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.DOUBLE))));
-    assertThat(
-        avroSchema.getField("quantity").schema(),
-        equalTo(
-            Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.LONG))));
-    assertThat(
-        avroSchema.getField("birthday").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL),
-                
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))));
-    assertThat(
-        avroSchema.getField("birthdayMoney").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL),
-                LogicalTypes.decimal(38, 
9).addToSchema(Schema.create(Schema.Type.BYTES)))));
-    assertThat(
-        avroSchema.getField("lotteryWinnings").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL),
-                LogicalTypes.decimal(77, 
38).addToSchema(Schema.create(Schema.Type.BYTES)))));
-    assertThat(
-        avroSchema.getField("flighted").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.BOOLEAN))));
-    assertThat(
-        avroSchema.getField("sound").schema(),
-        equalTo(
-            Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.BYTES))));
-    Schema dateSchema = Schema.create(Schema.Type.STRING);
-    dateSchema.addProp("sqlType", "DATE");
-    assertThat(
-        avroSchema.getField("anniversaryDate").schema(),
-        equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), 
dateSchema)));
-    Schema dateTimeSchema = Schema.create(Schema.Type.STRING);
-    dateTimeSchema.addProp("sqlType", "DATETIME");
-    assertThat(
-        avroSchema.getField("anniversaryDatetime").schema(),
-        equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), 
dateTimeSchema)));
-    Schema timeSchema = Schema.create(Schema.Type.STRING);
-    timeSchema.addProp("sqlType", "TIME");
-    assertThat(
-        avroSchema.getField("anniversaryTime").schema(),
-        equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), 
timeSchema)));
-    Schema geoSchema = Schema.create(Type.STRING);
-    geoSchema.addProp("sqlType", "GEOGRAPHY");
-    assertThat(
-        avroSchema.getField("geoPositions").schema(),
-        equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), 
geoSchema)));
-    assertThat(
-        avroSchema.getField("scion").schema(),
-        equalTo(
-            Schema.createUnion(
-                Schema.create(Schema.Type.NULL),
-                Schema.createRecord(
-                    "scion",
-                    "Translated Avro Schema for scion",
-                    "org.apache.beam.sdk.io.gcp.bigquery",
-                    false,
-                    ImmutableList.of(
-                        new Schema.Field(
-                            "species",
-                            Schema.createUnion(
-                                Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)),
-                            null,
-                            (Object) null))))));
-    assertThat(
-        avroSchema.getField("associates").schema(),
-        equalTo(
-            Schema.createArray(
-                Schema.createRecord(
-                    "associates",
-                    "Translated Avro Schema for associates",
-                    "org.apache.beam.sdk.io.gcp.bigquery",
-                    false,
-                    ImmutableList.of(
-                        new Schema.Field(
-                            "species",
-                            Schema.createUnion(
-                                Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)),
-                            null,
-                            (Object) null))))));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+    }
+
+    {
+      // default mode -> NULLABLE
+      TableSchema tableSchema = tableSchema(f -> f.setType("BOOLEAN"));
+      Schema expected =
+          avroSchema(f -> 
f.type().unionOf().nullType().and().booleanType().endUnion().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+    }
+
+    {
+      // REPEATED
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("BOOLEAN").setMode("REPEATED"));
+      Schema expected = avroSchema(f -> 
f.type().array().items().booleanType().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+    }
+
+    {
+      // INTEGER
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("INTEGER").setMode("REQUIRED"));
+      Schema expected = avroSchema(f -> f.type().longType().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+    }
+
+    {
+      // FLOAT
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("FLOAT").setMode("REQUIRED"));
+      Schema expected = avroSchema(f -> f.type().doubleType().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // BYTES
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("BYTES").setMode("REQUIRED"));
+      Schema expected = avroSchema(f -> f.type().bytesType().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // STRING
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("STRING").setMode("REQUIRED"));
+      Schema expected = avroSchema(f -> f.type().stringType().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // NUMERIC
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("NUMERIC").setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(38, 
9).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // NUMERIC with precision
+      TableSchema tableSchema =
+          tableSchema(f -> 
f.setType("NUMERIC").setPrecision(29L).setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(29, 
0).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // NUMERIC with precision and scale
+      TableSchema tableSchema =
+          tableSchema(f -> 
f.setType("NUMERIC").setPrecision(10L).setScale(9L).setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(10, 
9).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // BIGNUMERIC
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("BIGNUMERIC").setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(77, 
38).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // BIGNUMERIC with precision
+      TableSchema tableSchema =
+          tableSchema(f -> 
f.setType("BIGNUMERIC").setPrecision(38L).setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(38, 
0).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // BIGNUMERIC with precision and scale
+      TableSchema tableSchema =
+          tableSchema(
+              f -> 
f.setType("BIGNUMERIC").setPrecision(39L).setScale(38L).setMode("REQUIRED"));
+      Schema decimalType =
+          LogicalTypes.decimal(39, 
38).addToSchema(SchemaBuilder.builder().bytesType());
+      Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // DATE
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("DATE").setMode("REQUIRED"));
+      Schema dateType = 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+      Schema expected = avroSchema(f -> f.type(dateType).noDefault());
+      Schema expectedExport =
+          avroSchema(f -> f.type().stringBuilder().prop("sqlType", 
"DATE").endString().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expectedExport, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // TIME
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("TIME").setMode("REQUIRED"));
+      Schema timeType = 
LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+      Schema expected = avroSchema(f -> f.type(timeType).noDefault());
+      Schema expectedExport =
+          avroSchema(f -> f.type().stringBuilder().prop("sqlType", 
"TIME").endString().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expectedExport, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // DATETIME
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("DATETIME").setMode("REQUIRED"));
+      Schema timeType =
+          
BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
+      Schema expected = avroSchema(f -> f.type(timeType).noDefault());
+      Schema expectedExport =
+          avroSchema(
+              f -> f.type().stringBuilder().prop("sqlType", 
"DATETIME").endString().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expectedExport, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // TIMESTAMP
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("TIMESTAMP").setMode("REQUIRED"));
+      Schema timestampType =
+          
LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+      Schema expected = avroSchema(f -> f.type(timestampType).noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // GEOGRAPHY
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("GEOGRAPHY").setMode("REQUIRED"));
+      Schema expected =
+          avroSchema(
+              f -> f.type().stringBuilder().prop("sqlType", 
"GEOGRAPHY").endString().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // JSON
+      TableSchema tableSchema = tableSchema(f -> 
f.setType("JSON").setMode("REQUIRED"));
+      Schema expected =
+          avroSchema(f -> f.type().stringBuilder().prop("sqlType", 
"JSON").endString().noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+    }
+
+    {
+      // STRUCT/RECORD
+      TableFieldSchema subInteger =
+          new 
TableFieldSchema().setName("int").setType("INTEGER").setMode("NULLABLE");
+      TableFieldSchema subFloat =
+          new 
TableFieldSchema().setName("float").setType("FLOAT").setMode("REQUIRED");
+      TableSchema structTableSchema =
+          tableSchema(
+              f ->
+                  f.setType("STRUCT")
+                      .setMode("REQUIRED")
+                      .setFields(Lists.newArrayList(subInteger, subFloat)));
+      TableSchema recordTableSchema =
+          tableSchema(
+              f ->
+                  f.setType("RECORD")
+                      .setMode("REQUIRED")
+                      .setFields(Lists.newArrayList(subInteger, subFloat)));
+
+      Schema expected =
+          avroSchema(
+              f ->
+                  f.type()
+                      .record("value")
+                      .fields()
+                      .name("int")
+                      .type()
+                      .unionOf()
+                      .nullType()
+                      .and()
+                      .longType()
+                      .endUnion()
+                      .noDefault()
+                      .name("float")
+                      .type()
+                      .doubleType()
+                      .noDefault()
+                      .endRecord()
+                      .noDefault());
+
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(structTableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(structTableSchema, false));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(recordTableSchema));
+      assertEquals(expected, 
BigQueryAvroUtils.toGenericAvroSchema(recordTableSchema, false));
+    }
   }
 
   @Test
   public void testFormatTimestamp() {
-    assertThat(
-        BigQueryAvroUtils.formatTimestamp(1452062291123456L),
-        equalTo("2016-01-06 06:38:11.123456 UTC"));
+    long micros = 1452062291123456L;
+    String expected = "2016-01-06 06:38:11.123456";
+    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " 
UTC"));
   }
 
   @Test
-  public void testFormatTimestampLeadingZeroesOnMicros() {
-    assertThat(
-        BigQueryAvroUtils.formatTimestamp(1452062291000456L),
-        equalTo("2016-01-06 06:38:11.000456 UTC"));
+  public void testFormatTimestampMillis() {
+    long millis = 1452062291123L;
+    long micros = millis * 1000L;
+    String expected = "2016-01-06 06:38:11.123";
+    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " 
UTC"));
   }
 
   @Test
-  public void testFormatTimestampTrailingZeroesOnMicros() {
-    assertThat(
-        BigQueryAvroUtils.formatTimestamp(1452062291123000L),
-        equalTo("2016-01-06 06:38:11.123000 UTC"));
+  public void testFormatTimestampSeconds() {
+    long seconds = 1452062291L;
+    long micros = seconds * 1000L * 1000L;
+    String expected = "2016-01-06 06:38:11";
+    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " 
UTC"));
   }
 
   @Test
   public void testFormatTimestampNegative() {
-    assertThat(BigQueryAvroUtils.formatTimestamp(-1L), equalTo("1969-12-31 
23:59:59.999999 UTC"));
-    assertThat(
-        BigQueryAvroUtils.formatTimestamp(-100_000L), equalTo("1969-12-31 
23:59:59.900000 UTC"));
-    assertThat(BigQueryAvroUtils.formatTimestamp(-1_000_000L), 
equalTo("1969-12-31 23:59:59 UTC"));
+    assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31 
23:59:59.999999"));
+    assertThat(BigQueryAvroUtils.formatDatetime(-100_000L), 
equalTo("1969-12-31 23:59:59.900"));
+    assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L), 
equalTo("1969-12-31 23:59:59"));
     // No leap seconds before 1972. 477 leap years from 1 through 1969.
     assertThat(
-        BigQueryAvroUtils.formatTimestamp(-(1969L * 365 + 477) * 86400 * 
1_000_000),
-        equalTo("0001-01-01 00:00:00 UTC"));
+        BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 
1_000_000),
+        equalTo("0001-01-01 00:00:00"));
   }
 
   @Test
@@ -501,48 +816,4 @@ public class BigQueryAvroUtilsTest {
     String output = BigQueryAvroUtils.toGenericAvroSchema(schema, 
false).toString();
     assertThat(output.length(), greaterThan(0));
   }
-
-  /** Pojo class used as the record type in tests. */
-  @SuppressWarnings("unused") // Used by Avro reflection.
-  static class Bird {
-    long number;
-    @Nullable String species;
-    @Nullable Double quality;
-    @Nullable Long quantity;
-
-    @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\": 
\"timestamp-micros\"}]")
-    Instant birthday;
-
-    @AvroSchema(
-        value =
-            "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", 
\"precision\": 38, \"scale\": 9}]")
-    BigDecimal birthdayMoney;
-
-    @AvroSchema(
-        value =
-            "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", 
\"precision\": 77, \"scale\": 38}]")
-    BigDecimal lotteryWinnings;
-
-    @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\": 
\"GEOGRAPHY\"}]")
-    String geoPositions;
-
-    @Nullable Boolean flighted;
-    @Nullable ByteBuffer sound;
-    @Nullable Utf8 anniversaryDate;
-    @Nullable String anniversaryDatetime;
-    @Nullable Utf8 anniversaryTime;
-    @Nullable SubBird scion;
-    SubBird[] associates;
-
-    static class SubBird {
-      @Nullable String species;
-
-      public SubBird() {}
-    }
-
-    public Bird() {
-      associates = new SubBird[1];
-      associates[0] = new SubBird();
-    }
-  }
 }

Reply via email to