This is an automated email from the ASF dual-hosted git repository.

cvandermerwe pushed a commit to branch release-2.71
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.71 by this push:
     new 597768686ed Cherry pick #37257 and #37294 into release-2.71 (#37307)
597768686ed is described below

commit 597768686ed3711e70e9038570f03a4531a761d7
Author: claudevdm <[email protected]>
AuthorDate: Wed Jan 14 13:49:24 2026 -0500

    Cherry pick #37257 and #37294 into release-2.71 (#37307)
    
    * Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory (#37257)
    
    * Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory
    
    * Use default schemafactory in test.
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
    
    * Support picosecond timestamps when writing GenericRecord and Beam Rows. (#37294)
    
    Co-authored-by: Claude <[email protected]>
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../AvroGenericRecordToStorageApiProto.java        |  3 ++
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  |  3 ++
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java     | 12 ++++++--
 .../AvroGenericRecordToStorageApiProtoTest.java    | 34 ++++++++++++++++++++++
 .../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 17 +++++++++++
 .../io/gcp/bigquery/BigQueryTimestampPicosIT.java  | 23 +++++++++++----
 6 files changed, 84 insertions(+), 8 deletions(-)
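
In short: an Avro long field carrying the "timestamp-nanos" logical type now maps to a Storage Write API TIMESTAMP field with precision 12, and BigQuery TIMESTAMP(12) columns map to that Avro shape in the default schema factory. Below is a minimal sketch of the forward mapping, modelled on the new tests in this commit; the record and field names ("Example", "ts") are illustrative only, not part of the change:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    // An Avro long tagged with the timestamp-nanos logical type, the same shape
    // the tests in this commit build for high-precision timestamps.
    Schema tsNanos = Schema.create(Schema.Type.LONG);
    tsNanos.addProp("logicalType", "timestamp-nanos");
    Schema record =
        SchemaBuilder.record("Example").fields().name("ts").type(tsNanos).noDefault().endRecord();

    // Converting to a Storage Write API schema yields a TIMESTAMP field whose
    // timestamp precision is 12, as asserted in the tests below.
    com.google.cloud.bigquery.storage.v1.TableSchema proto =
        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(record);
    long precision = proto.getFields(0).getTimestampPrecision().getValue(); // 12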

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 76174ac3d04..35751e2758e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -348,6 +348,9 @@ public class AvroGenericRecordToStorageApiProto {
             fieldDescriptorFromAvroField(
                new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
         builder = builder.setType(elementFieldSchema.getType());
+        if (elementFieldSchema.hasTimestampPrecision()) {
+          builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+        }
         builder.addAllFields(elementFieldSchema.getFieldsList());
         builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
         break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index adb8e4468c0..d940ff8dd7f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -237,6 +237,9 @@ public class BeamRowToStorageApiProto {
         TableFieldSchema elementFieldSchema =
            fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
         builder = builder.setType(elementFieldSchema.getType());
+        if (elementFieldSchema.hasTimestampPrecision()) {
+          builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+        }
         builder.addAllFields(elementFieldSchema.getFieldsList());
         builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
         break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index e1ff0f58f14..46a014f8196 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -102,9 +102,15 @@ class BigQueryAvroUtils {
         // boolean
         return SchemaBuilder.builder().booleanType();
       case "TIMESTAMP":
-        // in Extract Jobs, it always uses the Avro logical type
-        // we may have to change this if we move to EXPORT DATA
-        return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+        if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
+          // in Extract Jobs, it always uses the Avro logical type
+          // we may have to change this if we move to EXPORT DATA
+          return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+        }
+        return SchemaBuilder.builder()
+            .longBuilder()
+            .prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
+            .endLong();
       case "DATE":
         if (useAvroLogicalTypes) {
           return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index deabb1dd05f..9698aaff1d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -348,7 +348,23 @@ public class AvroGenericRecordToStorageApiProtoTest {
         .endRecord();
   }
 
+  private static Schema createRepeatedTimestampNanosSchema() {
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    longSchema.addProp("logicalType", "timestamp-nanos");
+
+    Schema arraySchema = Schema.createArray(longSchema);
+
+    return SchemaBuilder.record("RepeatedTimestampNanosRecord")
+        .fields()
+        .name("timestampNanosArray")
+        .type(arraySchema)
+        .noDefault()
+        .endRecord();
+  }
+
   private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
+  private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
+      createRepeatedTimestampNanosSchema();
 
   private static GenericRecord baseRecord;
   private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public class AvroGenericRecordToStorageApiProtoTest {
     assertTrue(field.hasTimestampPrecision());
     assertEquals(12L, field.getTimestampPrecision().getValue());
   }
+
+  @Test
+  public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
+    com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
+            REPEATED_TIMESTAMP_NANOS_SCHEMA);
+
+    assertEquals(1, protoSchema.getFieldsCount());
+    com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);
+
+    assertEquals("timestampnanosarray", field.getName());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+
+    assertEquals(12L, field.getTimestampPrecision().getValue());
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index d7a88615a50..c546a7ca5d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
       Schema.builder()
           .addField("timestampNanos", 
FieldType.logicalType(Timestamp.NANOS).withNullable(true))
           .build();
+  private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
+      Schema.builder()
+          .addField("timestampNanosArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
+          .build();
   private static final EnumerationType TEST_ENUM =
       EnumerationType.create("ONE", "TWO", "RED", "BLUE");
   private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public class BeamRowToStorageApiProtoTest {
     assertEquals(12L, field.getTimestampPrecision().getValue());
   }
 
+  @Test
+  public void testTimestampNanosArraySchema() {
+    com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+        BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);
+
+    assertEquals(1, protoSchema.getFieldsCount());
+    TableFieldSchema field = protoSchema.getFields(0);
+    assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+    assertEquals(12L, field.getTimestampPrecision().getValue());
+  }
+
   @Test
   public void testTimestampNanosDescriptor() throws Exception {
     DescriptorProto descriptor =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
index 6d155185ee6..07b6adf46bc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
@@ -408,6 +408,10 @@ public class BigQueryTimestampPicosIT {
         .name("ts_nanos")
         .type(longSchema)
         .noDefault()
+        .name("ts_picos")
+        .type()
+        .stringType()
+        .noDefault()
         .endRecord();
   }
 
@@ -421,12 +425,12 @@ public class BigQueryTimestampPicosIT {
   public void testWriteGenericRecordTimestampNanos() throws Exception {
     String tableSpec =
        String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");
-
     // Create GenericRecord with timestamp-nanos value
     GenericRecord record =
         new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
             .set(
                 "ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + 
TEST_INSTANT.getNano())
+            .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
             .build();
 
     // Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public class BigQueryTimestampPicosIT {
             "WriteGenericRecords",
             BigQueryIO.writeGenericRecords()
                 .to(tableSpec)
-                .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
                 .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
                 .useAvroLogicalTypes()
                 .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public class BigQueryTimestampPicosIT {
                 .from(tableSpec));
 
     PAssert.that(result)
-        .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
+        .containsInAnyOrder(
+            new TableRow()
+                .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+                .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
     readPipeline.run().waitUntilFinish();
   }
 
   private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
-      Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
+      Schema.builder()
+          .addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
+          .addField("ts_picos", Schema.FieldType.STRING)
+          .build();
 
   @Test
   public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public class BigQueryTimestampPicosIT {
     Row row =
         Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
             .withFieldValue("ts_nanos", TEST_INSTANT)
+            .withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
             .build();
 
     // Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public class BigQueryTimestampPicosIT {
                 .from(tableSpec));
 
     PAssert.that(result)
-        .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
+        .containsInAnyOrder(
+            new TableRow()
+                .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+                .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
     readPipeline.run().waitUntilFinish();
   }
 

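For anyone picking this up on release-2.71, here is a rough usage sketch of the GenericRecord write path exercised by BigQueryTimestampPicosIT above. TIMESTAMP_NANOS_AVRO_SCHEMA, tableSpec, pipeline, and instant stand in for values the IT defines but this diff does not fully show, and the Create/AvroCoder wiring is an illustrative assumption rather than part of the change:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
    import org.apache.beam.sdk.transforms.Create;

    // Assumed in scope: TIMESTAMP_NANOS_AVRO_SCHEMA (Avro schema with a ts_nanos
    // long/timestamp-nanos field and a ts_picos string field), tableSpec
    // ("project:dataset.table"), pipeline (a Beam Pipeline), instant (java.time.Instant).
    GenericRecord record =
        new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
            .set("ts_nanos", instant.getEpochSecond() * 1_000_000_000L + instant.getNano())
            .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
            .build();

    pipeline
        .apply(Create.of(record).withCoder(AvroCoder.of(TIMESTAMP_NANOS_AVRO_SCHEMA)))
        .apply(
            "WriteGenericRecords",
            BigQueryIO.writeGenericRecords()
                .to(tableSpec)
                .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
                .useAvroLogicalTypes()
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();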