gemini-code-assist[bot] commented on code in PR #37459:
URL: https://github.com/apache/beam/pull/37459#discussion_r2760312509


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -379,4 +387,16 @@ private Timestamp getNextReadChangeStreamEndTimestamp() {
     final Timestamp current = Timestamp.now();
     return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, 
current.getNanos());
   }
+
+  // For Mutable Change Stream, Spanner only allow the max query end timestamp 
to be 30 minutes in
+  // the future.
+  private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
+    if (this.isMutableChangeStream) {
+      final Timestamp current = Timestamp.now();
+      Timestamp maxTimestamp =
+          Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, 
current.getNanos());
+      return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : 
endTimestamp;
+    }
+    return endTimestamp;
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   For better readability and maintainability, it's good practice to extract 
magic numbers into constants. Also, using `Comparators.min` can make the intent 
of the code clearer.
   
   You would need to add the following import:
   `import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;`
   
   ```java
     private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
       if (this.isMutableChangeStream) {
         final long maxSecondsInFuture = 30 * 60;
         final Timestamp current = Timestamp.now();
         final Timestamp maxTimestamp =
             Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 
maxSecondsInFuture, current.getNanos());
         return Comparators.min(maxTimestamp, endTimestamp);
       }
       return endTimestamp;
     }
   ```



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java:
##########
@@ -916,6 +919,118 @@ public void 
testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
     verify(partitionMetadataDao, never()).updateWatermark(any(), any());
   }
 
+  @Test
+  public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() 
{
+    // Initialize action with isMutableChangeStream = true
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction,
+            partitionStartRecordAction,
+            partitionEndRecordAction,
+            partitionEventRecordAction,
+            metrics,
+            true);
+
+    // Set endTimestamp to 60 minutes in the future
+    Timestamp now = Timestamp.now();
+    Timestamp endTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, 
now.getNanos());
+
+    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+    when(restriction.getTo()).thenReturn(endTimestamp);
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ArgumentCaptor<Timestamp> timestampCaptor = 
ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, 
bundleFinalizer);
+
+    // Verify query was capped at ~30 minutes
+    long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+    assertTrue("Query should be capped at approx 30 minutes (1800s)", 
Math.abs(diff - 1800) < 10);
+
+    // Crucial: Should RESUME to process the rest later
+    assertEquals(ProcessContinuation.resume(), result);
+  }
+
+  @Test
+  public void 
testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction,
+            partitionStartRecordAction,
+            partitionEndRecordAction,
+            partitionEventRecordAction,
+            metrics,
+            true);
+
+    // Set endTimestamp to only 10 minutes in the future
+    Timestamp now = Timestamp.now();
+    Timestamp endTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60, 
now.getNanos());
+
+    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+    when(restriction.getTo()).thenReturn(endTimestamp);
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+            eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, 
bundleFinalizer);
+
+    // Should STOP because we reached the actual requested endTimestamp
+    assertEquals(ProcessContinuation.stop(), result);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using a named constant for the magic number `10 * 60` would improve 
readability.
   
   ```java
     public void 
testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
       final long tenMinutesInSeconds = 10 * 60;
       action =
           new QueryChangeStreamAction(
               changeStreamDao,
               partitionMetadataDao,
               changeStreamRecordMapper,
               partitionMetadataMapper,
               dataChangeRecordAction,
               heartbeatRecordAction,
               childPartitionsRecordAction,
               partitionStartRecordAction,
               partitionEndRecordAction,
               partitionEventRecordAction,
               metrics,
               true);
   
       // Set endTimestamp to only 10 minutes in the future
       Timestamp now = Timestamp.now();
       Timestamp endTimestamp =
           Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 
tenMinutesInSeconds, now.getNanos());
   
       partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
       when(restriction.getTo()).thenReturn(endTimestamp);
       when(partitionMetadataMapper.from(any())).thenReturn(partition);
   
       final ChangeStreamResultSet resultSet = 
mock(ChangeStreamResultSet.class);
       when(changeStreamDao.changeStreamQuery(
               eq(PARTITION_TOKEN),
               eq(PARTITION_START_TIMESTAMP),
               eq(endTimestamp),
               eq(PARTITION_HEARTBEAT_MILLIS)))
           .thenReturn(resultSet);
       when(resultSet.next()).thenReturn(false);
       when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
       when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
   
       final ProcessContinuation result =
           action.run(
               partition, restrictionTracker, outputReceiver, 
watermarkEstimator, bundleFinalizer);
   
       // Should STOP because we reached the actual requested endTimestamp
       assertEquals(ProcessContinuation.stop(), result);
     }
   ```



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java:
##########
@@ -916,6 +919,118 @@ public void 
testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
     verify(partitionMetadataDao, never()).updateWatermark(any(), any());
   }
 
