This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e8920c79a0d Add logic to handle end timestamp of mutable change stream bounded query (#37459)
e8920c79a0d is described below

commit e8920c79a0d3ce65c84eb1b353d9073b7dd55936
Author: chenxuesdu <[email protected]>
AuthorDate: Tue Feb 10 11:57:57 2026 -0800

    Add logic to handle end timestamp of mutable change stream bounded query (#37459)
    
    * Add logic to handle end timestamp of mutable change stream bounded query.
    
    * Resolve comments
    
    * Fix tests
---
 .../changestreams/action/ActionFactory.java        |   6 +-
 .../action/QueryChangeStreamAction.java            |  33 +++++-
 .../gcp/spanner/changestreams/dao/DaoFactory.java  |   4 +
 .../dofn/ReadChangeStreamPartitionDoFn.java        |   5 +-
 .../action/QueryChangeStreamActionTest.java        | 120 ++++++++++++++++++++-
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |  25 +++--
 6 files changed, 174 insertions(+), 19 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index e8749a83666..cd84168b23f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -187,7 +187,8 @@ public class ActionFactory implements Serializable {
       PartitionStartRecordAction partitionStartRecordAction,
       PartitionEndRecordAction partitionEndRecordAction,
       PartitionEventRecordAction partitionEventRecordAction,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      boolean isMutableChangeStream) {
     if (queryChangeStreamActionInstance == null) {
       queryChangeStreamActionInstance =
           new QueryChangeStreamAction(
@@ -201,7 +202,8 @@ public class ActionFactory implements Serializable {
               partitionStartRecordAction,
               partitionEndRecordAction,
               partitionEventRecordAction,
-              metrics);
+              metrics,
+              isMutableChangeStream);
     }
     return queryChangeStreamActionInstance;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 8da9f3d0951..69e89e74a38 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -89,6 +90,7 @@ public class QueryChangeStreamAction {
   private final PartitionEndRecordAction partitionEndRecordAction;
   private final PartitionEventRecordAction partitionEventRecordAction;
   private final ChangeStreamMetrics metrics;
+  private final boolean isMutableChangeStream;
 
   /**
    * Constructs an action class for performing a change stream query for a given partition.
@@ -106,6 +108,7 @@ public class QueryChangeStreamAction {
    * @param PartitionEndRecordAction action class to process {@link PartitionEndRecord}s
    * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s
    * @param metrics metrics gathering class
+   * @param isMutableChangeStream whether the change stream is mutable or not
    */
   QueryChangeStreamAction(
       ChangeStreamDao changeStreamDao,
@@ -118,7 +121,8 @@ public class QueryChangeStreamAction {
       PartitionStartRecordAction partitionStartRecordAction,
       PartitionEndRecordAction partitionEndRecordAction,
       PartitionEventRecordAction partitionEventRecordAction,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      boolean isMutableChangeStream) {
     this.changeStreamDao = changeStreamDao;
     this.partitionMetadataDao = partitionMetadataDao;
     this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -130,6 +134,7 @@ public class QueryChangeStreamAction {
     this.partitionEndRecordAction = partitionEndRecordAction;
     this.partitionEventRecordAction = partitionEventRecordAction;
     this.metrics = metrics;
+    this.isMutableChangeStream = isMutableChangeStream;
   }
 
   /**
@@ -195,13 +200,23 @@ public class QueryChangeStreamAction {
     final Timestamp endTimestamp = partition.getEndTimestamp();
     final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
     final Timestamp changeStreamQueryEndTimestamp =
-        isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
+        isBoundedRestriction
+            ? getBoundedQueryEndTimestamp(endTimestamp)
+            : getNextReadChangeStreamEndTimestamp();
 
     // Once the changeStreamQuery completes we may need to resume reading from the partition if we
     // had an unbounded restriction for which we set an arbitrary query end timestamp and for which
     // we didn't  encounter any indications that the partition is done (explicit end records or
-    // exceptions about being out of timestamp range).
-    boolean stopAfterQuerySucceeds = isBoundedRestriction;
+    // exceptions about being out of timestamp range). We also special case the InitialPartition,
+    // which always stops after the query succeeds.
+    boolean stopAfterQuerySucceeds = false;
+    if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
+      stopAfterQuerySucceeds = true;
+    } else {
+      stopAfterQuerySucceeds =
+          isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp);
+    }
+
     try (ChangeStreamResultSet resultSet =
         changeStreamDao.changeStreamQuery(
             token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -379,4 +394,14 @@ public class QueryChangeStreamAction {
     final Timestamp current = Timestamp.now();
     return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
   }
+
+  // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2
+  // minutes in the future.
+  private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
+    if (this.isMutableChangeStream) {
+      Timestamp nextTimestamp = getNextReadChangeStreamEndTimestamp();
+      return nextTimestamp.compareTo(endTimestamp) < 0 ? nextTimestamp : endTimestamp;
+    }
+    return endTimestamp;
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index 67b58bace70..95bdbfed7ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -151,4 +151,8 @@ public class DaoFactory implements Serializable {
     }
     return changeStreamDaoInstance;
   }
+
+  public boolean isMutableChangeStream() {
+    return this.isMutableChangeStream;
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 4f5631c468b..c3650b42761 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
   private final MapperFactory mapperFactory;
   private final ActionFactory actionFactory;
   private final ChangeStreamMetrics metrics;
+  private final boolean isMutableChangeStream;
   /**
    * Needs to be set through the {@link
    * ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
@@ -104,6 +105,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
     this.mapperFactory = mapperFactory;
     this.actionFactory = actionFactory;
     this.metrics = metrics;
+    this.isMutableChangeStream = daoFactory.isMutableChangeStream();
     this.throughputEstimator = new NullThroughputEstimator<>();
   }
 
@@ -215,7 +217,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
             partitionStartRecordAction,
             partitionEndRecordAction,
             partitionEventRecordAction,
-            metrics);
+            metrics,
+            isMutableChangeStream);
   }
 
   /**
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index cf4c047025c..26ab41dff87 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -21,7 +21,9 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsCons
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -116,7 +118,8 @@ public class QueryChangeStreamActionTest {
             partitionStartRecordAction,
             partitionEndRecordAction,
             partitionEventRecordAction,
-            metrics);
+            metrics,
+            false);
     final Struct row = mock(Struct.class);
     partition =
         PartitionMetadata.newBuilder()
@@ -916,6 +919,121 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao, never()).updateWatermark(any(), any());
   }
 
+  @Test
+  public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
+    // Initialize action with isMutableChangeStream = true
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction,
+            partitionStartRecordAction,
+            partitionEndRecordAction,
+            partitionEventRecordAction,
+            metrics,
+            true);
+
+    // Set endTimestamp to 60 minutes in the future
+    Timestamp now = Timestamp.now();
+    Timestamp endTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos());
+
+    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+    when(restriction.getTo()).thenReturn(endTimestamp);
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    // Verify query was capped at ~2 minutes
+    long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+    assertTrue("Query should be capped at approx 2 minutes (120s)", Math.abs(diff - 120) < 10);
+
+    // Crucial: Should RESUME to process the rest later
+    assertEquals(ProcessContinuation.resume(), result);
+  }
+
+  @Test
+  public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction,
+            partitionStartRecordAction,
+            partitionEndRecordAction,
+            partitionEventRecordAction,
+            metrics,
+            true);
+
+    // Set endTimestamp to only 10 seconds in the future
+    Timestamp now = Timestamp.now();
+    Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10, now.getNanos());
+
+    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+    when(restriction.getTo()).thenReturn(endTimestamp);
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    // Should use the exact endTimestamp since it is within the limit (10s < 2m)
+    assertEquals(endTimestamp, timestampCaptor.getValue());
+
+    // Should STOP because we reached the actual requested endTimestamp
+    assertEquals(ProcessContinuation.stop(), result);
+  }
+
+  @Test
+  public void testQueryChangeStreamUnboundedResumesCorrectly() {
+    // Unbounded restriction (streaming forever)
+    setupUnboundedPartition();
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    // Should return RESUME to continue reading the stream every 2 minutes
+    assertEquals(ProcessContinuation.resume(), result);
+    verify(metrics).incQueryCounter();
+  }
+
   private static class BundleFinalizerStub implements BundleFinalizer {
     @Override
     public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 62fa39eef55..9e588de77a0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -139,17 +141,18 @@ public class ReadChangeStreamPartitionDoFnTest {
     when(actionFactory.partitionEventRecordAction(partitionMetadataDao, metrics))
         .thenReturn(partitionEventRecordAction);
     when(actionFactory.queryChangeStreamAction(
-            changeStreamDao,
-            partitionMetadataDao,
-            changeStreamRecordMapper,
-            partitionMetadataMapper,
-            dataChangeRecordAction,
-            heartbeatRecordAction,
-            childPartitionsRecordAction,
-            partitionStartRecordAction,
-            partitionEndRecordAction,
-            partitionEventRecordAction,
-            metrics))
+            eq(changeStreamDao),
+            eq(partitionMetadataDao),
+            eq(changeStreamRecordMapper),
+            eq(partitionMetadataMapper),
+            eq(dataChangeRecordAction),
+            eq(heartbeatRecordAction),
+            eq(childPartitionsRecordAction),
+            eq(partitionStartRecordAction),
+            eq(partitionEndRecordAction),
+            eq(partitionEventRecordAction),
+            eq(metrics),
+            anyBoolean()))
         .thenReturn(queryChangeStreamAction);
 
     doFn.setup();

Reply via email to