This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e8920c79a0d Add logic to handle end timestamp of mutable change stream
bounded query (#37459)
e8920c79a0d is described below
commit e8920c79a0d3ce65c84eb1b353d9073b7dd55936
Author: chenxuesdu <[email protected]>
AuthorDate: Tue Feb 10 11:57:57 2026 -0800
Add logic to handle end timestamp of mutable change stream bounded query
(#37459)
* Add logic to handle end timestamp of mutable change stream bounded query.
* Resolve comments
* Fix tests
---
.../changestreams/action/ActionFactory.java | 6 +-
.../action/QueryChangeStreamAction.java | 33 +++++-
.../gcp/spanner/changestreams/dao/DaoFactory.java | 4 +
.../dofn/ReadChangeStreamPartitionDoFn.java | 5 +-
.../action/QueryChangeStreamActionTest.java | 120 ++++++++++++++++++++-
.../dofn/ReadChangeStreamPartitionDoFnTest.java | 25 +++--
6 files changed, 174 insertions(+), 19 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index e8749a83666..cd84168b23f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -187,7 +187,8 @@ public class ActionFactory implements Serializable {
PartitionStartRecordAction partitionStartRecordAction,
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ boolean isMutableChangeStream) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
@@ -201,7 +202,8 @@ public class ActionFactory implements Serializable {
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
- metrics);
+ metrics,
+ isMutableChangeStream);
}
return queryChangeStreamActionInstance;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 8da9f3d0951..69e89e74a38 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -34,6 +34,7 @@ import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -89,6 +90,7 @@ public class QueryChangeStreamAction {
private final PartitionEndRecordAction partitionEndRecordAction;
private final PartitionEventRecordAction partitionEventRecordAction;
private final ChangeStreamMetrics metrics;
+ private final boolean isMutableChangeStream;
/**
* Constructs an action class for performing a change stream query for a
given partition.
@@ -106,6 +108,7 @@ public class QueryChangeStreamAction {
* @param PartitionEndRecordAction action class to process {@link
PartitionEndRecord}s
* @param PartitionEventRecordAction action class to process {@link
PartitionEventRecord}s
* @param metrics metrics gathering class
+ * @param isMutableChangeStream whether the change stream is mutable or not
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
@@ -118,7 +121,8 @@ public class QueryChangeStreamAction {
PartitionStartRecordAction partitionStartRecordAction,
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ boolean isMutableChangeStream) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -130,6 +134,7 @@ public class QueryChangeStreamAction {
this.partitionEndRecordAction = partitionEndRecordAction;
this.partitionEventRecordAction = partitionEventRecordAction;
this.metrics = metrics;
+ this.isMutableChangeStream = isMutableChangeStream;
}
/**
@@ -195,13 +200,23 @@ public class QueryChangeStreamAction {
final Timestamp endTimestamp = partition.getEndTimestamp();
final boolean isBoundedRestriction =
!endTimestamp.equals(MAX_INCLUSIVE_END_AT);
final Timestamp changeStreamQueryEndTimestamp =
- isBoundedRestriction ? endTimestamp :
getNextReadChangeStreamEndTimestamp();
+ isBoundedRestriction
+ ? getBoundedQueryEndTimestamp(endTimestamp)
+ : getNextReadChangeStreamEndTimestamp();
// Once the changeStreamQuery completes we may need to resume reading from
the partition if we
// had an unbounded restriction for which we set an arbitrary query end
timestamp and for which
// we didn't encounter any indications that the partition is done
(explicit end records or
- // exceptions about being out of timestamp range).
- boolean stopAfterQuerySucceeds = isBoundedRestriction;
+ // exceptions about being out of timestamp range). We also special case
the InitialPartition,
+ // which always stops after the query succeeds.
+ boolean stopAfterQuerySucceeds = false;
+ if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
+ stopAfterQuerySucceeds = true;
+ } else {
+ stopAfterQuerySucceeds =
+ isBoundedRestriction &&
changeStreamQueryEndTimestamp.equals(endTimestamp);
+ }
+
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp,
partition.getHeartbeatMillis())) {
@@ -379,4 +394,14 @@ public class QueryChangeStreamAction {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60,
current.getNanos());
}
+
+ // For Mutable Change Stream bounded queries, update the query end timestamp
to be within 2
+ // minutes in the future.
+ private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
+ if (this.isMutableChangeStream) {
+ Timestamp nextTimestamp = getNextReadChangeStreamEndTimestamp();
+ return nextTimestamp.compareTo(endTimestamp) < 0 ? nextTimestamp :
endTimestamp;
+ }
+ return endTimestamp;
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index 67b58bace70..95bdbfed7ca 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -151,4 +151,8 @@ public class DaoFactory implements Serializable {
}
return changeStreamDaoInstance;
}
+
+ public boolean isMutableChangeStream() {
+ return this.isMutableChangeStream;
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 4f5631c468b..c3650b42761 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
+ private final boolean isMutableChangeStream;
/**
* Needs to be set through the {@link
*
ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)}
call.
@@ -104,6 +105,7 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
+ this.isMutableChangeStream = daoFactory.isMutableChangeStream();
this.throughputEstimator = new NullThroughputEstimator<>();
}
@@ -215,7 +217,8 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
- metrics);
+ metrics,
+ isMutableChangeStream);
}
/**
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index cf4c047025c..26ab41dff87 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -21,7 +21,9 @@ import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsCons
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -116,7 +118,8 @@ public class QueryChangeStreamActionTest {
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
- metrics);
+ metrics,
+ false);
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
@@ -916,6 +919,121 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}
+ @Test
+ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp()
{
+ // Initialize action with isMutableChangeStream = true
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction,
+ partitionStartRecordAction,
+ partitionEndRecordAction,
+ partitionEventRecordAction,
+ metrics,
+ true);
+
+ // Set endTimestamp to 60 minutes in the future
+ Timestamp now = Timestamp.now();
+ Timestamp endTimestamp =
+ Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60,
now.getNanos());
+
+ partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+ when(restriction.getTo()).thenReturn(endTimestamp);
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ArgumentCaptor<Timestamp> timestampCaptor =
ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Verify query was capped at ~2 minutes
+ long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+ assertTrue("Query should be capped at approx 2 minutes (120s)",
Math.abs(diff - 120) < 10);
+
+ // Crucial: Should RESUME to process the rest later
+ assertEquals(ProcessContinuation.resume(), result);
+ }
+
+ @Test
+ public void
testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction,
+ partitionStartRecordAction,
+ partitionEndRecordAction,
+ partitionEventRecordAction,
+ metrics,
+ true);
+
+ // Set endTimestamp to only 10 seconds in the future
+ Timestamp now = Timestamp.now();
+ Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds()
+ 10, now.getNanos());
+
+ partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+ when(restriction.getTo()).thenReturn(endTimestamp);
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ArgumentCaptor<Timestamp> timestampCaptor =
ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Should use the exact endTimestamp since it is within the limit (10s <
2m)
+ assertEquals(endTimestamp, timestampCaptor.getValue());
+
+ // Should STOP because we reached the actual requested endTimestamp
+ assertEquals(ProcessContinuation.stop(), result);
+ }
+
+ @Test
+ public void testQueryChangeStreamUnboundedResumesCorrectly() {
+ // Unbounded restriction (streaming forever)
+ setupUnboundedPartition();
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ when(changeStreamDao.changeStreamQuery(any(), any(), any(),
anyLong())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Should return RESUME to continue reading the stream every 2 minutes
+ assertEquals(ProcessContinuation.resume(), result);
+ verify(metrics).incQueryCounter();
+ }
+
private static class BundleFinalizerStub implements BundleFinalizer {
@Override
public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 62fa39eef55..9e588de77a0 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -139,17 +141,18 @@ public class ReadChangeStreamPartitionDoFnTest {
when(actionFactory.partitionEventRecordAction(partitionMetadataDao,
metrics))
.thenReturn(partitionEventRecordAction);
when(actionFactory.queryChangeStreamAction(
- changeStreamDao,
- partitionMetadataDao,
- changeStreamRecordMapper,
- partitionMetadataMapper,
- dataChangeRecordAction,
- heartbeatRecordAction,
- childPartitionsRecordAction,
- partitionStartRecordAction,
- partitionEndRecordAction,
- partitionEventRecordAction,
- metrics))
+ eq(changeStreamDao),
+ eq(partitionMetadataDao),
+ eq(changeStreamRecordMapper),
+ eq(partitionMetadataMapper),
+ eq(dataChangeRecordAction),
+ eq(heartbeatRecordAction),
+ eq(childPartitionsRecordAction),
+ eq(partitionStartRecordAction),
+ eq(partitionEndRecordAction),
+ eq(partitionEventRecordAction),
+ eq(metrics),
+ anyBoolean()))
.thenReturn(queryChangeStreamAction);
doFn.setup();