This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 801bf89e3da Parse proto bytes change stream record in ChangeStreamRecordMapper (#37427)
801bf89e3da is described below
commit 801bf89e3da44a665fdabaf04164d63025928045
Author: chenxuesdu <[email protected]>
AuthorDate: Tue Feb 10 11:58:43 2026 -0800
Parse proto bytes change stream record in ChangeStreamRecordMapper (#37427)
---
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 6 +-
.../changestreams/dao/ChangeStreamResultSet.java | 31 +++
.../mapper/ChangeStreamRecordMapper.java | 13 +-
.../dao/ChangeStreamResultSetTest.java | 60 +++++
...pannerChangeStreamPlacementTablePostgresIT.java | 279 +++++++++++++++++++++
.../mapper/ChangeStreamRecordMapperTest.java | 105 ++++++++
6 files changed, 487 insertions(+), 7 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index bbce5fad82f..3a69d1177f4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -1977,11 +1977,6 @@ public class SpannerIO {
+ changeStreamDatabaseId
+ " has dialect "
+ changeStreamDatabaseDialect);
- LOG.info(
- "The Spanner database "
- + fullPartitionMetadataDatabaseId
- + " has dialect "
- + metadataDatabaseDialect);
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
@@ -2005,6 +2000,7 @@ public class SpannerIO {
final boolean isMutableChangeStream =
isMutableChangeStream(
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
+ LOG.info("The change stream " + changeStreamName + " is mutable: " +
isMutableChangeStream);
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
index 1268c739164..846c8095129 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.joda.time.Duration;
/**
@@ -113,6 +114,9 @@ public class ChangeStreamResultSet implements AutoCloseable {
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
* function but only focuses on the ChangeStreamRecord type.
*
+ * <p>Should only be used for GoogleSQL databases when the change stream
record is delivered as
+ * proto.
+ *
* @return a change stream record as a proto or null
*/
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() {
@@ -128,6 +132,33 @@ public class ChangeStreamResultSet implements AutoCloseable {
&& resultSet.getColumnType(0).getCode() ==
com.google.cloud.spanner.Type.Code.PROTO;
}
+ /**
+ * Returns the change stream record at the current pointer by parsing the bytes column. It also
+ * updates the timestamp at which the record was read.
+ *
+ * <p>Should only be used for PostgreSQL databases when the change stream
record is delivered as
+ * proto bytes.
+ *
+ * @return a change stream record as a proto or null
+ */
+ public com.google.spanner.v1.ChangeStreamRecord getBytes(int index) {
+ recordReadAt = Timestamp.now();
+ try {
+ // Use getBytes(0) for the BYTES column returned by read_proto_bytes_ TVF
+ return com.google.spanner.v1.ChangeStreamRecord.parseFrom(
+ resultSet.getBytes(index).toByteArray());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Failed to parse the proto bytes to
ChangeStreamRecord proto", e);
+ }
+ }
+
+ /** Returns true if the result set at the current pointer contains only one bytes change record. */
+ public boolean isProtoBytesChangeRecord() {
+ return resultSet.getColumnCount() == 1
+ && !resultSet.isNull(0)
+ && resultSet.getColumnType(0).getCode() ==
com.google.cloud.spanner.Type.Code.BYTES;
+ }
+
/**
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
* which the record was read.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 63153864666..368fec88918 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -218,18 +218,27 @@ public class ChangeStreamRecordMapper {
* @param resultSet the change stream result set
* @param resultSetMetadata the metadata generated when reading the change stream row
* @return a {@link List} of {@link ChangeStreamRecord} subclasses
+ * @throws InvalidProtocolBufferException
*/
public List<ChangeStreamRecord> toChangeStreamRecords(
PartitionMetadata partition,
ChangeStreamResultSet resultSet,
ChangeStreamResultSetMetadata resultSetMetadata) {
if (this.isPostgres()) {
- // In PostgresQL, change stream records are returned as JsonB.
+ // For `MUTABLE_KEY_RANGE` option, change stream records are returned as protos.
+ if (resultSet.isProtoBytesChangeRecord()) {
+ return Arrays.asList(
+ toChangeStreamRecord(partition, resultSet.getBytes(0), resultSetMetadata));
+ }
+
+ // For `IMMUTABLE_KEY_RANGE` option, change stream records are returned as
+ // JsonB.
return Collections.singletonList(
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
}
- // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
+ // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are
+ // returned as Protos.
if (resultSet.isProtoChangeRecord()) {
return Arrays.asList(
toChangeStreamRecord(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
new file mode 100644
index 00000000000..d3408536c82
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.junit.Test;
+
+public class ChangeStreamResultSetTest {
+
+ @Test
+ public void testGetBytes() throws Exception {
+ // 1. Create an expected ChangeStreamRecord proto
+ Timestamp now = Timestamp.now();
+ final HeartbeatRecord heartbeatRecord =
+ new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+ com.google.spanner.v1.ChangeStreamRecord expectedRecord = recordToProto(heartbeatRecord);
+ assertNotNull(expectedRecord);
+
+ // 2. Convert it to bytes (simulating how Spanner PostgreSQL returns it)
+ byte[] protoBytes = expectedRecord.toByteArray();
+
+ // 3. Mock the underlying Spanner ResultSet
+ ResultSet mockResultSet = mock(ResultSet.class);
+ // Simulate column 0 containing the BYTES representation of the proto
+ when(mockResultSet.getBytes(0)).thenReturn(ByteArray.copyFrom(protoBytes));
+
+ // 4. Initialize ChangeStreamResultSet with the mock
+ ChangeStreamResultSet changeStreamResultSet = new ChangeStreamResultSet(mockResultSet);
+
+ // 5. Call the new method and assert it parses correctly
+ // (getBytes parses the BYTES column into a ChangeStreamRecord proto)
+ com.google.spanner.v1.ChangeStreamRecord actualRecord = changeStreamResultSet.getBytes(0);
+
+ assertEquals(expectedRecord, actualRecord);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
new file mode 100644
index 00000000000..573ac825910
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import com.google.gson.Gson;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end test of Cloud Spanner CDC Source. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamPlacementTablePostgresIT {
+
+ @Rule public transient Timeout globalTimeout = Timeout.seconds(3600);
+
+ @ClassRule
+ public static final IntegrationTestEnv ENV =
+ new IntegrationTestEnv(
+ /*isPostgres=*/ true,
+ /*isPlacementTableBasedChangeStream=*/ true,
+ /*host=*/ Optional.empty());
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private static String instanceId;
+ private static String projectId;
+ private static String databaseId;
+ private static String metadataTableName;
+ private static String changeStreamTableName;
+ private static String changeStreamName;
+ private static DatabaseClient databaseClient;
+ private static String host = "https://spanner.googleapis.com";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ projectId = ENV.getProjectId();
+ instanceId = ENV.getInstanceId();
+ databaseId = ENV.getDatabaseId();
+
+ metadataTableName = ENV.getMetadataTableName();
+ changeStreamTableName = ENV.createSingersTable();
+ changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
+ databaseClient = ENV.getDatabaseClient();
+ }
+
+ @Before
+ public void before() {
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
+ }
+
+ @Test
+ public void testReadSpannerChangeStream() {
+ // Defines how many rows are going to be inserted / updated / deleted in the test
+ final int numRows = 5;
+ // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
+ final Timestamp startAt = insertTimestamps.getLeft();
+ // Updates the created rows
+ updateRows(numRows);
+ // Deletes the created rows and uses the last commit timestamp as the endAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
+ final Timestamp endAt = deleteTimestamps.getRight();
+
+ final SpannerConfig spannerConfig =
+ SpannerConfig.create()
+ .withProjectId(projectId)
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId)
+ .withHost(ValueProvider.StaticValueProvider.of(host));
+
+ final PCollection<String> tokens =
+ pipeline
+ .apply(
+ SpannerIO.readChangeStream()
+ .withSpannerConfig(spannerConfig)
+ .withChangeStreamName(changeStreamName)
+ .withMetadataDatabase(databaseId)
+ .withMetadataTable(metadataTableName)
+ .withInclusiveStartAt(startAt)
+ .withInclusiveEndAt(endAt))
+ .apply(ParDo.of(new ModsToString()));
+
+ // Each row is composed of the following data:
+ // <mod type, singer id, old first name, old last name, new first name, new last name>
+ PAssert.that(tokens)
+ .containsInAnyOrder(
+ "INSERT,1,null,null,First Name 1,Last Name 1",
+ "INSERT,2,null,null,First Name 2,Last Name 2",
+ "INSERT,3,null,null,First Name 3,Last Name 3",
+ "INSERT,4,null,null,First Name 4,Last Name 4",
+ "INSERT,5,null,null,First Name 5,Last Name 5",
+ "UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated
Last Name 1",
+ "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated
Last Name 2",
+ "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated
Last Name 3",
+ "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated
Last Name 4",
+ "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated
Last Name 5",
+ "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null",
+ "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null",
+ "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null",
+ "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null",
+ "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null");
+ pipeline.run().waitUntilFinish();
+
+ assertMetadataTableHasBeenDropped();
+ }
+
+ private static void assertMetadataTableHasBeenDropped() {
+ try (ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(Statement.of("SELECT * FROM \"" + metadataTableName + "\""))) {
+ resultSet.next();
+ fail(
+ "The metadata table "
+ + metadataTableName
+ + " should had been dropped, but it still exists");
+ } catch (SpannerException e) {
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ "Error message must contain \"Table not found\"",
+ e.getMessage().contains("relation \"" + metadataTableName + "\" does
not exist"));
+ }
+ }
+
+ private static Pair<Timestamp, Timestamp> insertRows(int n) {
+ final Timestamp firstCommitTimestamp = insertRow(1);
+ for (int i = 2; i < n; i++) {
+ insertRow(i);
+ }
+ final Timestamp lastCommitTimestamp = insertRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> updateRows(int n) {
+ final Timestamp firstCommitTimestamp = updateRow(1);
+ for (int i = 2; i < n; i++) {
+ updateRow(i);
+ }
+ final Timestamp lastCommitTimestamp = updateRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> deleteRows(int n) {
+ final Timestamp firstCommitTimestamp = deleteRow(1);
+ for (int i = 2; i < n; i++) {
+ deleteRow(i);
+ }
+ final Timestamp lastCommitTimestamp = deleteRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Timestamp insertRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newInsertBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("First Name " + singerId)
+ .set("LastName")
+ .to("Last Name " + singerId)
+ .build()))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp updateRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newUpdateBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("Updated First Name " + singerId)
+ .set("LastName")
+ .to("Updated Last Name " + singerId)
+ .build()),
+ Options.tag("app=beam;action=update"))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp deleteRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(singerId))),
+ Options.tag("app=beam;action=delete"))
+ .getCommitTimestamp();
+ }
+
+ private static class ModsToString extends DoFn<DataChangeRecord, String> {
+
+ private transient Gson gson;
+
+ @Setup
+ public void setup() {
+ gson = new Gson();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element DataChangeRecord record, OutputReceiver<String>
outputReceiver) {
+ final Mod mod = record.getMods().get(0);
+ final Map<String, String> keys = gson.fromJson(mod.getKeysJson(),
Map.class);
+ final Map<String, String> oldValues =
+ Optional.ofNullable(mod.getOldValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+ final Map<String, String> newValues =
+ Optional.ofNullable(mod.getNewValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+
+ final String modsAsString =
+ String.join(
+ ",",
+ record.getModType().toString(),
+ keys.get("SingerId"),
+ oldValues.get("FirstName"),
+ oldValues.get("LastName"),
+ newValues.get("FirstName"),
+ newValues.get("LastName"));
+ final Instant timestamp = new Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+ outputReceiver.outputWithTimestamp(modsAsString, timestamp);
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index b3dd1bef049..f73647beb00 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -1039,4 +1039,109 @@ public class ChangeStreamRecordMapperTest {
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionStartRecord() {
+ final PartitionStartRecord partitionStartRecord =
+ new PartitionStartRecord(
+ Timestamp.MIN_VALUE,
+ "fakeRecordSequence",
+ Arrays.asList("partitionToken1", "partitionToken2"),
+ null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionStartRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionStartRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionEndRecord() {
+ final PartitionEndRecord partitionEndChange =
+ new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionEndChange);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionEndChange),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionEventRecord() {
+ final PartitionEventRecord partitionEventRecord =
+ new PartitionEventRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionEventRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionEventRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToHeartbeatRecord() {
+ final HeartbeatRecord heartbeatRecord =
+ new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(heartbeatRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(heartbeatRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "serverTransactionId",
+ true,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new
TypeCode("{\"code\":\"INT64\"}"), true, 1L),
+ new ColumnType("column2", new
TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
+ Collections.singletonList(
+ new Mod(
+ "{\"column1\":\"value1\"}",
+ "{\"column2\":\"oldValue2\"}",
+ "{\"column2\":\"newValue2\"}")),
+ ModType.UPDATE,
+ ValueCaptureType.OLD_AND_NEW_VALUES,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(dataChangeRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
}