thiagotnunes commented on code in PR #32474:
URL: https://github.com/apache/beam/pull/32474#discussion_r1762261479
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java:
##########
@@ -49,6 +54,48 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest
this.partition = partition;
}
+ /**
+ * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue
+ * will start returning false indicating an early exit from processing.
+ */
+ @Override
+ public void setSoftTimeout(Duration duration) {
+ softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration);
+ continueProcessing = true;
+ }
+
+ /**
+ * Returns true if the restriction tracker can claim new positions.
+ *
+ * <p>If soft timeout isn't set always returns true. Otherwise:
+ *
+ * <ol>
+ * <li>If soft deadline hasn't been reached always returns true.
+ * <li>If soft deadline has been reached but we haven't processed any positions returns true.
+ * <li>If soft deadline has been reached but the new position is the same as the last attempted
+ * position returns true.
+ * <li>If soft deadline has been reached and the new position differs from the last attempted
+ * position returns false.
+ * </ol>
+ *
+ * @return {@code true} if the position processing should continue, {@code false} if the soft
+ * deadline has been reached and we have fully processed the previous position.
+ */
+ @Override
+ public boolean shouldContinue(Timestamp position) {
+ if (!continueProcessing) {
+ return false;
+ }
+ if (softDeadline == null || lastAttemptedPosition == null) {
+ return true;
+ }
+
+ continueProcessing &=
+ new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline)
Review Comment:
nit: could we do the comparison without having to create a `new Instant` on
every call?
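A rough sketch of one way to do that, assuming the deadline is kept as epoch milliseconds so the check becomes a plain `long` comparison. `SoftDeadlineSketch`, `recordAttempt`, and the `LongSupplier` clock are hypothetical stand-ins for the tracker and `timeSupplier`, positions are simplified to `long`, and the branch logic follows the Javadoc above rather than the actual PR code:

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for the tracker's soft-deadline logic.
final class SoftDeadlineSketch {
  private final LongSupplier nowMillis; // assumed clock, replaces timeSupplier
  private Long softDeadlineMillis; // null until setSoftTimeout is called
  private Long lastAttemptedPosition; // null until a position is attempted
  private boolean continueProcessing = true;

  SoftDeadlineSketch(LongSupplier nowMillis) {
    this.nowMillis = nowMillis;
  }

  // The deadline is computed once here, so later checks allocate nothing.
  void setSoftTimeout(long timeoutMillis) {
    softDeadlineMillis = nowMillis.getAsLong() + timeoutMillis;
    continueProcessing = true;
  }

  // Hypothetical hook standing in for whatever updates the last attempted position.
  void recordAttempt(long position) {
    lastAttemptedPosition = position;
  }

  boolean shouldContinue(long position) {
    if (!continueProcessing) {
      return false;
    }
    if (softDeadlineMillis == null || lastAttemptedPosition == null) {
      return true;
    }
    // Primitive comparison against the stored deadline, no new object per call.
    continueProcessing &=
        nowMillis.getAsLong() < softDeadlineMillis || position == lastAttemptedPosition;
    return continueProcessing;
  }
}
```

In the real tracker the same idea might mean keeping the deadline in millis and comparing it against `toSqlTimestamp().getTime()`, but I haven't checked that against the rest of the PR.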
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java:
##########
@@ -88,6 +91,14 @@ public Optional<ProcessContinuation> run(
final Timestamp commitTimestamp = record.getCommitTimestamp();
final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
+ if (tracker instanceof Interruptible
Review Comment:
Could we receive an instance of `Interruptible` instead of having to do an
`instanceof` check on every call?
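Something along these lines is what I have in mind, as a sketch only. `InterruptibleSketch` and `RecordActionSketch` are hypothetical simplifications of `Interruptible` and the action class, positions are `long`, and `Optional<String>` stands in for `Optional<ProcessContinuation>`:

```java
import java.util.Optional;

// Hypothetical, simplified version of the Interruptible contract.
interface InterruptibleSketch {
  boolean shouldContinue(long position);

  // No-op instance for callers whose tracker does not support soft timeouts.
  InterruptibleSketch NEVER_INTERRUPT = position -> true;
}

// Hypothetical stand-in for the record action.
final class RecordActionSketch {
  // The caller resolves the interrupter once and passes it in, so run()
  // needs no instanceof check or cast per record.
  Optional<String> run(long commitPosition, InterruptibleSketch interrupter) {
    if (!interrupter.shouldContinue(commitPosition)) {
      // Stand-in for returning a resume continuation.
      return Optional.of("resume");
    }
    // Empty means: keep processing the next record.
    return Optional.empty();
  }
}
```

The caller would decide once which interrupter to pass, falling back to the no-op instance when the tracker does not implement the interface.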