This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9f07c9dd29 [Spanner Change Streams] Ensure the partition watermark is
monotonic by reading within the transaction (#36463)
b9f07c9dd29 is described below
commit b9f07c9dd2999a9ac6a670e51ea097f04402859e
Author: jiangzzhu <[email protected]>
AuthorDate: Fri Oct 10 01:47:23 2025 -0700
[Spanner Change Streams] Ensure the partition watermark is monotonic by
reading within the transaction (#36463)
Bundle finalizations, which are used to update the watermark, are not
ordered, so we must guard against stale watermark updates to ensure the
watermark never moves backwards.
---
.../changestreams/dao/PartitionMetadataDao.java | 16 +++++++++--
.../dao/PartitionMetadataDaoTest.java | 33 ++++++++++++++++++++--
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index d850ea2d279..b407d5b0b6c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -32,6 +32,7 @@ import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMeta
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
+import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
@@ -528,14 +529,25 @@ public class PartitionMetadataDao {
}
/**
- * Update the partition watermark to the given timestamp.
+ * Update the partition watermark to the given timestamp iff the partition
watermark in metadata
+ * table is smaller than the given watermark.
*
* @param partitionToken the partition unique identifier
* @param watermark the new partition watermark
* @return the commit timestamp of the read / write transaction
*/
public Void updateWatermark(String partitionToken, Timestamp watermark) {
-
transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken,
watermark));
+ Struct row =
+ transaction.readRow(
+ metadataTableName, Key.of(partitionToken),
Collections.singleton(COLUMN_WATERMARK));
+ if (row == null) {
+ LOG.error("[{}] Failed to read Watermark column", partitionToken);
+ return null;
+ }
+ Timestamp partitionWatermark = row.getTimestamp(COLUMN_WATERMARK);
+ if (partitionWatermark.compareTo(watermark) < 0) {
+
transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken,
watermark));
+ }
return null;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index dc35c2ea493..dba8c4792c6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -36,6 +36,8 @@ import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -238,14 +240,39 @@ public class PartitionMetadataDaoTest {
@Test
public void testInTransactionContextUpdateWatermark() {
ArgumentCaptor<Mutation> mutation =
ArgumentCaptor.forClass(Mutation.class);
- doNothing().when(transaction).buffer(mutation.capture());
- assertNull(inTransactionContext.updateWatermark(PARTITION_TOKEN,
WATERMARK));
+ when(transaction.readRow(any(), any(), any()))
+ .thenReturn(
+ Struct.newBuilder()
+ .set(PartitionMetadataAdminDao.COLUMN_WATERMARK)
+ .to(WATERMARK)
+ .build());
+ Instant largerWatermark =
WATERMARK.toSqlTimestamp().toInstant().plus(Duration.ofSeconds(1));
+ assertNull(
+ inTransactionContext.updateWatermark(
+ PARTITION_TOKEN,
+ Timestamp.ofTimeSecondsAndNanos(
+ largerWatermark.getEpochSecond(), largerWatermark.getNano())));
+ verify(transaction).buffer(mutation.capture());
Map<String, Value> mutationValueMap = mutation.getValue().asMap();
assertEquals(
PARTITION_TOKEN,
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
assertEquals(
- WATERMARK,
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
+ Timestamp.ofTimeSecondsAndNanos(
+ largerWatermark.getEpochSecond(), largerWatermark.getNano()),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
+ }
+
+ @Test
+ public void testInTransactionContextDoNotUpdateWatermark() {
+ when(transaction.readRow(any(), any(), any()))
+ .thenReturn(
+ Struct.newBuilder()
+ .set(PartitionMetadataAdminDao.COLUMN_WATERMARK)
+ .to(WATERMARK)
+ .build());
+ assertNull(inTransactionContext.updateWatermark(PARTITION_TOKEN,
WATERMARK));
+ verify(transaction, times(0)).buffer(any(Mutation.class));
}
@Test