This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 801bf89e3da Parse proto bytes change stream record in 
ChangeStreamRecordMapper (#37427)
801bf89e3da is described below

commit 801bf89e3da44a665fdabaf04164d63025928045
Author: chenxuesdu <[email protected]>
AuthorDate: Tue Feb 10 11:58:43 2026 -0800

    Parse proto bytes change stream record in ChangeStreamRecordMapper (#37427)
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |   6 +-
 .../changestreams/dao/ChangeStreamResultSet.java   |  31 +++
 .../mapper/ChangeStreamRecordMapper.java           |  13 +-
 .../dao/ChangeStreamResultSetTest.java             |  60 +++++
 ...pannerChangeStreamPlacementTablePostgresIT.java | 279 +++++++++++++++++++++
 .../mapper/ChangeStreamRecordMapperTest.java       | 105 ++++++++
 6 files changed, 487 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index bbce5fad82f..3a69d1177f4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -1977,11 +1977,6 @@ public class SpannerIO {
               + changeStreamDatabaseId
               + " has dialect "
               + changeStreamDatabaseDialect);
-      LOG.info(
-          "The Spanner database "
-              + fullPartitionMetadataDatabaseId
-              + " has dialect "
-              + metadataDatabaseDialect);
       PartitionMetadataTableNames partitionMetadataTableNames =
           Optional.ofNullable(getMetadataTable())
               .map(
@@ -2005,6 +2000,7 @@ public class SpannerIO {
       final boolean isMutableChangeStream =
           isMutableChangeStream(
               spannerAccessor.getDatabaseClient(), 
changeStreamDatabaseDialect, changeStreamName);
+      LOG.info("The change stream " + changeStreamName + " is mutable: " + 
isMutableChangeStream);
       final DaoFactory daoFactory =
           new DaoFactory(
               changeStreamSpannerConfig,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
index 1268c739164..846c8095129 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Struct;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.joda.time.Duration;
 
 /**
@@ -113,6 +114,9 @@ public class ChangeStreamResultSet implements AutoCloseable 
{
    * updates the timestamp at which the record was read. This function 
enhances the getProtoMessage
    * function but only focus on the ChangeStreamRecord type.
    *
+   * <p>Should only be used for GoogleSQL databases when the change stream 
record is delivered as
+   * proto.
+   *
    * @return a change stream record as a proto or null
    */
   public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() 
{
@@ -128,6 +132,33 @@ public class ChangeStreamResultSet implements 
AutoCloseable {
         && resultSet.getColumnType(0).getCode() == 
com.google.cloud.spanner.Type.Code.PROTO;
   }
 
+  /**
+   * Returns the change stream record at the current pointer by parsing the 
bytes column. It also
+   * updates the timestamp at which the record was read.
+   *
+   * <p>Should only be used for PostgreSQL databases when the change stream 
record is delivered as
+   * proto bytes.
+   *
+   * @return a change stream record as a proto or null
+   */
+  public com.google.spanner.v1.ChangeStreamRecord getBytes(int index) {
+    recordReadAt = Timestamp.now();
+    try {
+      // Read the BYTES column (as returned by the read_proto_bytes_ TVF) at the given index
+      return com.google.spanner.v1.ChangeStreamRecord.parseFrom(
+          resultSet.getBytes(index).toByteArray());
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException("Failed to parse the proto bytes to 
ChangeStreamRecord proto", e);
+    }
+  }
+
+  /** Returns true if the result set at the current pointer contains only one 
bytes change record. */
+  public boolean isProtoBytesChangeRecord() {
+    return resultSet.getColumnCount() == 1
+        && !resultSet.isNull(0)
+        && resultSet.getColumnType(0).getCode() == 
com.google.cloud.spanner.Type.Code.BYTES;
+  }
+
   /**
    * Returns the record at the current pointer as {@link JsonB}. It also 
updates the timestamp at
    * which the record was read.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 63153864666..368fec88918 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -218,18 +218,27 @@ public class ChangeStreamRecordMapper {
    * @param resultSet the change stream result set
    * @param resultSetMetadata the metadata generated when reading the change 
stream row
    * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+   * @throws RuntimeException if a proto bytes change stream record fails to parse
    */
   public List<ChangeStreamRecord> toChangeStreamRecords(
       PartitionMetadata partition,
       ChangeStreamResultSet resultSet,
       ChangeStreamResultSetMetadata resultSetMetadata) {
     if (this.isPostgres()) {
-      // In PostgresQL, change stream records are returned as JsonB.
+      // For `MUTABLE_KEY_RANGE` option, change stream records are returned as 
protos.
+      if (resultSet.isProtoBytesChangeRecord()) {
+        return Arrays.asList(
+            toChangeStreamRecord(partition, resultSet.getBytes(0), 
resultSetMetadata));
+      }
+
+      // For `IMMUTABLE_KEY_RANGE` option, change stream records are returned 
as
+      // JsonB.
       return Collections.singletonList(
           toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), 
resultSetMetadata));
     }
 
-    // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are 
returned as Protos.
+    // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are
+    // returned as Protos.
     if (resultSet.isProtoChangeRecord()) {
       return Arrays.asList(
           toChangeStreamRecord(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
new file mode 100644
index 00000000000..d3408536c82
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.junit.Test;
+
+public class ChangeStreamResultSetTest {
+
+  @Test
+  public void testGetBytes() throws Exception {
+    // 1. Create an expected ChangeStreamRecord proto
+    Timestamp now = Timestamp.now();
+    final HeartbeatRecord heartbeatRecord =
+        new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+    com.google.spanner.v1.ChangeStreamRecord expectedRecord = 
recordToProto(heartbeatRecord);
+    assertNotNull(expectedRecord);
+
+    // 2. Convert it to bytes (simulating how Spanner PostgreSQL returns it)
+    byte[] protoBytes = expectedRecord.toByteArray();
+
+    // 3. Mock the underlying Spanner ResultSet
+    ResultSet mockResultSet = mock(ResultSet.class);
+    // Simulate column 0 containing the BYTES representation of the proto
+    when(mockResultSet.getBytes(0)).thenReturn(ByteArray.copyFrom(protoBytes));
+
+    // 4. Initialize ChangeStreamResultSet with the mock
+    ChangeStreamResultSet changeStreamResultSet = new 
ChangeStreamResultSet(mockResultSet);
+
+    // 5. Call getBytes(0) and assert that the bytes are parsed back into the
+    // expected proto
+    com.google.spanner.v1.ChangeStreamRecord actualRecord = 
changeStreamResultSet.getBytes(0);
+
+    assertEquals(expectedRecord, actualRecord);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
new file mode 100644
index 00000000000..573ac825910
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import com.google.gson.Gson;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end test of Cloud Spanner CDC Source. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamPlacementTablePostgresIT {
+
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(3600);
+
+  @ClassRule
+  public static final IntegrationTestEnv ENV =
+      new IntegrationTestEnv(
+          /*isPostgres=*/ true,
+          /*isPlacementTableBasedChangeStream=*/ true,
+          /*host=*/ Optional.empty());
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private static String instanceId;
+  private static String projectId;
+  private static String databaseId;
+  private static String metadataTableName;
+  private static String changeStreamTableName;
+  private static String changeStreamName;
+  private static DatabaseClient databaseClient;
+  private static String host = "https://spanner.googleapis.com";;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    projectId = ENV.getProjectId();
+    instanceId = ENV.getInstanceId();
+    databaseId = ENV.getDatabaseId();
+
+    metadataTableName = ENV.getMetadataTableName();
+    changeStreamTableName = ENV.createSingersTable();
+    changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
+    databaseClient = ENV.getDatabaseClient();
+  }
+
+  @Before
+  public void before() {
+    
pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
+    
pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Test
+  public void testReadSpannerChangeStream() {
+    // Defines how many rows are going to be inserted / updated / deleted in 
the test
+    final int numRows = 5;
+    // Inserts numRows rows and uses the first commit timestamp as the startAt 
for reading the
+    // change stream
+    final Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
+    final Timestamp startAt = insertTimestamps.getLeft();
+    // Updates the created rows
+    updateRows(numRows);
+    // Delete the created rows and uses the last commit timestamp as the endAt 
for reading the
+    // change stream
+    final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
+    final Timestamp endAt = deleteTimestamps.getRight();
+
+    final SpannerConfig spannerConfig =
+        SpannerConfig.create()
+            .withProjectId(projectId)
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId)
+            .withHost(ValueProvider.StaticValueProvider.of(host));
+
+    final PCollection<String> tokens =
+        pipeline
+            .apply(
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(spannerConfig)
+                    .withChangeStreamName(changeStreamName)
+                    .withMetadataDatabase(databaseId)
+                    .withMetadataTable(metadataTableName)
+                    .withInclusiveStartAt(startAt)
+                    .withInclusiveEndAt(endAt))
+            .apply(ParDo.of(new ModsToString()));
+
+    // Each row is composed of the following data
+    // <mod type, singer id, old first name, old last name, new first name, 
new last name>
+    PAssert.that(tokens)
+        .containsInAnyOrder(
+            "INSERT,1,null,null,First Name 1,Last Name 1",
+            "INSERT,2,null,null,First Name 2,Last Name 2",
+            "INSERT,3,null,null,First Name 3,Last Name 3",
+            "INSERT,4,null,null,First Name 4,Last Name 4",
+            "INSERT,5,null,null,First Name 5,Last Name 5",
+            "UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated 
Last Name 1",
+            "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated 
Last Name 2",
+            "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated 
Last Name 3",
+            "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated 
Last Name 4",
+            "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated 
Last Name 5",
+            "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null",
+            "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null",
+            "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null",
+            "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null",
+            "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null");
+    pipeline.run().waitUntilFinish();
+
+    assertMetadataTableHasBeenDropped();
+  }
+
+  private static void assertMetadataTableHasBeenDropped() {
+    try (ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(Statement.of("SELECT * FROM \"" + metadataTableName 
+ "\""))) {
+      resultSet.next();
+      fail(
+          "The metadata table "
+              + metadataTableName
+              + " should had been dropped, but it still exists");
+    } catch (SpannerException e) {
+      assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+      assertTrue(
+          "Error message must contain \"Table not found\"",
+          e.getMessage().contains("relation \"" + metadataTableName + "\" does 
not exist"));
+    }
+  }
+
+  private static Pair<Timestamp, Timestamp> insertRows(int n) {
+    final Timestamp firstCommitTimestamp = insertRow(1);
+    for (int i = 2; i < n; i++) {
+      insertRow(i);
+    }
+    final Timestamp lastCommitTimestamp = insertRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Pair<Timestamp, Timestamp> updateRows(int n) {
+    final Timestamp firstCommitTimestamp = updateRow(1);
+    for (int i = 2; i < n; i++) {
+      updateRow(i);
+    }
+    final Timestamp lastCommitTimestamp = updateRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Pair<Timestamp, Timestamp> deleteRows(int n) {
+    final Timestamp firstCommitTimestamp = deleteRow(1);
+    for (int i = 2; i < n; i++) {
+      deleteRow(i);
+    }
+    final Timestamp lastCommitTimestamp = deleteRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Timestamp insertRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(
+                Mutation.newInsertBuilder(changeStreamTableName)
+                    .set("SingerId")
+                    .to(singerId)
+                    .set("FirstName")
+                    .to("First Name " + singerId)
+                    .set("LastName")
+                    .to("Last Name " + singerId)
+                    .build()))
+        .getCommitTimestamp();
+  }
+
+  private static Timestamp updateRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(
+                Mutation.newUpdateBuilder(changeStreamTableName)
+                    .set("SingerId")
+                    .to(singerId)
+                    .set("FirstName")
+                    .to("Updated First Name " + singerId)
+                    .set("LastName")
+                    .to("Updated Last Name " + singerId)
+                    .build()),
+            Options.tag("app=beam;action=update"))
+        .getCommitTimestamp();
+  }
+
+  private static Timestamp deleteRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(Mutation.delete(changeStreamTableName, 
Key.of(singerId))),
+            Options.tag("app=beam;action=delete"))
+        .getCommitTimestamp();
+  }
+
+  private static class ModsToString extends DoFn<DataChangeRecord, String> {
+
+    private transient Gson gson;
+
+    @Setup
+    public void setup() {
+      gson = new Gson();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element DataChangeRecord record, OutputReceiver<String> 
outputReceiver) {
+      final Mod mod = record.getMods().get(0);
+      final Map<String, String> keys = gson.fromJson(mod.getKeysJson(), 
Map.class);
+      final Map<String, String> oldValues =
+          Optional.ofNullable(mod.getOldValuesJson())
+              .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+              .orElseGet(Collections::emptyMap);
+      final Map<String, String> newValues =
+          Optional.ofNullable(mod.getNewValuesJson())
+              .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+              .orElseGet(Collections::emptyMap);
+
+      final String modsAsString =
+          String.join(
+              ",",
+              record.getModType().toString(),
+              keys.get("SingerId"),
+              oldValues.get("FirstName"),
+              oldValues.get("LastName"),
+              newValues.get("FirstName"),
+              newValues.get("LastName"));
+      final Instant timestamp = new 
Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+      outputReceiver.outputWithTimestamp(modsAsString, timestamp);
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index b3dd1bef049..f73647beb00 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -1039,4 +1039,109 @@ public class ChangeStreamRecordMapperTest {
         Collections.singletonList(dataChangeRecord),
         mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
   }
+
+  @Test
+  public void testMappingProtoBytesRowToPartitionStartRecord() {
+    final PartitionStartRecord partitionStartRecord =
+        new PartitionStartRecord(
+            Timestamp.MIN_VALUE,
+            "fakeRecordSequence",
+            Arrays.asList("partitionToken1", "partitionToken2"),
+            null);
+    com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+        recordToProto(partitionStartRecord);
+    assertNotNull(changeStreamRecordProto);
+    ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+    when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+    when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+    assertEquals(
+        Collections.singletonList(partitionStartRecord),
+        mapperPostgres.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingProtoBytesRowToPartitionEndRecord() {
+    final PartitionEndRecord partitionEndChange =
+        new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", 
null);
+    com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+        recordToProto(partitionEndChange);
+    assertNotNull(changeStreamRecordProto);
+    ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+    when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+    when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+    assertEquals(
+        Collections.singletonList(partitionEndChange),
+        mapperPostgres.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingProtoBytesRowToPartitionEventRecord() {
+    final PartitionEventRecord partitionEventRecord =
+        new PartitionEventRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", 
null);
+    com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+        recordToProto(partitionEventRecord);
+    assertNotNull(changeStreamRecordProto);
+    ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+    when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+    when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+    assertEquals(
+        Collections.singletonList(partitionEventRecord),
+        mapperPostgres.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingProtoBytesRowToHeartbeatRecord() {
+    final HeartbeatRecord heartbeatRecord =
+        new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+    com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+        recordToProto(heartbeatRecord);
+    assertNotNull(changeStreamRecordProto);
+    ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+    when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+    when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+    assertEquals(
+        Collections.singletonList(heartbeatRecord),
+        mapperPostgres.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingProtoBytesRowToDataChangeRecord() {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "serverTransactionId",
+            true,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new 
TypeCode("{\"code\":\"INT64\"}"), true, 1L),
+                new ColumnType("column2", new 
TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
+            Collections.singletonList(
+                new Mod(
+                    "{\"column1\":\"value1\"}",
+                    "{\"column2\":\"oldValue2\"}",
+                    "{\"column2\":\"newValue2\"}")),
+            ModType.UPDATE,
+            ValueCaptureType.OLD_AND_NEW_VALUES,
+            10L,
+            2L,
+            "transactionTag",
+            true,
+            null);
+    com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+        recordToProto(dataChangeRecord);
+    assertNotNull(changeStreamRecordProto);
+    ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+    when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+    when(resultSet.getBytes(0)).thenReturn(changeStreamRecordProto);
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapperPostgres.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata));
+  }
 }

Reply via email to