gemini-code-assist[bot] commented on code in PR #37459:
URL: https://github.com/apache/beam/pull/37459#discussion_r2760312509
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -379,4 +387,16 @@ private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60,
current.getNanos());
}
+
+ // For Mutable Change Stream, Spanner only allow the max query end timestamp
to be 30 minutes in
+ // the future.
+ private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
+ if (this.isMutableChangeStream) {
+ final Timestamp current = Timestamp.now();
+ Timestamp maxTimestamp =
+ Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60,
current.getNanos());
+ return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp :
endTimestamp;
+ }
+ return endTimestamp;
+ }
Review Comment:

For better readability and maintainability, it's good practice to extract
magic numbers into constants. Also, using `Comparators.min` can make the intent
of the code clearer.
You would need to add the following import:
`import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;`
```java
/**
 * Returns the end timestamp to use for a change stream query.
 *
 * <p>For mutable change streams, Spanner only allows the query end timestamp to be at most 30
 * minutes in the future, so the requested end is capped at {@code now + 30 minutes}. For any
 * other change stream the requested end timestamp is returned unchanged.
 *
 * @param endTimestamp the requested query end timestamp
 * @return {@code endTimestamp}, or {@code now + 30 minutes} when this is a mutable change stream
 *     and that cap is strictly earlier than {@code endTimestamp}
 */
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
  if (!this.isMutableChangeStream) {
    return endTimestamp;
  }
  final long maxSecondsInFuture = 30 * 60;
  final Timestamp current = Timestamp.now();
  final Timestamp maxTimestamp =
      Timestamp.ofTimeSecondsAndNanos(
          current.getSeconds() + maxSecondsInFuture, current.getNanos());
  // Comparators.min returns its first argument when both compare equal; passing endTimestamp
  // first means the cap only replaces the requested end when the cap is strictly earlier.
  return Comparators.min(endTimestamp, maxTimestamp);
}
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java:
##########
@@ -916,6 +919,118 @@ public void
testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}
+ @Test
+ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp()
{
+ // Initialize action with isMutableChangeStream = true
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction,
+ partitionStartRecordAction,
+ partitionEndRecordAction,
+ partitionEventRecordAction,
+ metrics,
+ true);
+
+ // Set endTimestamp to 60 minutes in the future
+ Timestamp now = Timestamp.now();
+ Timestamp endTimestamp =
+ Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60,
now.getNanos());
+
+ partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+ when(restriction.getTo()).thenReturn(endTimestamp);
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ArgumentCaptor<Timestamp> timestampCaptor =
ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Verify query was capped at ~30 minutes
+ long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+ assertTrue("Query should be capped at approx 30 minutes (1800s)",
Math.abs(diff - 1800) < 10);
+
+ // Crucial: Should RESUME to process the rest later
+ assertEquals(ProcessContinuation.resume(), result);
+ }
+
+ @Test
+ public void
testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction,
+ partitionStartRecordAction,
+ partitionEndRecordAction,
+ partitionEventRecordAction,
+ metrics,
+ true);
+
+ // Set endTimestamp to only 10 minutes in the future
+ Timestamp now = Timestamp.now();
+ Timestamp endTimestamp =
+ Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60,
now.getNanos());
+
+ partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+ when(restriction.getTo()).thenReturn(endTimestamp);
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+ eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Should STOP because we reached the actual requested endTimestamp
+ assertEquals(ProcessContinuation.stop(), result);
+ }
Review Comment:

Using a named constant for the magic number `10 * 60` would improve
readability.
```java
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
  // A requested end only 10 minutes ahead lies inside the 30-minute window Spanner permits for
  // mutable change streams, so the query is expected to use it unchanged.
  final long tenMinutesInSeconds = 10 * 60;
  final boolean mutableChangeStream = true;
  action =
      new QueryChangeStreamAction(
          changeStreamDao,
          partitionMetadataDao,
          changeStreamRecordMapper,
          partitionMetadataMapper,
          dataChangeRecordAction,
          heartbeatRecordAction,
          childPartitionsRecordAction,
          partitionStartRecordAction,
          partitionEndRecordAction,
          partitionEventRecordAction,
          metrics,
          mutableChangeStream);

  final Timestamp startOfTest = Timestamp.now();
  final Timestamp requestedEnd =
      Timestamp.ofTimeSecondsAndNanos(
          startOfTest.getSeconds() + tenMinutesInSeconds, startOfTest.getNanos());
  partition = partition.toBuilder().setEndTimestamp(requestedEnd).build();
  when(restriction.getTo()).thenReturn(requestedEnd);
  when(partitionMetadataMapper.from(any())).thenReturn(partition);

  // Only a query issued with exactly the requested end timestamp gets the (empty) result set.
  final ChangeStreamResultSet emptyResultSet = mock(ChangeStreamResultSet.class);
  when(changeStreamDao.changeStreamQuery(
          eq(PARTITION_TOKEN),
          eq(PARTITION_START_TIMESTAMP),
          eq(requestedEnd),
          eq(PARTITION_HEARTBEAT_MILLIS)))
      .thenReturn(emptyResultSet);
  when(emptyResultSet.next()).thenReturn(false);
  when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
  when(restrictionTracker.tryClaim(requestedEnd)).thenReturn(true);

  final ProcessContinuation continuation =
      action.run(
          partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

  // The query ran up to the requested end timestamp, so the action should stop, not resume.
  assertEquals(ProcessContinuation.stop(), continuation);
}
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java:
##########
@@ -916,6 +919,118 @@ public void
testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}
+ @Test
+ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp()
{
+ // Initialize action with isMutableChangeStream = true
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction,
+ partitionStartRecordAction,
+ partitionEndRecordAction,
+ partitionEventRecordAction,
+ metrics,
+ true);
+
+ // Set endTimestamp to 60 minutes in the future
+ Timestamp now = Timestamp.now();
+ Timestamp endTimestamp =
+ Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60,
now.getNanos());
+
+ partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+ when(restriction.getTo()).thenReturn(endTimestamp);
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ArgumentCaptor<Timestamp> timestampCaptor =
ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ // Verify query was capped at ~30 minutes
+ long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+ assertTrue("Query should be capped at approx 30 minutes (1800s)",
Math.abs(diff - 1800) < 10);
+
+ // Crucial: Should RESUME to process the rest later
+ assertEquals(ProcessContinuation.resume(), result);
+ }
Review Comment:

