This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9f07c9dd29 [Spanner Change Streams] Ensure the partition watermark is monotonic by reading within the transaction (#36463)
b9f07c9dd29 is described below

commit b9f07c9dd2999a9ac6a670e51ea097f04402859e
Author: jiangzzhu <[email protected]>
AuthorDate: Fri Oct 10 01:47:23 2025 -0700

    [Spanner Change Streams] Ensure the partition watermark is monotonic by reading within the transaction (#36463)
    
    Bundle finalizations, which are used to update the watermark, are not ordered, so we must guard against stale watermark updates: an older watermark must never overwrite a newer one already stored in the metadata table.
---
 .../changestreams/dao/PartitionMetadataDao.java    | 16 +++++++++--
 .../dao/PartitionMetadataDaoTest.java              | 33 ++++++++++++++++++++--
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index d850ea2d279..b407d5b0b6c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -32,6 +32,7 @@ import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMeta
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.Dialect;
+import com.google.cloud.spanner.Key;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Options;
 import com.google.cloud.spanner.ResultSet;
@@ -528,14 +529,25 @@ public class PartitionMetadataDao {
     }
 
     /**
-     * Update the partition watermark to the given timestamp.
+     * Update the partition watermark to the given timestamp iff the partition 
watermark in metadata
+     * table is smaller than the given watermark.
      *
      * @param partitionToken the partition unique identifier
      * @param watermark the new partition watermark
      * @return the commit timestamp of the read / write transaction
      */
     public Void updateWatermark(String partitionToken, Timestamp watermark) {
-      
transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, 
watermark));
+      Struct row =
+          transaction.readRow(
+              metadataTableName, Key.of(partitionToken), 
Collections.singleton(COLUMN_WATERMARK));
+      if (row == null) {
+        LOG.error("[{}] Failed to read Watermark column", partitionToken);
+        return null;
+      }
+      Timestamp partitionWatermark = row.getTimestamp(COLUMN_WATERMARK);
+      if (partitionWatermark.compareTo(watermark) < 0) {
+        
transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, 
watermark));
+      }
       return null;
     }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index dc35c2ea493..dba8c4792c6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -36,6 +36,8 @@ import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TransactionContext;
 import com.google.cloud.spanner.TransactionRunner;
 import com.google.cloud.spanner.Value;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.Map;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -238,14 +240,39 @@ public class PartitionMetadataDaoTest {
   @Test
   public void testInTransactionContextUpdateWatermark() {
     ArgumentCaptor<Mutation> mutation = 
ArgumentCaptor.forClass(Mutation.class);
-    doNothing().when(transaction).buffer(mutation.capture());
-    assertNull(inTransactionContext.updateWatermark(PARTITION_TOKEN, 
WATERMARK));
+    when(transaction.readRow(any(), any(), any()))
+        .thenReturn(
+            Struct.newBuilder()
+                .set(PartitionMetadataAdminDao.COLUMN_WATERMARK)
+                .to(WATERMARK)
+                .build());
+    Instant largerWatermark = 
WATERMARK.toSqlTimestamp().toInstant().plus(Duration.ofSeconds(1));
+    assertNull(
+        inTransactionContext.updateWatermark(
+            PARTITION_TOKEN,
+            Timestamp.ofTimeSecondsAndNanos(
+                largerWatermark.getEpochSecond(), largerWatermark.getNano())));
+    verify(transaction).buffer(mutation.capture());
     Map<String, Value> mutationValueMap = mutation.getValue().asMap();
     assertEquals(
         PARTITION_TOKEN,
         
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
     assertEquals(
-        WATERMARK, 
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
+        Timestamp.ofTimeSecondsAndNanos(
+            largerWatermark.getEpochSecond(), largerWatermark.getNano()),
+        
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
+  }
+
+  @Test
+  public void testInTransactionContextDoNotUpdateWatermark() {
+    when(transaction.readRow(any(), any(), any()))
+        .thenReturn(
+            Struct.newBuilder()
+                .set(PartitionMetadataAdminDao.COLUMN_WATERMARK)
+                .to(WATERMARK)
+                .build());
+    assertNull(inTransactionContext.updateWatermark(PARTITION_TOKEN, 
WATERMARK));
+    verify(transaction, times(0)).buffer(any(Mutation.class));
   }
 
   @Test

Reply via email to