This is an automated email from the ASF dual-hosted git repository.
cvandermerwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 44ed498ee5c Add Picosecond timestamp support to storage API
WriteTableRows (#37091)
44ed498ee5c is described below
commit 44ed498ee5cb3336cc3915732b0fa6b2068c1f58
Author: claudevdm <[email protected]>
AuthorDate: Mon Jan 5 09:41:31 2026 -0500
Add Picosecond timestamp support to storage API WriteTableRows (#37091)
* initial
* typo
* test refactor.
* Fix tests.
* spotless
* comments.
* handle joda instant.
---------
Co-authored-by: Claude <[email protected]>
---
runners/google-cloud-dataflow-java/build.gradle | 2 +
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 63 ++++
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 106 +++++-
.../io/gcp/bigquery/BigQueryTimestampPicosIT.java | 414 +++++++++++++++++++++
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 94 +++++
.../bigquery/TableRowToStorageApiProtoTest.java | 142 ++++++-
6 files changed, 812 insertions(+), 9 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index 50498d24c62..3792626a1fd 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -643,6 +643,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type:
Test, dependsOn: copyG
exclude '**/BigQueryIODynamicQueryIT.class'
exclude '**/BigQueryIODynamicReadIT.class'
exclude '**/BigQueryIODynamicReadTableRowIT.class'
+ exclude '**/BigQueryTimestampPicosIT.java'
exclude '**/PubsubReadIT.class'
exclude '**/FhirIOReadIT.class'
exclude '**/DicomIOReadIT.class'
@@ -698,6 +699,7 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test)
{
exclude '**/BigQueryIODynamicQueryIT.class'
exclude '**/BigQueryIODynamicReadIT.class'
exclude '**/BigQueryIODynamicReadTableRowIT.class'
+ exclude '**/BigQueryTimestampPicosIT.java'
exclude '**/SpannerWriteIT.class'
exclude '**/*KmsKeyIT.class'
exclude '**/FhirIOReadIT.class'
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index f9a59ba089c..94898787127 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -115,6 +115,8 @@ public class BigQueryUtils {
+ "(?<DATASET>[a-zA-Z0-9_]{1,1024})[\\.]"
+
"(?<TABLE>[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}$]{1,1024})$");
+ private static final long PICOSECOND_PRECISION = 12L;
+
/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
@@ -380,6 +382,67 @@ public class BigQueryUtils {
return ret;
}
+ /**
+ * Represents a timestamp with picosecond precision, split into seconds and
picoseconds
+ * components.
+ */
+ public static class TimestampPicos {
+ final long seconds;
+ final long picoseconds;
+
+ TimestampPicos(long seconds, long picoseconds) {
+ this.seconds = seconds;
+ this.picoseconds = picoseconds;
+ }
+
+ /**
+ * Parses a timestamp string into seconds and picoseconds components.
+ *
+ * <p>Handles two formats:
+ *
+ * <ul>
+ * <li>ISO format with exactly 12 fractional digits ending in Z
(picosecond precision): e.g.,
+ * "2024-01-15T10:30:45.123456789012Z"
+ * <li>UTC format with 0-9 fractional digits ending in "UTC" (up to
nanosecond precision):
+ * e.g., "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45
UTC"
+ * </ul>
+ */
+ public static TimestampPicos fromString(String timestampString) {
+ // Check for ISO picosecond format up to 12 fractional digits before Z
+ // Format: "2024-01-15T10:30:45.123456789012Z"
+ if (timestampString.endsWith("Z")) {
+ int dotIndex = timestampString.lastIndexOf('.');
+
+ if (dotIndex > 0) {
+ String fractionalPart =
+ timestampString.substring(dotIndex + 1, timestampString.length()
- 1);
+
+ if ((long) fractionalPart.length() == PICOSECOND_PRECISION) {
+ // ISO timestamp with 12 decimal digits (picosecond precision)
+ // Parse the datetime part (without fractional seconds)
+ String dateTimePart = timestampString.substring(0, dotIndex) + "Z";
+ java.time.Instant baseInstant =
java.time.Instant.parse(dateTimePart);
+
+ // Parse all 12 digits directly as picoseconds (subsecond portion)
+ long picoseconds = Long.parseLong(fractionalPart);
+
+ return new TimestampPicos(baseInstant.getEpochSecond(),
picoseconds);
+ }
+ }
+
+ // ISO format with 0-9 fractional digits - Instant.parse handles this
+ java.time.Instant timestamp = java.time.Instant.parse(timestampString);
+ return new TimestampPicos(timestamp.getEpochSecond(),
timestamp.getNano() * 1000L);
+ }
+
+ // UTC format: "2024-01-15 10:30:45.123456789 UTC"
+ // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix
+ java.time.Instant timestamp =
+ java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString));
+ return new TimestampPicos(timestamp.getEpochSecond(),
timestamp.getNano() * 1000L);
+ }
+ }
+
/**
* Get the Beam {@link FieldType} from a BigQuery type name.
*
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index c5451b04a4b..ab5ae80065a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -69,6 +69,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TimestampPicos;
import org.apache.beam.sdk.util.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
@@ -191,6 +192,23 @@ public class TableRowToStorageApiProto {
.put(TableFieldSchema.Type.JSON, "JSON")
.build();
+ static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO =
+ DescriptorProto.newBuilder()
+ .setName("TimestampPicos")
+ .addField(
+ DescriptorProtos.FieldDescriptorProto.newBuilder()
+ .setName("seconds")
+ .setNumber(1)
+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+ .build())
+ .addField(
+ DescriptorProtos.FieldDescriptorProto.newBuilder()
+ .setName("picoseconds")
+ .setNumber(2)
+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+ .build())
+ .build();
+
@FunctionalInterface
public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
OutputT apply(FirstInputT t, SecondInputT u) throws
SchemaConversionException;
@@ -199,6 +217,8 @@ public class TableRowToStorageApiProto {
static final DecimalFormat DECIMAL_FORMAT =
new DecimalFormat("0.0###############",
DecimalFormatSymbols.getInstance(Locale.ROOT));
+ private static final long PICOSECOND_PRECISION = 12L;
+
// Map of functions to convert json values into the value expected in the
Vortex proto object.
static final Map<TableFieldSchema.Type, ThrowingBiFunction<String, Object,
@Nullable Object>>
TYPE_MAP_PROTO_CONVERTERS =
@@ -533,6 +553,9 @@ public class TableRowToStorageApiProto {
if (field.getScale() != null) {
builder.setScale(field.getScale());
}
+ if (field.getTimestampPrecision() != null) {
+
builder.getTimestampPrecisionBuilder().setValue(field.getTimestampPrecision());
+ }
builder.setType(typeToProtoType(field.getType()));
if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) {
for (com.google.api.services.bigquery.model.TableFieldSchema subField :
field.getFields()) {
@@ -587,6 +610,10 @@ public class TableRowToStorageApiProto {
return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED);
}
+ public long getTimestampPrecision() {
+ return tableFieldSchema.getTimestampPrecision().getValue();
+ }
+
public SchemaInformation getSchemaForField(String name) {
SchemaInformation schemaInformation =
subFieldsByName.get(name.toLowerCase());
if (schemaInformation == null) {
@@ -631,7 +658,6 @@ public class TableRowToStorageApiProto {
.put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
.put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
.put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
- .put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
.put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
.build();
@@ -957,10 +983,16 @@ public class TableRowToStorageApiProto {
switch (fieldDescriptor.getType()) {
case MESSAGE:
- tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
- TableSchema nestedTableField =
tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
- tableFieldSchemaBuilder =
-
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+ if
(fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) {
+ tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP);
+ tableFieldSchemaBuilder.setPrecision(PICOSECOND_PRECISION);
+ } else {
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
+ TableSchema nestedTableField =
+ tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
+ tableFieldSchemaBuilder =
+
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+ }
break;
default:
TableFieldSchema.Type type =
PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
@@ -1060,6 +1092,25 @@ public class TableRowToStorageApiProto {
fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
break;
+ case TIMESTAMP:
+ if (fieldSchema.getTimestampPrecision().getValue() ==
PICOSECOND_PRECISION) {
+ boolean typeAlreadyExists =
+ descriptorBuilder.getNestedTypeList().stream()
+ .anyMatch(d ->
TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName()));
+
+ if (!typeAlreadyExists) {
+ descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO);
+ }
+ fieldDescriptorBuilder =
+ fieldDescriptorBuilder
+ .setType(Type.TYPE_MESSAGE)
+ .setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName());
+ } else {
+ // Microsecond precision - use simple INT64
+ fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+ }
+ break;
+
default:
@Nullable Type type =
PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
if (type == null) {
@@ -1313,6 +1364,36 @@ public class TableRowToStorageApiProto {
null,
null);
}
+ } else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP
+ && schemaInformation.getTimestampPrecision() == PICOSECOND_PRECISION) {
+
+ long seconds;
+ long picoseconds;
+
+ if (value instanceof String) {
+ TimestampPicos parsed = TimestampPicos.fromString((String) value);
+ seconds = parsed.seconds;
+ picoseconds = parsed.picoseconds;
+
+ } else if (value instanceof Instant || value instanceof
org.joda.time.Instant) {
+ Instant timestamp =
+ value instanceof Instant
+ ? (Instant) value
+ : Instant.ofEpochMilli(((org.joda.time.Instant)
value).getMillis());
+ seconds = timestamp.getEpochSecond();
+ picoseconds = timestamp.getNano() * 1000L;
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported timestamp value type: " + value.getClass().getName());
+ }
+
+ converted =
+ DynamicMessage.newBuilder(fieldDescriptor.getMessageType())
+
.setField(fieldDescriptor.getMessageType().findFieldByName("seconds"), seconds)
+ .setField(
+
fieldDescriptor.getMessageType().findFieldByName("picoseconds"), picoseconds)
+ .build();
+
} else {
@Nullable
ThrowingBiFunction<String, Object, @Nullable Object> converter =
@@ -1633,6 +1714,7 @@ public class TableRowToStorageApiProto {
return LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
} else if
(fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
Message message = (Message) fieldValue;
+ String messageName = fieldDescriptor.getMessageType().getName();
if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
fieldDescriptor.getMessageType().getName())) {
Descriptor descriptor = message.getDescriptorForType();
@@ -1640,6 +1722,20 @@ public class TableRowToStorageApiProto {
int nanos = (int)
message.getField(descriptor.findFieldByName("nanos"));
Instant instant = Instant.ofEpochSecond(seconds, nanos);
return LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
+ } else if (messageName.equals("TimestampPicos")) {
+ Descriptor descriptor = message.getDescriptorForType();
+ long seconds = (long)
message.getField(descriptor.findFieldByName("seconds"));
+ long picoseconds = (long)
message.getField(descriptor.findFieldByName("picoseconds"));
+
+ // Convert to ISO timestamp string with picoseconds
+ Instant instant = Instant.ofEpochSecond(seconds);
+ String baseTimestamp = instant.toString(); //
"2024-01-15T10:30:45Z"
+
+ // Format picoseconds as 12-digit string
+ String picosPart = String.format("%012d", picoseconds);
+
+ // Insert before 'Z': "2024-01-15T10:30:45Z" →
"2024-01-15T10:30:45.123456789012Z"
+ return baseTimestamp.replace("Z", "." + picosPart + "Z");
} else {
throw new RuntimeException(
"Not implemented yet " +
fieldDescriptor.getMessageType().getFullName());
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
new file mode 100644
index 00000000000..5deffd4028f
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import java.security.SecureRandom;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for BigQuery TIMESTAMP with picosecond precision.
+ *
+ * <p>Tests write data via Storage Write API and read back using different
precision settings. Each
+ * test clearly shows: WRITE DATA → READ SETTINGS → EXPECTED OUTPUT.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTimestampPicosIT {
+
+ private static final long PICOS_PRECISION = 12L;
+
+ private static String project;
+ private static final String DATASET_ID =
+ "bq_ts_picos_" + System.currentTimeMillis() + "_" + new
SecureRandom().nextInt(32);
+ private static final BigqueryClient BQ_CLIENT = new
BigqueryClient("BigQueryTimestampPicosIT");
+ private static TestBigQueryOptions bqOptions;
+ private static String nestedTableSpec;
+ private static String simpleTableSpec;
+
+ private static final TableSchema NESTED_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ // Simple timestamp column
+ new TableFieldSchema()
+ .setName("ts_simple")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION),
+ // Array of timestamps
+ new TableFieldSchema()
+ .setName("ts_array")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION)
+ .setMode("REPEATED"),
+ // Nested struct with timestamp
+ new TableFieldSchema()
+ .setName("event")
+ .setType("STRUCT")
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema()
+ .setName("ts")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION))),
+ // Repeated struct with timestamp
+ new TableFieldSchema()
+ .setName("events")
+ .setType("STRUCT")
+ .setMode("REPEATED")
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema()
+ .setName("ts")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION))),
+ // Map-like: repeated struct with timestamp key and value
+ new TableFieldSchema()
+ .setName("ts_map")
+ .setType("STRUCT")
+ .setMode("REPEATED")
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema()
+ .setName("key")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION),
+ new TableFieldSchema()
+ .setName("value")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION)))));
+
+ private static final TableSchema SIMPLE_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ // Simple timestamp column
+ new TableFieldSchema()
+ .setName("ts_simple")
+ .setType("TIMESTAMP")
+ .setTimestampPrecision(PICOS_PRECISION)));
+
+ //
============================================================================
+ // TEST DATA - Written once, read with different precision settings
+ //
============================================================================
+ private static final List<TableRow> NESTED_WRITE_DATA =
+ ImmutableList.of(
+ new TableRow()
+ .set("ts_simple", "2024-01-15T10:30:45.123456789012Z")
+ .set(
+ "ts_array",
+ ImmutableList.of(
+ "2024-01-15T10:30:45.111111111111Z",
"2024-06-20T15:45:30.222222222222Z"))
+ .set(
+ "event",
+ new TableRow()
+ .set("name", "login")
+ .set("ts", "2024-01-15T10:30:45.333333333333Z"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "click")
+ .set("ts", "2024-01-15T10:30:45.444444444444Z"),
+ new TableRow()
+ .set("name", "scroll")
+ .set("ts", "2024-01-15T10:30:45.555555555555Z")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "2024-01-15T10:30:45.666666666666Z")
+ .set("value", "2024-01-15T10:30:45.777777777777Z"))),
+ new TableRow()
+ .set("ts_simple", "1890-01-01T00:00:00.123456789123Z")
+ .set("ts_array",
ImmutableList.of("1970-01-01T00:00:00.000000000002Z"))
+ .set(
+ "event",
+ new TableRow()
+ .set("name", "epoch")
+ .set("ts", "1970-01-01T00:00:00.000000000003Z"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "start")
+ .set("ts", "1970-01-01T00:00:00.000000000004Z")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "1970-01-01T00:00:00.000000000005Z")
+ .set("value",
"1970-01-01T00:00:00.000000000006Z"))));
+
+ private static final List<TableRow> SIMPLE_WRITE_DATA =
+ ImmutableList.of(
+ new TableRow().set("ts_simple", "2024-01-15T10:30:45.123456789012Z"),
+ new TableRow().set("ts_simple",
"1890-01-01T00:00:00.123456789123Z"));
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ bqOptions =
TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
+ project = bqOptions.as(GcpOptions.class).getProject();
+ BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1");
+ nestedTableSpec = String.format("%s:%s.%s", project, DATASET_ID,
"nested_timestamp_picos_test");
+ simpleTableSpec = String.format("%s:%s.%s", project, DATASET_ID,
"simple_timestamp_picos_test");
+
+ // Write test data
+ Pipeline writePipeline = Pipeline.create(bqOptions);
+ writePipeline
+ .apply("CreateNestedData", Create.of(NESTED_WRITE_DATA))
+ .apply(
+ "WriteNestedData",
+ BigQueryIO.writeTableRows()
+ .to(nestedTableSpec)
+ .withSchema(NESTED_SCHEMA)
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ writePipeline
+ .apply("CreateSimpleData", Create.of(SIMPLE_WRITE_DATA))
+ .apply(
+ "WriteSimpleData",
+ BigQueryIO.writeTableRows()
+ .to(simpleTableSpec)
+ .withSchema(SIMPLE_SCHEMA)
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ writePipeline.run().waitUntilFinish();
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ BQ_CLIENT.deleteDataset(project, DATASET_ID);
+ }
+
+ @Test
+ public void testReadWithPicosPrecision_Avro() {
+
+ List<TableRow> expectedOutput =
+ ImmutableList.of(
+ new TableRow()
+ .set("ts_simple", "2024-01-15T10:30:45.123456789012Z")
+ .set(
+ "ts_array",
+ ImmutableList.of(
+ "2024-01-15T10:30:45.111111111111Z",
"2024-06-20T15:45:30.222222222222Z"))
+ .set(
+ "event",
+ new TableRow()
+ .set("name", "login")
+ .set("ts", "2024-01-15T10:30:45.333333333333Z"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "click")
+ .set("ts", "2024-01-15T10:30:45.444444444444Z"),
+ new TableRow()
+ .set("name", "scroll")
+ .set("ts", "2024-01-15T10:30:45.555555555555Z")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "2024-01-15T10:30:45.666666666666Z")
+ .set("value",
"2024-01-15T10:30:45.777777777777Z"))),
+ new TableRow()
+ .set("ts_simple", "1890-01-01T00:00:00.123456789123Z")
+ .set("ts_array",
ImmutableList.of("1970-01-01T00:00:00.000000000002Z"))
+ .set(
+ "event",
+ new TableRow()
+ .set("name", "epoch")
+ .set("ts", "1970-01-01T00:00:00.000000000003Z"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "start")
+ .set("ts", "1970-01-01T00:00:00.000000000004Z")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "1970-01-01T00:00:00.000000000005Z")
+ .set("value",
"1970-01-01T00:00:00.000000000006Z"))));
+
+ runReadTest(TimestampPrecision.PICOS, DataFormat.AVRO, expectedOutput,
nestedTableSpec);
+ }
+
+ @Test
+ public void testReadWithNanosPrecision_Avro() {
+
+ List<TableRow> expectedOutput =
+ ImmutableList.of(
+ new TableRow()
+ .set("ts_simple", "2024-01-15 10:30:45.123456789 UTC")
+ .set(
+ "ts_array",
+ ImmutableList.of(
+ "2024-01-15 10:30:45.111111111 UTC", "2024-06-20
15:45:30.222222222 UTC"))
+ .set(
+ "event",
+ new TableRow()
+ .set("name", "login")
+ .set("ts", "2024-01-15 10:30:45.333333333 UTC"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "click")
+ .set("ts", "2024-01-15 10:30:45.444444444 UTC"),
+ new TableRow()
+ .set("name", "scroll")
+ .set("ts", "2024-01-15 10:30:45.555555555 UTC")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "2024-01-15 10:30:45.666666666 UTC")
+ .set("value", "2024-01-15 10:30:45.777777777
UTC"))),
+ new TableRow()
+ .set("ts_simple", "1890-01-01 00:00:00.123456789 UTC")
+ .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC"))
+ .set(
+ "event",
+ new TableRow().set("name", "epoch").set("ts", "1970-01-01
00:00:00 UTC"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow().set("name", "start").set("ts",
"1970-01-01 00:00:00 UTC")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "1970-01-01 00:00:00 UTC")
+ .set("value", "1970-01-01 00:00:00 UTC"))));
+
+ runReadTest(TimestampPrecision.NANOS, DataFormat.AVRO, expectedOutput,
nestedTableSpec);
+ }
+
+ @Test
+ public void testReadWithMicrosPrecision_Avro() {
+
+ List<TableRow> expectedOutput =
+ ImmutableList.of(
+ new TableRow()
+ .set("ts_simple", "2024-01-15 10:30:45.123456 UTC")
+ .set(
+ "ts_array",
+ ImmutableList.of(
+ "2024-01-15 10:30:45.111111 UTC", "2024-06-20
15:45:30.222222 UTC"))
+ .set(
+ "event",
+ new TableRow().set("name", "login").set("ts", "2024-01-15
10:30:45.333333 UTC"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow()
+ .set("name", "click")
+ .set("ts", "2024-01-15 10:30:45.444444 UTC"),
+ new TableRow()
+ .set("name", "scroll")
+ .set("ts", "2024-01-15 10:30:45.555555 UTC")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "2024-01-15 10:30:45.666666 UTC")
+ .set("value", "2024-01-15 10:30:45.777777 UTC"))),
+ new TableRow()
+ .set("ts_simple", "1890-01-01 00:00:00.123456 UTC")
+ .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC"))
+ .set(
+ "event",
+ new TableRow().set("name", "epoch").set("ts", "1970-01-01
00:00:00 UTC"))
+ .set(
+ "events",
+ ImmutableList.of(
+ new TableRow().set("name", "start").set("ts",
"1970-01-01 00:00:00 UTC")))
+ .set(
+ "ts_map",
+ ImmutableList.of(
+ new TableRow()
+ .set("key", "1970-01-01 00:00:00 UTC")
+ .set("value", "1970-01-01 00:00:00 UTC"))));
+
+ runReadTest(TimestampPrecision.MICROS, DataFormat.AVRO, expectedOutput,
nestedTableSpec);
+ }
+
+ @Test
+ public void testReadWithPicosPrecision_Arrow() {
+
+ List<TableRow> expectedOutput =
+ ImmutableList.of(
+ new TableRow().set("ts_simple",
"2024-01-15T10:30:45.123456789012Z"),
+ new TableRow().set("ts_simple",
"1890-01-01T00:00:00.123456789123Z"));
+
+ runReadTest(TimestampPrecision.PICOS, DataFormat.ARROW, expectedOutput,
simpleTableSpec);
+ }
+
+ @Test
+ public void testReadWithNanosPrecision_Arrow() {
+
+ List<TableRow> expectedOutput =
+ ImmutableList.of(
+ new TableRow().set("ts_simple", "2024-01-15 10:30:45.123456789
UTC"),
+ new TableRow().set("ts_simple", "1890-01-01 00:00:00.123456789
UTC"));
+
+ runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput,
simpleTableSpec);
+ }
+
+ private void runReadTest(
+ TimestampPrecision precision,
+ DataFormat format,
+ List<TableRow> expectedOutput,
+ String tableSpec) {
+ Pipeline readPipeline = Pipeline.create(bqOptions);
+
+ PCollection<TableRow> result =
+ readPipeline.apply(
+ String.format("Read_%s_%s", precision, format),
+ BigQueryIO.readTableRows()
+ .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
+ .withFormat(format)
+ .withDirectReadPicosTimestampPrecision(precision)
+ .from(tableSpec));
+
+ PAssert.that(result).containsInAnyOrder(expectedOutput);
+ readPipeline.run().waitUntilFinish();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 675885b4e94..76a492bebd2 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -1464,4 +1464,98 @@ public class BigQueryUtilsTest {
schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789
UTC"));
assertEquals(123456789, ((java.time.Instant)
row9.getValue("ts")).getNano());
}
+
+ /** Computes expected epoch seconds from an ISO-8601 timestamp. */
+ private static long expectedSeconds(String isoTimestamp) {
+ return java.time.Instant.parse(isoTimestamp).getEpochSecond();
+ }
+
+ @Test
+ public void testParseTimestampPicosFromString() {
+ // Format: {input, isoEquivalentForSeconds, expectedPicoseconds,
description}
+ Object[][] testCases = {
+ // UTC format tests (space separator, "UTC" suffix)
+ {"2024-01-15 10:30:45 UTC", "2024-01-15T10:30:45Z", 0L, "UTC no
fractional"},
+ {"2024-01-15 10:30:45.123 UTC", "2024-01-15T10:30:45Z",
123_000_000_000L, "UTC 3 digits"},
+ {"2024-01-15 10:30:45.123456 UTC", "2024-01-15T10:30:45Z",
123_456_000_000L, "UTC 6 digits"},
+ {
+ "2024-01-15 10:30:45.123456789 UTC",
+ "2024-01-15T10:30:45Z",
+ 123_456_789_000L,
+ "UTC 9 digits"
+ },
+
+ // ISO format tests (T separator, "Z" suffix)
+ {"2024-01-15T10:30:45Z", "2024-01-15T10:30:45Z", 0L, "ISO no
fractional"},
+ {"2024-01-15T10:30:45.123Z", "2024-01-15T10:30:45Z", 123_000_000_000L,
"ISO 3 digits"},
+ {"2024-01-15T10:30:45.123456Z", "2024-01-15T10:30:45Z",
123_456_000_000L, "ISO 6 digits"},
+ {"2024-01-15T10:30:45.123456789Z", "2024-01-15T10:30:45Z",
123_456_789_000L, "ISO 9 digits"},
+ {
+ "2024-01-15T10:30:45.123456789012Z",
+ "2024-01-15T10:30:45Z",
+ 123_456_789_012L,
+ "ISO 12 digits (picos)"
+ },
+
+ // Boundary: earliest date (0001-01-01)
+ {"0001-01-01 00:00:00.000000 UTC", "0001-01-01T00:00:00Z", 0L, "Earliest
UTC"},
+ {"0001-01-01T00:00:00Z", "0001-01-01T00:00:00Z", 0L, "Earliest ISO"},
+ {"0001-01-01T00:00:00.000000000001Z", "0001-01-01T00:00:00Z", 1L,
"Earliest ISO 1 pico"},
+
+ // Boundary: latest date (9999-12-31)
+ {"9999-12-31 23:59:59.999999 UTC", "9999-12-31T23:59:59Z",
999_999_000_000L, "Latest UTC"},
+ {
+ "9999-12-31T23:59:59.999999999Z",
+ "9999-12-31T23:59:59Z",
+ 999_999_999_000L,
+ "Latest ISO 9 digits"
+ },
+ {
+ "9999-12-31T23:59:59.999999999999Z",
+ "9999-12-31T23:59:59Z",
+ 999_999_999_999L,
+ "Latest ISO max picos"
+ },
+
+ // Unix epoch (1970-01-01)
+ {"1970-01-01 00:00:00 UTC", "1970-01-01T00:00:00Z", 0L, "Epoch UTC"},
+ {"1970-01-01T00:00:00Z", "1970-01-01T00:00:00Z", 0L, "Epoch ISO"},
+ {"1970-01-01T00:00:00.000000000001Z", "1970-01-01T00:00:00Z", 1L, "Epoch
+ 1 pico"},
+
+ // Fractional boundaries
+ {"2024-01-15T10:30:45.000000000000Z", "2024-01-15T10:30:45Z", 0L, "All
zeros picos"},
+ {
+ "2024-01-15T10:30:45.999999999999Z",
+ "2024-01-15T10:30:45Z",
+ 999_999_999_999L,
+ "All nines picos"
+ },
+ {
+ "2024-01-15T10:30:45.1Z",
+ "2024-01-15T10:30:45Z",
+ 100_000_000_000L,
+ "Single digit fractional"
+ },
+ };
+
+ for (Object[] testCase : testCases) {
+ String input = (String) testCase[0];
+ String isoEquivalent = (String) testCase[1];
+ long expectedPicos = (Long) testCase[2];
+ String description = (String) testCase[3];
+
+ long expectedSecs = expectedSeconds(isoEquivalent);
+
+ BigQueryUtils.TimestampPicos result =
BigQueryUtils.TimestampPicos.fromString(input);
+
+ assertEquals(
+ String.format("Seconds mismatch for '%s' (%s)", input, description),
+ expectedSecs,
+ result.seconds);
+ assertEquals(
+ String.format("Picoseconds mismatch for '%s' (%s)", input,
description),
+ expectedPicos,
+ result.picoseconds);
+ }
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 05f0e9c993c..ea3bb29e081 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -42,6 +42,7 @@ import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Int64Value;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -52,6 +53,7 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -131,6 +133,11 @@ public class TableRowToStorageApiProtoTest {
.add(new
TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum"))
.add(
new
TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname"))
+ .add(
+ new TableFieldSchema()
+ .setType("TIMESTAMP")
+ .setName("timestamppicosvalue")
+ .setTimestampPrecision(12L))
.build());
private static final TableSchema BASE_TABLE_SCHEMA_NO_F =
@@ -183,6 +190,11 @@ public class TableRowToStorageApiProtoTest {
.add(new
TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum"))
.add(
new
TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname"))
+ .add(
+ new TableFieldSchema()
+ .setType("TIMESTAMP")
+ .setName("timestamppicosvalue")
+ .setTimestampPrecision(12L))
.build());
private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR =
@@ -396,6 +408,14 @@ public class TableRowToStorageApiProtoTest {
AnnotationsProto.columnName.getDescriptor(),
"123_illegalprotofieldname"))
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestamppicosvalue")
+ .setNumber(30)
+ .setType(Type.TYPE_MESSAGE)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .setTypeName("TimestampPicos")
+ .build())
.build();
private static final com.google.cloud.bigquery.storage.v1.TableSchema
BASE_TABLE_PROTO_SCHEMA =
@@ -545,6 +565,12 @@ public class TableRowToStorageApiProtoTest {
.setName("123_illegalprotofieldname")
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
.build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestamppicosvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP)
+ .setTimestampPrecision(Int64Value.newBuilder().setValue(12L))
+ .build())
.build();
private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -751,6 +777,14 @@ public class TableRowToStorageApiProtoTest {
AnnotationsProto.columnName.getDescriptor(),
"123_illegalprotofieldname"))
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestamppicosvalue")
+ .setNumber(29)
+ .setType(Type.TYPE_MESSAGE)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .setTypeName("TimestampPicos")
+ .build())
.build();
private static final com.google.cloud.bigquery.storage.v1.TableSchema
@@ -896,6 +930,12 @@ public class TableRowToStorageApiProtoTest {
.setName("123_illegalprotofieldname")
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
.build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestamppicosvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP)
+
.setTimestampPrecision(Int64Value.newBuilder().setValue(12L))
+ .build())
.build();
private static final TableSchema NESTED_TABLE_SCHEMA =
new TableSchema()
@@ -1137,6 +1177,34 @@ public class TableRowToStorageApiProtoTest {
assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
}
+ private static final DescriptorProto TIMESTAMP_PICOS_PROTO =
+ DescriptorProto.newBuilder()
+ .setName("TimestampPicos")
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("seconds")
+ .setNumber(1)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL))
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("picoseconds")
+ .setNumber(2)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL))
+ .build();
+
+ private static final Descriptor TIMESTAMP_PICOS_DESCRIPTOR;
+
+ static {
+ try {
+ TIMESTAMP_PICOS_DESCRIPTOR =
+ TableRowToStorageApiProto.wrapDescriptorProto(TIMESTAMP_PICOS_PROTO);
+ } catch (DescriptorValidationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static final List<Object> REPEATED_BYTES =
ImmutableList.of(
BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
@@ -1183,7 +1251,8 @@ public class TableRowToStorageApiProtoTest {
new TableCell().setV("1970-01-01 00:00:00.1230"),
new TableCell().setV("2019-08-16 00:52:07.123456"),
new TableCell().setV("9999-12-31 23:59:59.999999Z"),
- new TableCell().setV("madeit")));
+ new TableCell().setV("madeit"),
+ new TableCell().setV("2024-01-15T10:30:45.123456789012Z")));
private static final TableRow BASE_TABLE_ROW_NO_F =
new TableRow()
@@ -1217,7 +1286,8 @@ public class TableRowToStorageApiProtoTest {
.set("timestampvaluespacetrailingzero", "1970-01-01 00:00:00.1230")
.set("datetimevaluespace", "2019-08-16 00:52:07.123456")
.set("timestampvaluemaximum", "9999-12-31 23:59:59.999999Z")
- .set("123_illegalprotofieldname", "madeit");
+ .set("123_illegalprotofieldname", "madeit")
+ .set("timestamppicosvalue", "2024-01-15T10:30:45.123456789012Z");
private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
ImmutableMap.<String, Object>builder()
@@ -1261,6 +1331,15 @@ public class TableRowToStorageApiProtoTest {
.put(
BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"),
"madeit")
+ .put(
+ "timestamppicosvalue",
+ DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR)
+ .setField(
+ TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"),
+ Instant.parse("2024-01-15T10:30:45Z").getEpochSecond())
+ .setField(
+
TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L)
+ .build())
.build();
private static final Map<String, String> BASE_ROW_EXPECTED_NAME_OVERRIDES =
@@ -1309,6 +1388,15 @@ public class TableRowToStorageApiProtoTest {
.put(
BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"),
"madeit")
+ .put(
+ "timestamppicosvalue",
+ DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR)
+ .setField(
+ TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"),
+ Instant.parse("2024-01-15T10:30:45Z").getEpochSecond())
+ .setField(
+
TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L)
+ .build())
.build();
private static final Map<String, String>
BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES =
@@ -1394,6 +1482,16 @@ public class TableRowToStorageApiProtoTest {
== com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT) {
return normalizeTableRow((TableRow) value, schemaInformation,
outputUsingF);
} else {
+ if (schemaInformation.getType()
+ ==
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) {
+ // Handle picosecond timestamp (12-digit precision)
+ if (schemaInformation.getTimestampPrecision() == 12) {
+ // Already a string, return as-is.
+ if (value instanceof String) {
+ return value;
+ }
+ }
+ }
convertedValue =
TYPE_MAP_PROTO_CONVERTERS.get(schemaInformation.getType()).apply("", value);
switch (schemaInformation.getType()) {
case BOOL:
@@ -1461,8 +1559,42 @@ public class TableRowToStorageApiProtoTest {
entry ->
entry.getKey().getOptions().getExtension(AnnotationsProto.columnName)));
- assertEquals(
- withF ? BASE_ROW_EXPECTED_PROTO_VALUES :
BASE_ROW_NO_F_EXPECTED_PROTO_VALUES, recordFields);
+ // Get expected values
+ Map<String, Object> expectedValues =
+ withF ? BASE_ROW_EXPECTED_PROTO_VALUES :
BASE_ROW_NO_F_EXPECTED_PROTO_VALUES;
+
+ // Handle timestamppicosvalue separately since DynamicMessage doesn't have
proper equals()
+ Object actualPicos = recordFields.get("timestamppicosvalue");
+ Object expectedPicos = expectedValues.get("timestamppicosvalue");
+
+ if (actualPicos != null && expectedPicos != null) {
+ // Compare DynamicMessages by their field values
+ DynamicMessage actualPicosMsg = (DynamicMessage) actualPicos;
+ DynamicMessage expectedPicosMsg = (DynamicMessage) expectedPicos;
+
+ Descriptor actualDescriptor = actualPicosMsg.getDescriptorForType();
+
+ assertEquals(
+ "TimestampPicos seconds mismatch",
+ expectedPicosMsg.getField(
+
expectedPicosMsg.getDescriptorForType().findFieldByName("seconds")),
+
actualPicosMsg.getField(actualDescriptor.findFieldByName("seconds")));
+ assertEquals(
+ "TimestampPicos picoseconds mismatch",
+ expectedPicosMsg.getField(
+
expectedPicosMsg.getDescriptorForType().findFieldByName("picoseconds")),
+
actualPicosMsg.getField(actualDescriptor.findFieldByName("picoseconds")));
+ }
+
+ // Remove timestamppicosvalue from both maps for remaining comparison
+ Map<String, Object> recordFieldsWithoutPicos = new HashMap<>(recordFields);
+ Map<String, Object> expectedValuesWithoutPicos = new
HashMap<>(expectedValues);
+ recordFieldsWithoutPicos.remove("timestamppicosvalue");
+ expectedValuesWithoutPicos.remove("timestamppicosvalue");
+
+ // Compare remaining fields
+ assertEquals(expectedValuesWithoutPicos, recordFieldsWithoutPicos);
+
assertEquals(
withF ? BASE_ROW_EXPECTED_NAME_OVERRIDES :
BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES,
overriddenNames);
@@ -1484,6 +1616,7 @@ public class TableRowToStorageApiProtoTest {
DynamicMessage msg =
TableRowToStorageApiProto.messageFromTableRow(
schemaInformation, descriptor, tableRow, false, false, null, null,
-1);
+
assertEquals(4, msg.getAllFields().size());
Map<String, FieldDescriptor> fieldDescriptors =
@@ -1511,6 +1644,7 @@ public class TableRowToStorageApiProtoTest {
DynamicMessage msg =
TableRowToStorageApiProto.messageFromTableRow(
schemaInformation, descriptor, tableRow, false, false, null, null,
-1);
+
TableRow recovered =
TableRowToStorageApiProto.tableRowFromMessage(
schemaInformation, msg, true, Predicates.alwaysTrue());