+  @Test
+  public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() 
{
+    // Initialize action with isMutableChangeStream = true
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction,
+            partitionStartRecordAction,
+            partitionEndRecordAction,
+            partitionEventRecordAction,
+            metrics,
+            true);
+
+    // Set endTimestamp to 60 minutes in the future
+    Timestamp now = Timestamp.now();
+    Timestamp endTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, 
now.getNanos());
+
+    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
+    when(restriction.getTo()).thenReturn(endTimestamp);
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ArgumentCaptor<Timestamp> timestampCaptor = 
ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, 
bundleFinalizer);
+
+    // Verify query was capped at ~30 minutes
+    long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
+    assertTrue("Query should be capped at approx 30 minutes (1800s)", 
Math.abs(diff - 1800) < 10);
+
+    // Crucial: Should RESUME to process the rest later
+    assertEquals(ProcessContinuation.resume(), result);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using named constants for magic numbers like `60 * 60` and `1800` improves 
code readability and maintainability.
   
   ```java
     public void 
testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
       final long sixtyMinutesInSeconds = 60 * 60;
       final long thirtyMinutesInSeconds = 30 * 60;
   
       // Initialize action with isMutableChangeStream = true
       action =
           new QueryChangeStreamAction(
               changeStreamDao,
               partitionMetadataDao,
               changeStreamRecordMapper,
               partitionMetadataMapper,
               dataChangeRecordAction,
               heartbeatRecordAction,
               childPartitionsRecordAction,
               partitionStartRecordAction,
               partitionEndRecordAction,
               partitionEventRecordAction,
               metrics,
               true);
   
       // Set endTimestamp to 60 minutes in the future
       Timestamp now = Timestamp.now();
       Timestamp endTimestamp =
           Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 
sixtyMinutesInSeconds, now.getNanos());
   
       partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
       when(restriction.getTo()).thenReturn(endTimestamp);
       when(partitionMetadataMapper.from(any())).thenReturn(partition);
   
       final ChangeStreamResultSet resultSet = 
mock(ChangeStreamResultSet.class);
       final ArgumentCaptor<Timestamp> timestampCaptor = 
ArgumentCaptor.forClass(Timestamp.class);
       when(changeStreamDao.changeStreamQuery(
               eq(PARTITION_TOKEN),
               eq(PARTITION_START_TIMESTAMP),
               timestampCaptor.capture(),
               eq(PARTITION_HEARTBEAT_MILLIS)))
           .thenReturn(resultSet);
       when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
       when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
       when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
   
       final ProcessContinuation result =
           action.run(
               partition, restrictionTracker, outputReceiver, 
watermarkEstimator, bundleFinalizer);
   
       // Verify query was capped at ~30 minutes
       long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
       assertTrue(
           "Query should be capped at approx 30 minutes (1800s)",
           Math.abs(diff - thirtyMinutesInSeconds) < 10);
   
       // Crucial: Should RESUME to process the rest later
       assertEquals(ProcessContinuation.resume(), result);
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to