thiagotnunes commented on code in PR #32474:
URL: https://github.com/apache/beam/pull/32474#discussion_r1762261479


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java:
##########
@@ -49,6 +54,48 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest
     this.partition = partition;
   }
 
+  /**
+   * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue
+   * will start returning false indicating an early exit from processing.
+   */
+  @Override
+  public void setSoftTimeout(Duration duration) {
+    softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration);
+    continueProcessing = true;
+  }
+
+  /**
+   * Returns true if the restriction tracker can claim new positions.
+   *
+   * <p>If soft timeout isn't set always returns true. Otherwise:
+   *
+   * <ol>
+   *   <li>If soft deadline hasn't been reached always returns true.
+   *   <li>If soft deadline has been reached but we haven't processed any positions returns true.
+   *   <li>If soft deadline has been reached but the new position is the same as the last attempted
+   *       position returns true.
+   *   <li>If soft deadline has been reached and the new position differs from the last attempted
+   *       position returns false.
+   * </ol>
+   *
+   * @return {@code true} if the position processing should continue, {@code false} if the soft
+   *     deadline has been reached and we have fully processed the previous position.
+   */
+  @Override
+  public boolean shouldContinue(Timestamp position) {
+    if (!continueProcessing) {
+      return false;
+    }
+    if (softDeadline == null || lastAttemptedPosition == null) {
+      return true;
+    }
+
+    continueProcessing &=
+        new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline)

Review Comment:
   nit: could we do the comparison without having to create a `new Instant` on every call?
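
   One possible way to address this nit, sketched under assumptions: keep the soft deadline as epoch milliseconds so that each call compares two `long` values. The class `SoftDeadline` and its `LongSupplier` clock are hypothetical stand-ins, not names from the PR; in the tracker, the current time would presumably come from something like `timeSupplier.get().toSqlTimestamp().getTime()`.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of the PR: tracks a soft deadline as epoch millis. */
final class SoftDeadline {
  // Assumed clock abstraction; returns the current time in epoch milliseconds.
  private final LongSupplier clockMillis;
  // Long.MAX_VALUE means "no deadline set yet".
  private long deadlineMillis = Long.MAX_VALUE;

  SoftDeadline(LongSupplier clockMillis) {
    this.clockMillis = clockMillis;
  }

  /** Sets the deadline to the current time plus the given duration. */
  void setSoftTimeout(Duration duration) {
    deadlineMillis = Math.addExact(clockMillis.getAsLong(), duration.toMillis());
  }

  /** Strictly-before check on two primitives; nothing is allocated per call. */
  boolean isBeforeDeadline() {
    return clockMillis.getAsLong() < deadlineMillis;
  }
}
```

   The strict `<` mirrors the `isBefore` semantics in the hunk above: reaching the deadline exactly counts as expired.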



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java:
##########
@@ -88,6 +91,14 @@ public Optional<ProcessContinuation> run(
 
     final Timestamp commitTimestamp = record.getCommitTimestamp();
     final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
+    if (tracker instanceof Interruptible

Review Comment:
   Could we receive an instance of `Interruptible` instead of having to do an `instanceof` check on every call?
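
   A rough sketch of what this could look like, under assumptions: the interface shape below is inferred from the two overrides shown in the first hunk, the position is simplified to a `long` instead of a Spanner `Timestamp`, and `RecordAction` is a hypothetical stand-in for the real action class.

```java
import java.time.Duration;

/** Assumed shape of the interface, inferred from the overrides in the diff above. */
interface Interruptible {
  void setSoftTimeout(Duration duration);

  // Simplification: the PR passes a Spanner Timestamp here, not a long.
  boolean shouldContinue(long position);
}

/** Hypothetical stand-in for an action such as DataChangeRecordAction. */
final class RecordAction {
  /**
   * Returns true if the record was processed, false if processing should stop early.
   * The caller resolves the Interruptible once and passes it in, so no instanceof
   * check runs per record.
   */
  boolean run(long commitPosition, Interruptible interrupter) {
    if (!interrupter.shouldContinue(commitPosition)) {
      return false; // soft deadline reached; exit before claiming the position
    }
    // Claiming the position and emitting the record would happen here.
    return true;
  }
}
```

   With this shape, the type decision is made once where the tracker is created, and the per-record path only calls `shouldContinue`.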


