This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 203f45cee86 Support beam:logical_type:micros_instant:v1 in SpannerIo.
(#36840)
203f45cee86 is described below
commit 203f45cee86be7f1ebaac632c961394641be4e3b
Author: claudevdm <[email protected]>
AuthorDate: Tue Nov 25 15:02:02 2025 -0500
Support beam:logical_type:micros_instant:v1 in SpannerIo. (#36840)
* Support micros instant in spannerio.
* Trigger tests.
* Comments.
* Fix test.
---
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
.../beam/sdk/io/gcp/spanner/MutationUtils.java | 59 +++++++++-
.../beam/sdk/io/gcp/spanner/StructUtils.java | 37 +++++-
.../beam/sdk/io/gcp/spanner/MutationUtilsTest.java | 33 ++++++
.../beam/sdk/io/gcp/spanner/StructUtilsTest.java | 39 +++++++
.../io/gcp/tests/xlang_spannerio_it_test.py | 129 +++++++++++++++------
6 files changed, 261 insertions(+), 38 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de..99a8fc8ff6d 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 1
+ "modification": 14
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
index dcdbdb44c00..2cc32c44a62 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -28,11 +28,13 @@ import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import java.math.BigDecimal;
+import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -102,6 +104,11 @@ final class MutationUtils {
return mutationBuilder.build();
}
+ private static Timestamp toSpannerTimestamp(Instant instant) {
+ long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() /
1_000L;
+ return Timestamp.ofTimeMicroseconds(micros);
+ }
+
private static void setBeamValueToKey(
Key.Builder keyBuilder, Schema.FieldType field, String columnName, Row
row) {
switch (field.getTypeName()) {
@@ -147,6 +154,21 @@ final class MutationUtils {
keyBuilder.append(row.getDecimal(columnName));
break;
// TODO: Implement logical date and datetime
+ case LOGICAL_TYPE:
+ Schema.LogicalType<?, ?> logicalType =
checkNotNull(field.getLogicalType());
+ String identifier = logicalType.getIdentifier();
+ if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+ Instant instant = row.getValue(columnName);
+ if (instant == null) {
+ keyBuilder.append((Timestamp) null);
+ } else {
+ keyBuilder.append(toSpannerTimestamp(instant));
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported logical type in key: %s",
identifier));
+ }
+ break;
case DATETIME:
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
if (dateTime == null) {
@@ -224,7 +246,21 @@ final class MutationUtils {
mutationBuilder.set(columnName).to(decimal);
}
break;
- // TODO: Implement logical date and datetime
+ case LOGICAL_TYPE:
+ Schema.LogicalType<?, ?> logicalType =
checkNotNull(fieldType.getLogicalType());
+ String identifier = logicalType.getIdentifier();
+ if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+ @Nullable Instant instant = row.getValue(columnName);
+ if (instant == null) {
+ mutationBuilder.set(columnName).to((Timestamp) null);
+ } else {
+ mutationBuilder.set(columnName).to(toSpannerTimestamp(instant));
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported logical type: %s", identifier));
+ }
+ break;
case DATETIME:
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
if (dateTime == null) {
@@ -335,6 +371,27 @@ final class MutationUtils {
case STRING:
mutationBuilder.set(column).toStringArray((Iterable<String>) ((Object)
iterable));
break;
+ case LOGICAL_TYPE:
+ String identifier =
checkNotNull(beamIterableType.getLogicalType()).getIdentifier();
+ if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+ if (iterable == null) {
+ mutationBuilder.set(column).toTimestampArray(null);
+ } else {
+ mutationBuilder
+ .set(column)
+ .toTimestampArray(
+ StreamSupport.stream(iterable.spliterator(), false)
+ .map(
+ instant -> {
+ return toSpannerTimestamp((java.time.Instant)
instant);
+ })
+ .collect(toList()));
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported logical type in iterable: %s",
identifier));
+ }
+ break;
case DATETIME:
if (iterable == null) {
mutationBuilder.set(column).toDateArray(null);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
index 51eda7d16eb..ac8f4becbd0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
@@ -352,6 +353,11 @@ final class StructUtils {
}
}
+ private static java.time.Instant fromSpannerTimestamp(Timestamp
spannerTimestamp) {
+ long micros = spannerTimestamp.getSeconds() * 1_000_000L +
spannerTimestamp.getNanos() / 1_000L;
+ return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros %
1_000_000L) * 1_000L);
+ }
+
private static @Nullable Object getStructValue(Struct struct, Schema.Field
field) {
String column = field.getName();
Type.Code typeCode = struct.getColumnType(column).getCode();
@@ -365,7 +371,19 @@ final class StructUtils {
return struct.getBytes(column).toByteArray();
// TODO: implement logical datetime
case TIMESTAMP:
- return
Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
+ Timestamp spannerTimestamp = struct.getTimestamp(column);
+
+ // Check if the Beam schema expects MicrosInstant logical type
+ Schema.FieldType fieldType = field.getType();
+ if (fieldType.getTypeName().isLogicalType()) {
+ Schema.@Nullable LogicalType<?, ?> logicalType =
fieldType.getLogicalType();
+ if (logicalType != null &&
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+ return fromSpannerTimestamp(spannerTimestamp);
+ }
+ }
+ // Default DATETIME behavior: convert to Joda DateTime
+ return
Instant.ofEpochSecond(spannerTimestamp.getSeconds()).toDateTime();
+
// TODO: implement logical date
case DATE:
return DateTime.parse(struct.getDate(column).toString());
@@ -407,11 +425,26 @@ final class StructUtils {
return struct.getBooleanList(column);
case BYTES:
return struct.getBytesList(column);
- // TODO: implement logical datetime
case TIMESTAMP:
+ // Check if expects MicrosInstant in arrays
+ Schema.@Nullable FieldType elementType =
field.getType().getCollectionElementType();
+ if (elementType != null && elementType.getTypeName().isLogicalType()) {
+ Schema.@Nullable LogicalType<?, ?> logicalType =
elementType.getLogicalType();
+ if (logicalType != null &&
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+ // Return List<java.time.Instant> for MicrosInstant arrays
+ return struct.getTimestampList(column).stream()
+ .map(
+ timestamp -> {
+ return fromSpannerTimestamp(timestamp);
+ })
+ .collect(toList());
+ }
+ }
+ // Default: return List<DateTime> for DATETIME type
return struct.getTimestampList(column).stream()
.map(timestamp ->
Instant.ofEpochSecond(timestamp.getSeconds()).toDateTime())
.collect(toList());
+
// TODO: implement logical date
case DATE:
return struct.getDateList(column).stream()
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
index 6a0a1787dec..c68c2d3a021 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
@@ -28,8 +28,10 @@ import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import java.math.BigDecimal;
+import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -44,6 +46,7 @@ public class MutationUtilsTest {
private static final Struct EMPTY_STRUCT = Struct.newBuilder().build();
private static final Struct INT64_STRUCT =
Struct.newBuilder().set("int64").to(3L).build();
private static final String TABLE = "some_table";
+ private static final Instant TEST_INSTANT =
Instant.parse("2024-01-15T10:30:00.123456Z");
private static final Schema WRITE_ROW_SCHEMA =
Schema.builder()
@@ -71,6 +74,10 @@ public class MutationUtilsTest {
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
.addNullableField("f_byte", Schema.FieldType.BYTE)
.addNullableField("f_iterable",
Schema.FieldType.iterable(Schema.FieldType.INT64))
+ .addNullableField("f_micros_instant",
Schema.FieldType.logicalType(new MicrosInstant()))
+ .addNullableField(
+ "f_micros_instant_array",
+ Schema.FieldType.array(Schema.FieldType.logicalType(new
MicrosInstant())))
.build();
private static final Row WRITE_ROW =
@@ -107,6 +114,8 @@ public class MutationUtilsTest {
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
.withFieldValue("f_byte", Byte.parseByte("127"))
.withFieldValue("f_iterable", ImmutableList.of(2L, 3L))
+ .withFieldValue("f_micros_instant", TEST_INSTANT)
+ .withFieldValue("f_micros_instant_array",
ImmutableList.of(TEST_INSTANT, TEST_INSTANT))
.build();
private static final Schema WRITE_ROW_SCHEMA_NULLS =
@@ -123,6 +132,10 @@ public class MutationUtilsTest {
.addNullableField("f_array",
Schema.FieldType.array(Schema.FieldType.INT64))
.addNullableField(
"f_struct_array",
Schema.FieldType.array(Schema.FieldType.row(INT64_SCHEMA)))
+ .addNullableField("f_micros_instant",
Schema.FieldType.logicalType(new MicrosInstant()))
+ .addNullableField(
+ "f_micros_instant_array",
+ Schema.FieldType.array(Schema.FieldType.logicalType(new
MicrosInstant())))
.build();
private static final Row WRITE_ROW_NULLS =
@@ -138,6 +151,8 @@ public class MutationUtilsTest {
.addValue(null)
.addValue(null)
.addValue(null)
+ .addValue(null)
+ .addValue(null)
.build();
private static final Schema KEY_SCHEMA =
@@ -153,6 +168,7 @@ public class MutationUtilsTest {
.addNullableField("f_int32", Schema.FieldType.INT32)
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
.addNullableField("f_byte", Schema.FieldType.BYTE)
+ .addNullableField("f_micros_instant",
Schema.FieldType.logicalType(new MicrosInstant()))
.build();
private static final Row KEY_ROW =
@@ -168,6 +184,7 @@ public class MutationUtilsTest {
.withFieldValue("f_int32", 0x7fffffff)
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
.withFieldValue("f_byte", Byte.parseByte("127"))
+ .withFieldValue("f_micros_instant", TEST_INSTANT)
.build();
private static final Schema KEY_SCHEMA_NULLS =
@@ -178,6 +195,7 @@ public class MutationUtilsTest {
.addNullableField("f_bytes", Schema.FieldType.BYTES)
.addNullableField("f_date_time", Schema.FieldType.DATETIME)
.addNullableField("f_bool", Schema.FieldType.BOOLEAN)
+ .addNullableField("f_micros_instant",
Schema.FieldType.logicalType(new MicrosInstant()))
.build();
private static final Row KEY_ROW_NULLS =
@@ -188,6 +206,7 @@ public class MutationUtilsTest {
.addValue(null)
.addValue(null)
.addValue(null)
+ .addValue(null)
.build();
@Test
@@ -264,6 +283,7 @@ public class MutationUtilsTest {
}
private static Mutation createDeleteMutation() {
+ long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L +
TEST_INSTANT.getNano() / 1_000L;
Key key =
Key.newBuilder()
.append(1L)
@@ -277,6 +297,7 @@ public class MutationUtilsTest {
.append(0x7fffffff)
.append(BigDecimal.valueOf(Long.MIN_VALUE))
.append(Byte.parseByte("127"))
+ .append(Timestamp.ofTimeMicroseconds(micros))
.build();
return Mutation.delete(TABLE, key);
}
@@ -290,12 +311,14 @@ public class MutationUtilsTest {
.append((ByteArray) null)
.append((Timestamp) null)
.append((Boolean) null)
+ .append((Timestamp) null)
.build();
return Mutation.delete(TABLE, key);
}
private static Mutation createMutation(Mutation.Op operation) {
Mutation.WriteBuilder builder = chooseBuilder(operation);
+ long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L +
TEST_INSTANT.getNano() / 1_000L;
return builder
.set("f_int64")
.to(1L)
@@ -353,6 +376,12 @@ public class MutationUtilsTest {
.to(Byte.parseByte("127"))
.set("f_iterable")
.toInt64Array(ImmutableList.of(2L, 3L))
+ .set("f_micros_instant")
+ .to(Timestamp.ofTimeMicroseconds(micros))
+ .set("f_micros_instant_array")
+ .toTimestampArray(
+ ImmutableList.of(
+ Timestamp.ofTimeMicroseconds(micros),
Timestamp.ofTimeMicroseconds(micros)))
.build();
}
@@ -381,6 +410,10 @@ public class MutationUtilsTest {
.toInt64Array((List<Long>) null)
.set("f_struct_array")
.toStructArray(Type.struct(Type.StructField.of("int64",
Type.int64())), null)
+ .set("f_micros_instant")
+ .to((Timestamp) null)
+ .set("f_micros_instant_array")
+ .toTimestampArray(null)
.build();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
index 1cdf9afa7de..9a378b01518 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
@@ -33,8 +33,10 @@ import com.google.cloud.spanner.Type;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.TypeCode;
import java.math.BigDecimal;
+import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -45,6 +47,10 @@ import org.junit.Test;
public class StructUtilsTest {
private static final Schema EMPTY_SCHEMA = Schema.builder().build();
private static final Schema INT64_SCHEMA =
Schema.builder().addInt64Field("int64").build();
+ private static final Timestamp TIMESTAMP =
Timestamp.ofTimeMicroseconds(1234567890123456L);
+ private static final Instant INSTANT =
+ Instant.ofEpochSecond(
+ 1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) *
1_000L);
@Test
public void testStructToBeamRow() {
@@ -286,6 +292,39 @@ public class StructUtilsTest {
"Error processing struct to row: Unsupported type 'STRUCT'.",
exception.getMessage());
}
+ @Test
+ public void testStructToBeamRowWithMicrosInstant() {
+ Schema schema =
+ Schema.builder()
+ .addInt64Field("f_int64")
+ .addNullableField("f_micros_instant",
Schema.FieldType.logicalType(new MicrosInstant()))
+ .addNullableField(
+ "f_micros_instant_array",
+ Schema.FieldType.array(Schema.FieldType.logicalType(new
MicrosInstant())))
+ .build();
+
+ Struct struct =
+ Struct.newBuilder()
+ .set("f_int64")
+ .to(42L)
+ .set("f_micros_instant")
+ .to(TIMESTAMP)
+ .set("f_micros_instant_array")
+ .toTimestampArray(ImmutableList.of(TIMESTAMP, TIMESTAMP))
+ .build();
+
+ Row result = StructUtils.structToBeamRow(struct, schema);
+
+ assertEquals(42L, result.getInt64("f_int64").longValue());
+
+ assertEquals(INSTANT, result.getValue("f_micros_instant"));
+
+ @SuppressWarnings("unchecked")
+ List<Instant> instants = (List<Instant>)
result.getValue("f_micros_instant_array");
+ assertEquals(2, instants.size());
+ assertEquals(INSTANT, instants.get(0));
+ }
+
private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode)
{
return StructType.Field.newBuilder()
.setName(name)
diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
index 43a74f17053..b5d5304245c 100644
--- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
@@ -26,6 +26,8 @@ import uuid
from typing import NamedTuple
from typing import Optional
+import pytest
+
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
@@ -37,6 +39,7 @@ from apache_beam.io.gcp.spanner import SpannerUpdate
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.utils.timestamp import Timestamp
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -50,6 +53,8 @@ except ImportError:
DockerContainer = None
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+TIMESTAMPS = [Timestamp.of(1234567890.0 + i) for i in range(1000)]
+
class SpannerTestKey(NamedTuple):
f_string: str
@@ -59,13 +64,20 @@ class SpannerTestRow(NamedTuple):
f_string: str
f_int64: Optional[int]
f_boolean: Optional[bool]
+ f_timestamp: Optional[Timestamp]
class SpannerPartTestRow(NamedTuple):
f_string: str
f_int64: Optional[int]
+ f_timestamp: Optional[Timestamp]
[email protected]_gcp_java_expansion_service
[email protected](
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
@unittest.skipIf(spanner is None, 'GCP dependencies are not installed.')
@unittest.skipIf(
DockerContainer is None, 'testcontainers package is not installed.')
@@ -118,76 +130,112 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
def test_spanner_insert_or_update(self):
self.spanner_helper.insert_values(
- self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+ self.database_id,
+ [('or_update0', 5, False, TIMESTAMPS[1].to_rfc3339()),
+ ('or_update1', 9, False, TIMESTAMPS[0].to_rfc3339())])
def to_row_fn(i):
return SpannerTestRow(
- f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+ f_int64=i,
+ f_string=f'or_update{i}',
+ f_boolean=i % 2 == 0,
+ f_timestamp=TIMESTAMPS[i])
self.run_write_pipeline(3, to_row_fn, SpannerTestRow,
SpannerInsertOrUpdate)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='or_update'),
- [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+ results = self.spanner_helper.read_data(
+ self.database_id, prefix='or_update')
+ self.assertEqual(len(results), 3)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'or_update{i}')
+ self.assertEqual(row[1], i)
+ self.assertEqual(row[2], i % 2 == 0)
+ self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
def test_spanner_insert(self):
def to_row_fn(num):
return SpannerTestRow(
- f_string=f'insert{num}', f_int64=num, f_boolean=None)
+ f_string=f'insert{num}',
+ f_int64=num,
+ f_boolean=None,
+ f_timestamp=TIMESTAMPS[num])
self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
def compare_row(row):
return row[1]
- self.assertEqual(
- sorted(
- self.spanner_helper.read_data(self.database_id, 'insert'),
- key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+ results = sorted(
+ self.spanner_helper.read_data(self.database_id, 'insert'),
+ key=compare_row)
+
+ self.assertEqual(len(results), 1000)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'insert{i}')
+ self.assertEqual(row[1], i)
+ self.assertIsNone(row[2])
+ self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
def test_spanner_replace(self):
self.spanner_helper.insert_values(
- self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+ self.database_id,
+ [('replace0', 0, True, TIMESTAMPS[10].to_rfc3339()),
+ ('replace1', 1, False, TIMESTAMPS[11].to_rfc3339())])
def to_row_fn(num):
- return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+ return SpannerPartTestRow(
+ f_string=f'replace{num}',
+ f_int64=num + 10,
+ f_timestamp=TIMESTAMPS[num])
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
-
+ results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+ for i in range(len(results)):
+ results[i][3] = results[i][3].timestamp_pb()
self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='replace'),
- [['replace0', 10, None], ['replace1', 11, None]])
+ results,
+ [['replace0', 10, None, TIMESTAMPS[0].to_proto()],
+ ['replace1', 11, None, TIMESTAMPS[1].to_proto()]])
def test_spanner_update(self):
self.spanner_helper.insert_values(
- self.database_id, [('update0', 5, False), ('update1', 9, False)])
+ self.database_id,
+ [('update0', 5, False, TIMESTAMPS[10].to_rfc3339()),
+ ('update1', 9, False, TIMESTAMPS[100].to_rfc3339())])
def to_row_fn(num):
- return SpannerPartTestRow(f_string=f'update{num}', f_int64=num + 10)
+ return SpannerPartTestRow(
+ f_string=f'update{num}',
+ f_int64=num + 10,
+ f_timestamp=TIMESTAMPS[num])
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
-
+ results = self.spanner_helper.read_data(self.database_id, 'update')
+ for i in range(len(results)):
+ results[i][3] = results[i][3].timestamp_pb()
self.assertEqual(
- self.spanner_helper.read_data(self.database_id, 'update'),
- [['update0', 10, False], ['update1', 11, False]])
+ results,
+ [['update0', 10, False, TIMESTAMPS[0].to_proto()],
+ ['update1', 11, False, TIMESTAMPS[1].to_proto()]])
def test_spanner_delete(self):
self.spanner_helper.insert_values(
self.database_id,
values=[
- ('delete0', 0, None),
- ('delete6', 6, False),
- ('delete20', 20, True),
+ ('delete0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+ ('delete6', 6, False, TIMESTAMPS[0].to_rfc3339()),
+ ('delete20', 20, True, TIMESTAMPS[0].to_rfc3339()),
])
def to_row_fn(num):
return SpannerTestKey(f_string=f'delete{num}')
self.run_write_pipeline(10, to_row_fn, SpannerTestKey, SpannerDelete)
-
+ results = self.spanner_helper.read_data(self.database_id, prefix='delete')
+ for i in range(len(results)):
+ results[i][3] = results[i][3].timestamp_pb()
self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='delete'),
- [['delete20', 20, True]])
+ results, [['delete20', 20, True, TIMESTAMPS[0].to_proto()]])
def test_spanner_read_query(self):
self.insert_read_values('query_read')
@@ -215,9 +263,21 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
assert_that(
result,
equal_to([
- SpannerTestRow(f_int64=0, f_string=f'{prefix}0', f_boolean=None),
- SpannerTestRow(f_int64=1, f_string=f'{prefix}1', f_boolean=True),
- SpannerTestRow(f_int64=2, f_string=f'{prefix}2',
f_boolean=False),
+ SpannerTestRow(
+ f_int64=0,
+ f_string=f'{prefix}0',
+ f_boolean=None,
+ f_timestamp=TIMESTAMPS[0]),
+ SpannerTestRow(
+ f_int64=1,
+ f_string=f'{prefix}1',
+ f_boolean=True,
+ f_timestamp=TIMESTAMPS[1]),
+ SpannerTestRow(
+ f_int64=2,
+ f_string=f'{prefix}2',
+ f_boolean=False,
+ f_timestamp=TIMESTAMPS[2]),
]))
def run_write_pipeline(
@@ -242,9 +302,9 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
self.spanner_helper.insert_values(
self.database_id,
values=[
- (f'{prefix}0', 0, None),
- (f'{prefix}1', 1, True),
- (f'{prefix}2', 2, False),
+ (f'{prefix}0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+ (f'{prefix}1', 1, True, TIMESTAMPS[1].to_rfc3339()),
+ (f'{prefix}2', 2, False, TIMESTAMPS[2].to_rfc3339()),
])
@@ -288,14 +348,15 @@ class SpannerHelper(object):
CREATE TABLE {self.table} (
f_string STRING(1024) NOT NULL,
f_int64 INT64,
- f_boolean BOOL
+ f_boolean BOOL,
+ f_timestamp TIMESTAMP
) PRIMARY KEY (f_string)'''
])
database.create().result(120)
def insert_values(self, database_id, values, columns=None):
values = values or []
- columns = columns or ('f_string', 'f_int64', 'f_boolean')
+ columns = columns or ('f_string', 'f_int64', 'f_boolean', 'f_timestamp')
with self.instance.database(database_id).batch() as batch:
batch.insert(
table=self.table,