This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce76f81fbbc Add timestamp-nanos avro logical type support in bigquery avro utils. (#36892)
ce76f81fbbc is described below

commit ce76f81fbbc7f58d20c44a1f36121ba591bed866
Author: claudevdm <[email protected]>
AuthorDate: Wed Dec 3 14:39:35 2025 -0500

    Add timestamp-nanos avro logical type support in bigquery avro utils. (#36892)
    
    * Add timestamp-nanos avro logical type support in bigquery utils.
    
    * Lint.
    
    * Avoid overflows for millis/micros.
    
    * Remove println
    
    * Comments.
    
    * Comments
    
    * Comments.
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../beam/sdk/schemas/logicaltypes/Timestamp.java   |   1 -
 .../extensions/avro/schemas/utils/AvroUtils.java   |  46 ++++++
 .../avro/schemas/utils/AvroUtilsTest.java          |  83 ++++++++++
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java     | 106 +++++++++----
 .../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 170 ++++++++++++++++++---
 5 files changed, 357 insertions(+), 49 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
index 058331a44cf..87e47f5961e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
@@ -157,7 +157,6 @@ public class Timestamp implements Schema.LogicalType<Instant, Row> {
         maxSubseconds,
         precision,
         subseconds);
-
     return Instant.ofEpochSecond(
         checkArgumentNotNull(
             base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 38621571ca1..882e46208a9 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -27,6 +27,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -81,6 +82,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +139,7 @@ import org.joda.time.ReadableInstant;
  *   LogicalTypes.TimestampMillis   <-----> DATETIME
  *   LogicalTypes.TimestampMicros   ------> Long
  *   LogicalTypes.TimestampMicros   <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
+ *   LogicalTypes.TimestampNanos   <------> LogicalType(TIMESTAMP(9))
  *   LogicalTypes.Decimal           <-----> DECIMAL
  * </pre>
  *
@@ -164,6 +167,8 @@ public class AvroUtils {
 
   private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
 
+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
+
   static {
     GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
     addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1032,11 @@ public class AvroUtils {
         fieldType = FieldType.DATETIME;
       }
     }
+    // TODO: Remove once Avro 1.12+ has timestamp-nanos
+    if (fieldType == null
+        && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
+      fieldType = FieldType.logicalType(Timestamp.NANOS);
+    }
 
     if (fieldType == null) {
       switch (type.type.getType()) {
@@ -1186,6 +1196,14 @@ public class AvroUtils {
         } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
           baseType =
               LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+        } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+          int precision = checkNotNull(logicalType.getArgument());
+          if (precision != 9) {
+            throw new RuntimeException(
+                "Timestamp logical type precision not supported:" + precision);
+          }
+          baseType = org.apache.avro.Schema.create(Type.LONG);
+          baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
         } else {
           throw new RuntimeException(
               "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1358,16 @@ public class AvroUtils {
           java.time.Instant instant = (java.time.Instant) value;
           return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
               + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+        } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+          java.time.Instant instant = (java.time.Instant) value;
+          // Use BigInteger to work around long overflows so that epochNanos = Long.MIN_VALUE can be
+          // supported. Instant always stores nanos as positive adjustment so the math will silently
+          // overflow with regular int64.
+          BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond());
+          BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano());
+          BigInteger epochNanos =
+              epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond);
+          return epochNanos.longValueExact();
         } else {
           throw new RuntimeException("Unhandled logical type " + identifier);
         }
@@ -1387,6 +1415,24 @@ public class AvroUtils {
       @Nonnull FieldType fieldType,
       @Nonnull GenericData genericData) {
     TypeWithNullability type = new TypeWithNullability(avroSchema);
+
+    // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos is supported.
+    if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
+      if (type.type.getType() == Type.LONG) {
+        Long nanos = (Long) value;
+        // Check if Beam expects Timestamp logical type
+        if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
+            && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
+                fieldType.getLogicalType().getIdentifier())) {
+          long seconds = Math.floorDiv(nanos, 1_000_000_000L);
+          long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
+          return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
+        } else {
+          return nanos;
+        }
+      }
+    }
+
     LogicalType logicalType = LogicalTypes.fromSchema(type.type);
     if (logicalType == null) {
       return null;
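
Aside: a self-contained sketch (class and method names are illustrative, not from the patch) of the overflow the BigInteger math above works around. For Instant.ofEpochSecond(0, Long.MIN_VALUE), getEpochSecond() is -9223372037 and getNano() is 145224192, so the intermediate product seconds * 1_000_000_000 is below Long.MIN_VALUE even though the final sum fits in a long exactly; BigInteger keeps the intermediate exact, and longValueExact() still rejects genuinely unrepresentable instants.

    import java.math.BigInteger;
    import java.time.Instant;

    public class EpochNanosSketch {
      // Mirrors the patch: exact intermediate math, range check only at the end.
      static long toEpochNanos(Instant instant) {
        BigInteger seconds = BigInteger.valueOf(instant.getEpochSecond());
        BigInteger nanos = BigInteger.valueOf(instant.getNano());
        return seconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanos).longValueExact();
      }

      public static void main(String[] args) {
        // 1677-09-21T00:12:43.145224192Z, i.e. Long.MIN_VALUE nanos since epoch.
        Instant min = Instant.ofEpochSecond(0, Long.MIN_VALUE);
        System.out.println(toEpochNanos(min) == Long.MIN_VALUE); // true
        try {
          // Checked long math rejects the intermediate product even though
          // the final result is representable.
          Math.multiplyExact(min.getEpochSecond(), 1_000_000_000L);
        } catch (ArithmeticException expected) {
          System.out.println("intermediate product overflows a long");
        }
      }
    }

The read path above inverts this with Math.floorDiv/floorMod, which keep the nano adjustment in [0, 1e9) for negative epoch values, so the round trip is exact across the full long range.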
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 41a43ed850b..d087ed0a20b 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,88 @@ public class AvroUtilsTest {
     assertEquals(getAvroSchema(), avroSchema);
   }
 
+  @Test
+  public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Expected Avro schema with timestamp-nanos
+    String expectedJson =
+        "{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+            + "\"fields\": [{\"name\": \"timestampNanos\", "
+            + "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";
+
+    org.apache.avro.Schema expectedAvroSchema =
+        new org.apache.avro.Schema.Parser().parse(expectedJson);
+
+    assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
+  }
+
+  @Test
+  public void testBeamTimestampNanosToGenericRecord() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+
+    // Expected nanos since epoch
+    long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);
+
+    assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
+  }
+
+  @Test
+  public void testTimestampNanosRoundTrip() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Test various nanosecond precisions
+    java.time.Instant[] testInstants = {
+      java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
+      java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
+      java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
+      java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported
+      java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min supported by an int64
+    };
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
+    for (java.time.Instant instant : testInstants) {
+      Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
+      GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
+      Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);
+
+      assertEquals(originalRow, roundTripRow);
+      java.time.Instant roundTripInstant =
+          (java.time.Instant) roundTripRow.getValue("timestampNanos");
+      assertEquals(instant, roundTripInstant);
+    }
+  }
+
+  @Test
+  public void testTimestampNanosAvroSchemaToBeamSchema() {
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampNanos",
+            new org.apache.avro.Schema.Parser()
+                .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("test", null, null, false, fields);
+
+    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
+
+    Schema expected =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+    assertEquals(expected, beamSchema);
+  }
+
   @Test
   public void testAvroSchemaFromBeamSchemaCanBeParsed() {
     org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index c169a0571b7..b5243a8110b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -50,8 +50,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 /** A set of utilities for working with Avro files. */
 class BigQueryAvroUtils {
@@ -60,7 +58,7 @@ class BigQueryAvroUtils {
       Optional.ofNullable(Schema.class.getPackage())
           .map(Package::getImplementationVersion)
           .orElse("");
-
+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
   // org.apache.avro.LogicalType
   static class DateTimeLogicalType extends LogicalType {
     public DateTimeLogicalType() {
@@ -161,36 +159,73 @@ class BigQueryAvroUtils {
    * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
    * immutable.
    */
-  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-
-  @VisibleForTesting
-  static String formatTimestamp(Long timestampMicro) {
-    String dateTime = formatDatetime(timestampMicro);
-    return dateTime + " UTC";
+  private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
+      java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+          .withZone(java.time.ZoneOffset.UTC);
+
+  /** Enum to define the precision of a timestamp since the epoch. */
+  enum TimestampPrecision {
+    MILLISECONDS,
+    MICROSECONDS,
+    NANOSECONDS;
+
+    /** Converts an epoch value of this precision to an Instant. */
+    java.time.Instant toInstant(long epochValue) {
+      switch (this) {
+        case MILLISECONDS:
+          return java.time.Instant.ofEpochMilli(epochValue);
+        case MICROSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000L);
+            long microsOfSecond = Math.floorMod(epochValue, 1_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
+          }
+        case NANOSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000_000L);
+            long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond);
+          }
+        default:
+          throw new IllegalStateException("Unknown precision: " + this);
+      }
+    }
   }
 
+  /**
+   * Formats an Instant with minimal fractional second precision. Shows 0, 3, 6, or 9 decimal places
+   * based on actual precision of the value.
+   */
   @VisibleForTesting
-  static String formatDatetime(Long timestampMicro) {
-    // timestampMicro is in "microseconds since epoch" format,
-    // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
-    // Separate into seconds and microseconds.
-    long timestampSec = timestampMicro / 1_000_000;
-    long micros = timestampMicro % 1_000_000;
-    if (micros < 0) {
-      micros += 1_000_000;
-      timestampSec -= 1;
-    }
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-    if (micros == 0) {
-      return dayAndTime;
-    } else if (micros % 1000 == 0) {
-      return String.format("%s.%03d", dayAndTime, micros / 1000);
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  static String formatDatetime(java.time.Instant instant) {
+    String dateTime = DATE_TIME_FORMATTER.format(instant);
+    int nanos = instant.getNano();
+
+    if (nanos == 0) {
+      return dateTime;
+    } else if (nanos % 1_000_000 == 0) {
+      return dateTime + String.format(".%03d", nanos / 1_000_000);
+    } else if (nanos % 1_000 == 0) {
+      return dateTime + String.format(".%06d", nanos / 1_000);
     } else {
-      return String.format("%s.%06d", dayAndTime, micros);
+      return dateTime + String.format(".%09d", nanos);
     }
   }
 
+  @VisibleForTesting
+  static String formatDatetime(long epochValue, TimestampPrecision precision) {
+    return formatDatetime(precision.toInstant(epochValue));
+  }
+
+  static String formatTimestamp(java.time.Instant instant) {
+    return formatDatetime(instant) + " UTC";
+  }
+
+  static String formatTimestamp(long epochValue, TimestampPrecision precision) {
+    return formatTimestamp(precision.toInstant(epochValue));
+  }
+
   /**
    * This method formats a BigQuery DATE value into a String matching the format used by JSON
    * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +370,6 @@ class BigQueryAvroUtils {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
     checkNotNull(v, "REQUIRED field %s should not be null", name);
-
     Type type = schema.getType();
     LogicalType logicalType = schema.getLogicalType();
     switch (type) {
@@ -364,21 +398,26 @@ class BigQueryAvroUtils {
         } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
           // Write only: SQL type TIMESTAMP
           // ideally Instant but TableRowJsonCoder encodes as String
-          return formatTimestamp((Long) v * 1000L);
+          return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
         } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
           // SQL type TIMESTAMP
           // ideally Instant but TableRowJsonCoder encodes as String
-          return formatTimestamp((Long) v);
+          return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
+          // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+        } else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
+          // SQL type TIMESTAMP
+          // ideally Instant but TableRowJsonCoder encodes as String
+          return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
         } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
             && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
           // Write only: SQL type DATETIME
           // ideally LocalDateTime but TableRowJsonCoder encodes as String
-          return formatDatetime(((Long) v) * 1000);
+          return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
         } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
             && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
           // Write only: SQL type DATETIME
           // ideally LocalDateTime but TableRowJsonCoder encodes as String
-          return formatDatetime((Long) v);
+          return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
         } else {
           // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
           // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +641,11 @@ class BigQueryAvroUtils {
           return fieldSchema.setType("INTEGER");
         }
       case LONG:
+        // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+        if (useAvroLogicalTypes
+            && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
+          return fieldSchema.setType("TIMESTAMP");
+        }
         if (logicalType instanceof LogicalTypes.TimeMicros) {
           return fieldSchema.setType("TIME");
         } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
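
Aside: a minimal standalone sketch (names are ad hoc, not from the patch) of how TimestampPrecision.NANOSECONDS plus the minimal-precision formatter above render an epoch value. Math.floorDiv/floorMod keep the nano adjustment in [0, 1e9) even for negative inputs, and the fraction is printed with 0, 3, 6, or 9 digits depending on the value.

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class TimestampFormatSketch {
      private static final DateTimeFormatter FMT =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

      // Epoch nanos -> "yyyy-MM-dd HH:mm:ss[.fff|.ffffff|.fffffffff]".
      static String format(long epochNanos) {
        long seconds = Math.floorDiv(epochNanos, 1_000_000_000L); // floor handles negatives
        int nanos = (int) Math.floorMod(epochNanos, 1_000_000_000L); // always in [0, 1e9)
        String base = FMT.format(Instant.ofEpochSecond(seconds, nanos));
        if (nanos == 0) {
          return base; // whole seconds: no fraction
        } else if (nanos % 1_000_000 == 0) {
          return base + String.format(".%03d", nanos / 1_000_000); // millis
        } else if (nanos % 1_000 == 0) {
          return base + String.format(".%06d", nanos / 1_000); // micros
        } else {
          return base + String.format(".%09d", nanos); // full nanos
        }
      }

      public static void main(String[] args) {
        System.out.println(format(1452062291123456789L)); // 2016-01-06 06:38:11.123456789
        System.out.println(format(-1_000_000_000L)); // 1969-12-31 23:59:59
        System.out.println(format(Long.MIN_VALUE)); // 1677-09-21 00:12:43.145224192
      }
    }
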
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 9b752055d01..e95e1546596 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -280,6 +280,30 @@ public class BigQueryAvroUtilsTest {
       assertEquals(expected, row.clone());
     }
 
+    {
+      // timestamp-nanos
+      // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+      String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+      Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+
+      // 2000-01-01 01:02:03.123456789 UTC
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+      int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+      long totalNanos = seconds * 1_000_000_000L + nanos;
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> f.type(timestampType).noDefault()))
+              .set("value", totalNanos)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 01:02:03.123456789 UTC");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
     {
       // timestamp-micros
       LogicalType lt = LogicalTypes.timestampMillis();
@@ -923,6 +947,19 @@ public class BigQueryAvroUtilsTest {
       assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
     }
 
+    {
+      // timestamp-nanos
+      // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+      String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+      Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+      Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault());
+      TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED"));
+      TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED"));
+
+      assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema));
+      assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
+    }
+
     {
       // string prop: sqlType=GEOGRAPHY
       Schema avroSchema =
@@ -978,39 +1015,138 @@ public class BigQueryAvroUtilsTest {
   }
 
   @Test
-  public void testFormatTimestamp() {
-    long micros = 1452062291123456L;
+  public void testFormatTimestampInputMillis() {
+    // Min: Earliest timestamp supported by BQ
+    // https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
+    long minMillis = -62135596800000L;
+    String expectedMin = "0001-01-01 00:00:00";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    // Existing: Regular timestamp
+    long millis = 1452062291123L;
+    String expected = "2016-01-06 06:38:11.123";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expected + " UTC"));
+
+    // Max: Latest timestamp supported by BQ
+    long maxMillis = 253402300799999L;
+    String expectedMax = "9999-12-31 23:59:59.999";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMax + " UTC"));
+  }
+
+  @Test
+  public void testFormatTimestampInputMicros() {
+    long minMicro = -62_135_596_800_000_000L;
+    String expectedMin = "0001-01-01 00:00:00";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    long micros = 1452_062_291_123_456L;
     String expected = "2016-01-06 06:38:11.123456";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expected + " UTC"));
+
+    // Max: Latest timestamp supported by BQ
+    long maxMicros = 253_402_300_799_999_000L;
+    String expectedMax = "9999-12-31 23:59:59.999";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMax + " UTC"));
   }
 
   @Test
-  public void testFormatTimestampMillis() {
-    long millis = 1452062291123L;
-    long micros = millis * 1000L;
-    String expected = "2016-01-06 06:38:11.123";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+  public void testFormatTimestampInputNanos() {
+    long minNanos = Long.MIN_VALUE; // -9223372036854775808L
+    String expectedMin = "1677-09-21 00:12:43.145224192";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    long nanos = 1452062291123456789L;
+    String expected = "2016-01-06 06:38:11.123456789";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expected + " UTC"));
+
+    long maxNanos = Long.MAX_VALUE; // 9223372036854775807L
+    String expectedMax = "2262-04-11 23:47:16.854775807";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMax + " UTC"));
   }
 
   @Test
-  public void testFormatTimestampSeconds() {
+  public void testFormatTimestampInputMicrosOutputSecondsFormat() {
+    BigQueryAvroUtils.TimestampPrecision precision =
+        BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
     long seconds = 1452062291L;
     long micros = seconds * 1000L * 1000L;
     String expected = "2016-01-06 06:38:11";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+    assertThat(BigQueryAvroUtils.formatDatetime(micros, precision), equalTo(expected));
+    assertThat(BigQueryAvroUtils.formatTimestamp(micros, precision), equalTo(expected + " UTC"));
   }
 
   @Test
   public void testFormatTimestampNegative() {
-    assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31 23:59:59.999999"));
-    assertThat(BigQueryAvroUtils.formatDatetime(-100_000L), equalTo("1969-12-31 23:59:59.900"));
-    assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L), equalTo("1969-12-31 23:59:59"));
+    BigQueryAvroUtils.TimestampPrecision precision =
+        BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-1L, precision), equalTo("1969-12-31 23:59:59.999999"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-100_000L, precision), equalTo("1969-12-31 23:59:59.900"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-1_000_000L, precision), equalTo("1969-12-31 23:59:59"));
     // No leap seconds before 1972. 477 leap years from 1 through 1969.
     assertThat(
-        BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000),
+        BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000, precision),
         equalTo("0001-01-01 00:00:00"));
   }
 