Using named constants for magic numbers like `60 * 60` and `1800` improves
code readability and maintainability.
```java
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
  final long sixtyMinutesInSeconds = 60 * 60;
  final long thirtyMinutesInSeconds = 30 * 60;
  // The action reads Timestamp.now() on its own when computing the cap, so allow a little slack
  // between that clock read and the one taken by this test.
  final long toleranceSeconds = 10;

  // Initialize action with isMutableChangeStream = true
  action =
      new QueryChangeStreamAction(
          changeStreamDao,
          partitionMetadataDao,
          changeStreamRecordMapper,
          partitionMetadataMapper,
          dataChangeRecordAction,
          heartbeatRecordAction,
          childPartitionsRecordAction,
          partitionStartRecordAction,
          partitionEndRecordAction,
          partitionEventRecordAction,
          metrics,
          true);

  // Request an end 60 minutes in the future: beyond the 30-minute mutable change stream limit.
  final Timestamp now = Timestamp.now();
  final Timestamp endTimestamp =
      Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + sixtyMinutesInSeconds, now.getNanos());
  partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
  when(restriction.getTo()).thenReturn(endTimestamp);
  when(partitionMetadataMapper.from(any())).thenReturn(partition);

  // Capture whatever end timestamp the action actually queries with.
  final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
  final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
  when(changeStreamDao.changeStreamQuery(
          eq(PARTITION_TOKEN),
          eq(PARTITION_START_TIMESTAMP),
          timestampCaptor.capture(),
          eq(PARTITION_HEARTBEAT_MILLIS)))
      .thenReturn(resultSet);
  when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
  when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
  when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

  final ProcessContinuation result =
      action.run(
          partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

  // Verify query was capped at ~30 minutes
  final long cappedOffsetSeconds = timestampCaptor.getValue().getSeconds() - now.getSeconds();
  assertTrue(
      "Query should be capped at approx 30 minutes (1800s)",
      Math.abs(cappedOffsetSeconds - thirtyMinutesInSeconds) < toleranceSeconds);

  // The requested end was not reached, so the action must RESUME to process the rest later.
  assertEquals(ProcessContinuation.resume(), result);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]