This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 203f45cee86 Support beam:logical_type:micros_instant:v1 in SpannerIo. 
(#36840)
203f45cee86 is described below

commit 203f45cee86be7f1ebaac632c961394641be4e3b
Author: claudevdm <[email protected]>
AuthorDate: Tue Nov 25 15:02:02 2025 -0500

    Support beam:logical_type:micros_instant:v1 in SpannerIo. (#36840)
    
    * Support micros isntant in spannerio.
    
    * Trigger tests.
    
    * Comments.
    
    * Fix test.
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |   2 +-
 .../beam/sdk/io/gcp/spanner/MutationUtils.java     |  59 +++++++++-
 .../beam/sdk/io/gcp/spanner/StructUtils.java       |  37 +++++-
 .../beam/sdk/io/gcp/spanner/MutationUtilsTest.java |  33 ++++++
 .../beam/sdk/io/gcp/spanner/StructUtilsTest.java   |  39 +++++++
 .../io/gcp/tests/xlang_spannerio_it_test.py        | 129 +++++++++++++++------
 6 files changed, 261 insertions(+), 38 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de..99a8fc8ff6d 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 1
+  "modification": 14
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
index dcdbdb44c00..2cc32c44a62 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -28,11 +28,13 @@ import com.google.cloud.spanner.Key;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -102,6 +104,11 @@ final class MutationUtils {
     return mutationBuilder.build();
   }
 
+  private static Timestamp toSpannerTimestamp(Instant instant) {
+    long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 
1_000L;
+    return Timestamp.ofTimeMicroseconds(micros);
+  }
+
   private static void setBeamValueToKey(
       Key.Builder keyBuilder, Schema.FieldType field, String columnName, Row 
row) {
     switch (field.getTypeName()) {
@@ -147,6 +154,21 @@ final class MutationUtils {
         keyBuilder.append(row.getDecimal(columnName));
         break;
         // TODO: Implement logical date and datetime
+      case LOGICAL_TYPE:
+        Schema.LogicalType<?, ?> logicalType = 
checkNotNull(field.getLogicalType());
+        String identifier = logicalType.getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          Instant instant = row.getValue(columnName);
+          if (instant == null) {
+            keyBuilder.append((Timestamp) null);
+          } else {
+            keyBuilder.append(toSpannerTimestamp(instant));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type in key: %s", 
identifier));
+        }
+        break;
       case DATETIME:
         @Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
         if (dateTime == null) {
@@ -224,7 +246,21 @@ final class MutationUtils {
           mutationBuilder.set(columnName).to(decimal);
         }
         break;
-        // TODO: Implement logical date and datetime
+      case LOGICAL_TYPE:
+        Schema.LogicalType<?, ?> logicalType = 
checkNotNull(fieldType.getLogicalType());
+        String identifier = logicalType.getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          @Nullable Instant instant = row.getValue(columnName);
+          if (instant == null) {
+            mutationBuilder.set(columnName).to((Timestamp) null);
+          } else {
+            mutationBuilder.set(columnName).to(toSpannerTimestamp(instant));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type: %s", identifier));
+        }
+        break;
       case DATETIME:
         @Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
         if (dateTime == null) {
@@ -335,6 +371,27 @@ final class MutationUtils {
       case STRING:
         mutationBuilder.set(column).toStringArray((Iterable<String>) ((Object) 
iterable));
         break;
+      case LOGICAL_TYPE:
+        String identifier = 
checkNotNull(beamIterableType.getLogicalType()).getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          if (iterable == null) {
+            mutationBuilder.set(column).toTimestampArray(null);
+          } else {
+            mutationBuilder
+                .set(column)
+                .toTimestampArray(
+                    StreamSupport.stream(iterable.spliterator(), false)
+                        .map(
+                            instant -> {
+                              return toSpannerTimestamp((java.time.Instant) 
instant);
+                            })
+                        .collect(toList()));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type in iterable: %s", 
identifier));
+        }
+        break;
       case DATETIME:
         if (iterable == null) {
           mutationBuilder.set(column).toDateArray(null);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
index 51eda7d16eb..ac8f4becbd0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.values.Row;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
@@ -352,6 +353,11 @@ final class StructUtils {
     }
   }
 
+  private static java.time.Instant fromSpannerTimestamp(Timestamp 
spannerTimestamp) {
+    long micros = spannerTimestamp.getSeconds() * 1_000_000L + 
spannerTimestamp.getNanos() / 1_000L;
+    return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 
1_000_000L) * 1_000L);
+  }
+
   private static @Nullable Object getStructValue(Struct struct, Schema.Field 
field) {
     String column = field.getName();
     Type.Code typeCode = struct.getColumnType(column).getCode();
@@ -365,7 +371,19 @@ final class StructUtils {
         return struct.getBytes(column).toByteArray();
         // TODO: implement logical datetime
       case TIMESTAMP:
-        return 
Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
+        Timestamp spannerTimestamp = struct.getTimestamp(column);
+
+        // Check if the Beam schema expects MicrosInstant logical type
+        Schema.FieldType fieldType = field.getType();
+        if (fieldType.getTypeName().isLogicalType()) {
+          Schema.@Nullable LogicalType<?, ?> logicalType = 
fieldType.getLogicalType();
+          if (logicalType != null && 
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+            return fromSpannerTimestamp(spannerTimestamp);
+          }
+        }
+        // Default DATETIME behavior: convert to Joda DateTime
+        return 
Instant.ofEpochSecond(spannerTimestamp.getSeconds()).toDateTime();
+
         // TODO: implement logical date
       case DATE:
         return DateTime.parse(struct.getDate(column).toString());
@@ -407,11 +425,26 @@ final class StructUtils {
         return struct.getBooleanList(column);
       case BYTES:
         return struct.getBytesList(column);
-        // TODO: implement logical datetime
       case TIMESTAMP:
+        // Check if expects MicrosInstant in arrays
+        Schema.@Nullable FieldType elementType = 
field.getType().getCollectionElementType();
+        if (elementType != null && elementType.getTypeName().isLogicalType()) {
+          Schema.@Nullable LogicalType<?, ?> logicalType = 
elementType.getLogicalType();
+          if (logicalType != null && 
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+            // Return List<java.time.Instant> for MicrosInstant arrays
+            return struct.getTimestampList(column).stream()
+                .map(
+                    timestamp -> {
+                      return fromSpannerTimestamp(timestamp);
+                    })
+                .collect(toList());
+          }
+        }
+        // Default: return List<DateTime> for DATETIME type
         return struct.getTimestampList(column).stream()
             .map(timestamp -> 
Instant.ofEpochSecond(timestamp.getSeconds()).toDateTime())
             .collect(toList());
+
         // TODO: implement logical date
       case DATE:
         return struct.getDateList(column).stream()
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
index 6a0a1787dec..c68c2d3a021 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
@@ -28,8 +28,10 @@ import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.Type;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
@@ -44,6 +46,7 @@ public class MutationUtilsTest {
   private static final Struct EMPTY_STRUCT = Struct.newBuilder().build();
   private static final Struct INT64_STRUCT = 
Struct.newBuilder().set("int64").to(3L).build();
   private static final String TABLE = "some_table";
+  private static final Instant TEST_INSTANT = 
Instant.parse("2024-01-15T10:30:00.123456Z");
 
   private static final Schema WRITE_ROW_SCHEMA =
       Schema.builder()
@@ -71,6 +74,10 @@ public class MutationUtilsTest {
           .addNullableField("f_decimal", Schema.FieldType.DECIMAL)
           .addNullableField("f_byte", Schema.FieldType.BYTE)
           .addNullableField("f_iterable", 
Schema.FieldType.iterable(Schema.FieldType.INT64))
+          .addNullableField("f_micros_instant", 
Schema.FieldType.logicalType(new MicrosInstant()))
+          .addNullableField(
+              "f_micros_instant_array",
+              Schema.FieldType.array(Schema.FieldType.logicalType(new 
MicrosInstant())))
           .build();
 
   private static final Row WRITE_ROW =
@@ -107,6 +114,8 @@ public class MutationUtilsTest {
           .withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
           .withFieldValue("f_byte", Byte.parseByte("127"))
           .withFieldValue("f_iterable", ImmutableList.of(2L, 3L))
+          .withFieldValue("f_micros_instant", TEST_INSTANT)
+          .withFieldValue("f_micros_instant_array", 
ImmutableList.of(TEST_INSTANT, TEST_INSTANT))
           .build();
 
   private static final Schema WRITE_ROW_SCHEMA_NULLS =
@@ -123,6 +132,10 @@ public class MutationUtilsTest {
           .addNullableField("f_array", 
Schema.FieldType.array(Schema.FieldType.INT64))
           .addNullableField(
               "f_struct_array", 
Schema.FieldType.array(Schema.FieldType.row(INT64_SCHEMA)))
+          .addNullableField("f_micros_instant", 
Schema.FieldType.logicalType(new MicrosInstant()))
+          .addNullableField(
+              "f_micros_instant_array",
+              Schema.FieldType.array(Schema.FieldType.logicalType(new 
MicrosInstant())))
           .build();
 
   private static final Row WRITE_ROW_NULLS =
@@ -138,6 +151,8 @@ public class MutationUtilsTest {
           .addValue(null)
           .addValue(null)
           .addValue(null)
+          .addValue(null)
+          .addValue(null)
           .build();
 
   private static final Schema KEY_SCHEMA =
@@ -153,6 +168,7 @@ public class MutationUtilsTest {
           .addNullableField("f_int32", Schema.FieldType.INT32)
           .addNullableField("f_decimal", Schema.FieldType.DECIMAL)
           .addNullableField("f_byte", Schema.FieldType.BYTE)
+          .addNullableField("f_micros_instant", 
Schema.FieldType.logicalType(new MicrosInstant()))
           .build();
 
   private static final Row KEY_ROW =
@@ -168,6 +184,7 @@ public class MutationUtilsTest {
           .withFieldValue("f_int32", 0x7fffffff)
           .withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
           .withFieldValue("f_byte", Byte.parseByte("127"))
+          .withFieldValue("f_micros_instant", TEST_INSTANT)
           .build();
 
   private static final Schema KEY_SCHEMA_NULLS =
@@ -178,6 +195,7 @@ public class MutationUtilsTest {
           .addNullableField("f_bytes", Schema.FieldType.BYTES)
           .addNullableField("f_date_time", Schema.FieldType.DATETIME)
           .addNullableField("f_bool", Schema.FieldType.BOOLEAN)
+          .addNullableField("f_micros_instant", 
Schema.FieldType.logicalType(new MicrosInstant()))
           .build();
 
   private static final Row KEY_ROW_NULLS =
@@ -188,6 +206,7 @@ public class MutationUtilsTest {
           .addValue(null)
           .addValue(null)
           .addValue(null)
+          .addValue(null)
           .build();
 
   @Test
@@ -264,6 +283,7 @@ public class MutationUtilsTest {
   }
 
   private static Mutation createDeleteMutation() {
+    long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + 
TEST_INSTANT.getNano() / 1_000L;
     Key key =
         Key.newBuilder()
             .append(1L)
@@ -277,6 +297,7 @@ public class MutationUtilsTest {
             .append(0x7fffffff)
             .append(BigDecimal.valueOf(Long.MIN_VALUE))
             .append(Byte.parseByte("127"))
+            .append(Timestamp.ofTimeMicroseconds(micros))
             .build();
     return Mutation.delete(TABLE, key);
   }
@@ -290,12 +311,14 @@ public class MutationUtilsTest {
             .append((ByteArray) null)
             .append((Timestamp) null)
             .append((Boolean) null)
+            .append((Timestamp) null)
             .build();
     return Mutation.delete(TABLE, key);
   }
 
   private static Mutation createMutation(Mutation.Op operation) {
     Mutation.WriteBuilder builder = chooseBuilder(operation);
+    long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + 
TEST_INSTANT.getNano() / 1_000L;
     return builder
         .set("f_int64")
         .to(1L)
@@ -353,6 +376,12 @@ public class MutationUtilsTest {
         .to(Byte.parseByte("127"))
         .set("f_iterable")
         .toInt64Array(ImmutableList.of(2L, 3L))
+        .set("f_micros_instant")
+        .to(Timestamp.ofTimeMicroseconds(micros))
+        .set("f_micros_instant_array")
+        .toTimestampArray(
+            ImmutableList.of(
+                Timestamp.ofTimeMicroseconds(micros), 
Timestamp.ofTimeMicroseconds(micros)))
         .build();
   }
 
@@ -381,6 +410,10 @@ public class MutationUtilsTest {
         .toInt64Array((List<Long>) null)
         .set("f_struct_array")
         .toStructArray(Type.struct(Type.StructField.of("int64", 
Type.int64())), null)
+        .set("f_micros_instant")
+        .to((Timestamp) null)
+        .set("f_micros_instant_array")
+        .toTimestampArray(null)
         .build();
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
index 1cdf9afa7de..9a378b01518 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
@@ -33,8 +33,10 @@ import com.google.cloud.spanner.Type;
 import com.google.spanner.v1.StructType;
 import com.google.spanner.v1.TypeCode;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -45,6 +47,10 @@ import org.junit.Test;
 public class StructUtilsTest {
   private static final Schema EMPTY_SCHEMA = Schema.builder().build();
   private static final Schema INT64_SCHEMA = 
Schema.builder().addInt64Field("int64").build();
+  private static final Timestamp TIMESTAMP = 
Timestamp.ofTimeMicroseconds(1234567890123456L);
+  private static final Instant INSTANT =
+      Instant.ofEpochSecond(
+          1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) * 
1_000L);
 
   @Test
   public void testStructToBeamRow() {
@@ -286,6 +292,39 @@ public class StructUtilsTest {
         "Error processing struct to row: Unsupported type 'STRUCT'.", 
exception.getMessage());
   }
 
+  @Test
+  public void testStructToBeamRowWithMicrosInstant() {
+    Schema schema =
+        Schema.builder()
+            .addInt64Field("f_int64")
+            .addNullableField("f_micros_instant", 
Schema.FieldType.logicalType(new MicrosInstant()))
+            .addNullableField(
+                "f_micros_instant_array",
+                Schema.FieldType.array(Schema.FieldType.logicalType(new 
MicrosInstant())))
+            .build();
+
+    Struct struct =
+        Struct.newBuilder()
+            .set("f_int64")
+            .to(42L)
+            .set("f_micros_instant")
+            .to(TIMESTAMP)
+            .set("f_micros_instant_array")
+            .toTimestampArray(ImmutableList.of(TIMESTAMP, TIMESTAMP))
+            .build();
+
+    Row result = StructUtils.structToBeamRow(struct, schema);
+
+    assertEquals(42L, result.getInt64("f_int64").longValue());
+
+    assertEquals(INSTANT, result.getValue("f_micros_instant"));
+
+    @SuppressWarnings("unchecked")
+    List<Instant> instants = (List<Instant>) 
result.getValue("f_micros_instant_array");
+    assertEquals(2, instants.size());
+    assertEquals(INSTANT, instants.get(0));
+  }
+
   private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode) 
{
     return StructType.Field.newBuilder()
         .setName(name)
diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py 
b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
index 43a74f17053..b5d5304245c 100644
--- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
@@ -26,6 +26,8 @@ import uuid
 from typing import NamedTuple
 from typing import Optional
 
+import pytest
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io.gcp.spanner import ReadFromSpanner
@@ -37,6 +39,7 @@ from apache_beam.io.gcp.spanner import SpannerUpdate
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.utils.timestamp import Timestamp
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -50,6 +53,8 @@ except ImportError:
   DockerContainer = None
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
+TIMESTAMPS = [Timestamp.of(1234567890.0 + i) for i in range(1000)]
+
 
 class SpannerTestKey(NamedTuple):
   f_string: str
@@ -59,13 +64,20 @@ class SpannerTestRow(NamedTuple):
   f_string: str
   f_int64: Optional[int]
   f_boolean: Optional[bool]
+  f_timestamp: Optional[Timestamp]
 
 
 class SpannerPartTestRow(NamedTuple):
   f_string: str
   f_int64: Optional[int]
+  f_timestamp: Optional[Timestamp]
 
 
[email protected]_gcp_java_expansion_service
[email protected](
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
 @unittest.skipIf(spanner is None, 'GCP dependencies are not installed.')
 @unittest.skipIf(
     DockerContainer is None, 'testcontainers package is not installed.')
@@ -118,76 +130,112 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
 
   def test_spanner_insert_or_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+        self.database_id,
+        [('or_update0', 5, False, TIMESTAMPS[1].to_rfc3339()),
+         ('or_update1', 9, False, TIMESTAMPS[0].to_rfc3339())])
 
     def to_row_fn(i):
       return SpannerTestRow(
-          f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+          f_int64=i,
+          f_string=f'or_update{i}',
+          f_boolean=i % 2 == 0,
+          f_timestamp=TIMESTAMPS[i])
 
     self.run_write_pipeline(3, to_row_fn, SpannerTestRow, 
SpannerInsertOrUpdate)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='or_update'),
-        [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+    results = self.spanner_helper.read_data(
+        self.database_id, prefix='or_update')
+    self.assertEqual(len(results), 3)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'or_update{i}')
+      self.assertEqual(row[1], i)
+      self.assertEqual(row[2], i % 2 == 0)
+      self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
 
   def test_spanner_insert(self):
     def to_row_fn(num):
       return SpannerTestRow(
-          f_string=f'insert{num}', f_int64=num, f_boolean=None)
+          f_string=f'insert{num}',
+          f_int64=num,
+          f_boolean=None,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
 
     def compare_row(row):
       return row[1]
 
-    self.assertEqual(
-        sorted(
-            self.spanner_helper.read_data(self.database_id, 'insert'),
-            key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+    results = sorted(
+        self.spanner_helper.read_data(self.database_id, 'insert'),
+        key=compare_row)
+
+    self.assertEqual(len(results), 1000)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'insert{i}')
+      self.assertEqual(row[1], i)
+      self.assertIsNone(row[2])
+      self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
 
   def test_spanner_replace(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+        self.database_id,
+        [('replace0', 0, True, TIMESTAMPS[10].to_rfc3339()),
+         ('replace1', 1, False, TIMESTAMPS[11].to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'replace{num}',
+          f_int64=num + 10,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
-
+    results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='replace'),
-        [['replace0', 10, None], ['replace1', 11, None]])
+        results,
+        [['replace0', 10, None, TIMESTAMPS[0].to_proto()],
+         ['replace1', 11, None, TIMESTAMPS[1].to_proto()]])
 
   def test_spanner_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('update0', 5, False), ('update1', 9, False)])
+        self.database_id,
+        [('update0', 5, False, TIMESTAMPS[10].to_rfc3339()),
+         ('update1', 9, False, TIMESTAMPS[100].to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'update{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'update{num}',
+          f_int64=num + 10,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
-
+    results = self.spanner_helper.read_data(self.database_id, 'update')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, 'update'),
-        [['update0', 10, False], ['update1', 11, False]])
+        results,
+        [['update0', 10, False, TIMESTAMPS[0].to_proto()],
+         ['update1', 11, False, TIMESTAMPS[1].to_proto()]])
 
   def test_spanner_delete(self):
     self.spanner_helper.insert_values(
         self.database_id,
         values=[
-            ('delete0', 0, None),
-            ('delete6', 6, False),
-            ('delete20', 20, True),
+            ('delete0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+            ('delete6', 6, False, TIMESTAMPS[0].to_rfc3339()),
+            ('delete20', 20, True, TIMESTAMPS[0].to_rfc3339()),
         ])
 
     def to_row_fn(num):
       return SpannerTestKey(f_string=f'delete{num}')
 
     self.run_write_pipeline(10, to_row_fn, SpannerTestKey, SpannerDelete)
-
+    results = self.spanner_helper.read_data(self.database_id, prefix='delete')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='delete'),
-        [['delete20', 20, True]])
+        results, [['delete20', 20, True, TIMESTAMPS[0].to_proto()]])
 
   def test_spanner_read_query(self):
     self.insert_read_values('query_read')
@@ -215,9 +263,21 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
       assert_that(
           result,
           equal_to([
-              SpannerTestRow(f_int64=0, f_string=f'{prefix}0', f_boolean=None),
-              SpannerTestRow(f_int64=1, f_string=f'{prefix}1', f_boolean=True),
-              SpannerTestRow(f_int64=2, f_string=f'{prefix}2', 
f_boolean=False),
+              SpannerTestRow(
+                  f_int64=0,
+                  f_string=f'{prefix}0',
+                  f_boolean=None,
+                  f_timestamp=TIMESTAMPS[0]),
+              SpannerTestRow(
+                  f_int64=1,
+                  f_string=f'{prefix}1',
+                  f_boolean=True,
+                  f_timestamp=TIMESTAMPS[1]),
+              SpannerTestRow(
+                  f_int64=2,
+                  f_string=f'{prefix}2',
+                  f_boolean=False,
+                  f_timestamp=TIMESTAMPS[2]),
           ]))
 
   def run_write_pipeline(
@@ -242,9 +302,9 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
     self.spanner_helper.insert_values(
         self.database_id,
         values=[
-            (f'{prefix}0', 0, None),
-            (f'{prefix}1', 1, True),
-            (f'{prefix}2', 2, False),
+            (f'{prefix}0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+            (f'{prefix}1', 1, True, TIMESTAMPS[1].to_rfc3339()),
+            (f'{prefix}2', 2, False, TIMESTAMPS[2].to_rfc3339()),
         ])
 
 
@@ -288,14 +348,15 @@ class SpannerHelper(object):
           CREATE TABLE {self.table} (
               f_string  STRING(1024) NOT NULL,
               f_int64   INT64,
-              f_boolean BOOL
+              f_boolean BOOL,
+              f_timestamp TIMESTAMP
           ) PRIMARY KEY (f_string)'''
         ])
     database.create().result(120)
 
   def insert_values(self, database_id, values, columns=None):
     values = values or []
-    columns = columns or ('f_string', 'f_int64', 'f_boolean')
+    columns = columns or ('f_string', 'f_int64', 'f_boolean', 'f_timestamp')
     with self.instance.database(database_id).batch() as batch:
       batch.insert(
           table=self.table,

Reply via email to