scwhittle commented on code in PR #37580:
URL: https://github.com/apache/beam/pull/37580#discussion_r2803420492


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1790,6 +1800,17 @@ abstract static class Builder {
 
       abstract Builder setPlainText(ValueProvider<Boolean> plainText);
 
+      abstract Builder setCdcTimeIncrement(Duration cdcTimeIncrement);
+
+      /**
+       * Heartbeat interval for all change stream queries.
+       *
+       * <p>Be careful when changing this interval, as it needs to be less than the checkpointing
+       * interval in Dataflow. Otherwise, if there are no records within checkpoint intervals, the

Review Comment:
   I previously changed the SDK to ignore runner-initiated splits until tryClaim has been called. This ensures that some progress is made each time the element is scheduled.
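A simplified, self-contained model of the behavior described above: runner-initiated checkpoint requests are ignored until the first successful `tryClaim`. The class `ClaimGatedTracker`, its integer positions and the `{start, end}` residual array are hypothetical. Beam's real `RestrictionTracker` exposes `trySplit(double fractionOfRemainder)`, where a fraction of 0 requests a checkpoint, and this sketch only models that checkpoint case.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified tracker over integer positions [from, to). It ignores
 * runner-initiated checkpoint requests until the first successful tryClaim, so each
 * scheduling of the element makes at least some progress.
 */
final class ClaimGatedTracker {
  private long to;
  private long lastClaimed;
  private boolean claimedSinceStart = false;

  ClaimGatedTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1; // nothing claimed yet
  }

  /** Claims a position; returns false once the position is at or past the end. */
  boolean tryClaim(long position) {
    if (position < lastClaimed) {
      throw new IllegalArgumentException("claims must not move backwards");
    }
    if (position >= to) {
      return false;
    }
    lastClaimed = position;
    claimedSinceStart = true;
    return true;
  }

  /**
   * Runner-initiated checkpoint. Returns the residual range {start, end} and shrinks this
   * restriction to end just after the last claimed position, or returns empty (ignoring the
   * request) if nothing has been claimed yet or nothing remains.
   */
  Optional<long[]> trySplitAtCurrentPosition() {
    if (!claimedSinceStart) {
      return Optional.empty(); // no progress yet: ignore the split request
    }
    long splitPoint = lastClaimed + 1;
    if (splitPoint >= to) {
      return Optional.empty();
    }
    long[] residual = {splitPoint, to};
    to = splitPoint;
    return Optional.of(residual);
  }

  long end() {
    return to;
  }
}
```

With this gating, a checkpoint request that arrives immediately after the element is scheduled returns empty, so the DoFn keeps running until it has claimed at least one position.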



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1912,6 +1933,13 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
       return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
     }
 
+    public ReadChangeStream withLowLatency() {

Review Comment:
   Add a comment:
   // Configures the change stream to checkpoint and flush output, targeting low latency at the cost of a higher RPC rate and CPU usage.
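A sketch of the trade-off the suggested comment describes, assuming `withLowLatency()` shortens both the heartbeat interval and the real-time checkpoint interval. The class `ChangeStreamLatencyOptions` and all concrete durations are hypothetical. Only the 2-minute checkpoint default is taken from the previous `now + 2 mins` end timestamp in `QueryChangeStreamAction`. The 2-second heartbeat default is an assumption.

```java
import java.time.Duration;

/**
 * Hypothetical change stream read options. The field names follow the names discussed in
 * this review, but the concrete durations are illustrative assumptions only.
 */
final class ChangeStreamLatencyOptions {
  final Duration heartbeatInterval;
  final Duration realTimeCheckpointInterval;

  ChangeStreamLatencyOptions(Duration heartbeatInterval, Duration realTimeCheckpointInterval) {
    this.heartbeatInterval = heartbeatInterval;
    this.realTimeCheckpointInterval = realTimeCheckpointInterval;
  }

  /** Assumed defaults; the 2 minute checkpoint mirrors the old "now + 2 mins" end timestamp. */
  static ChangeStreamLatencyOptions defaults() {
    return new ChangeStreamLatencyOptions(Duration.ofSeconds(2), Duration.ofMinutes(2));
  }

  // Configures the change stream to checkpoint and flush output, targeting low latency at the
  // cost of a higher RPC rate and CPU usage. Returns a copy with both intervals shortened
  // (the values are hypothetical).
  ChangeStreamLatencyOptions withLowLatency() {
    return new ChangeStreamLatencyOptions(Duration.ofMillis(100), Duration.ofSeconds(1));
  }

  /** Checkpoints per hour per partition while caught up, ignoring heartbeat-triggered ones. */
  long checkpointsPerHour() {
    return Duration.ofHours(1).toMillis() / realTimeCheckpointInterval.toMillis();
  }
}
```

If each checkpoint starts a new change stream query (an assumption of this sketch), moving from a 2-minute to a 1-second interval raises the per-partition query count from 30 to 3,600 per hour. That increase is the RPC and CPU cost the comment warns about.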



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########


Review Comment:
   I think all previous continuations have been stops. There is an edge case when resuming here that we need to confirm is not a problem. The timestamp at which we resume processing is updated when we process the heartbeat. However, there may be multiple records with the same timestamp. In general, the restriction tracker's tryClaim ensures that all records sharing a timestamp are processed, precisely to handle this. With this early return, though, if a heartbeat can have the same timestamp as a record, we could exit before processing that record, and on resuming we would start past it.
   
   It seems unlikely, but I'd rather get explicit confirmation or handle it.
   
   Perhaps an easy way to handle it would be to notify the interrupter that we want to interrupt when we get a heartbeat. It already handles, internally, ensuring that all records with the same timestamp are processed. We could then change the actions to return just a boolean indicating whether to stop, since the continuation itself isn't well supported.
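A self-contained model of the suggestion above. A heartbeat requests a stop instead of returning early, and the interrupter keeps admitting records whose timestamp equals the last processed one. It stops only once the timestamp advances. `ChangeRecord`, `SameTimestampInterrupter` and `HeartbeatStopDemo` are hypothetical names. This sketches the described behavior and is not Beam's actual interrupter class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical change stream record: a commit timestamp, a heartbeat flag and an id. */
final class ChangeRecord {
  final long timestamp;
  final boolean heartbeat;
  final String id;

  ChangeRecord(long timestamp, boolean heartbeat, String id) {
    this.timestamp = timestamp;
    this.heartbeat = heartbeat;
    this.id = id;
  }
}

/**
 * Hypothetical interrupter: after a stop is requested it still admits records whose
 * timestamp equals the last processed one, and only stops once the timestamp advances.
 */
final class SameTimestampInterrupter {
  private boolean stopRequested = false;
  private long lastTimestamp = Long.MIN_VALUE;

  void requestStop() {
    stopRequested = true;
  }

  /** Returns true if processing should stop before a record with this timestamp. */
  boolean shouldStopBefore(long timestamp) {
    if (stopRequested && timestamp > lastTimestamp) {
      return true;
    }
    lastTimestamp = timestamp;
    return false;
  }
}

final class HeartbeatStopDemo {
  /** Processes records in order; a heartbeat requests a stop instead of returning early. */
  static List<String> process(List<ChangeRecord> records) {
    SameTimestampInterrupter interrupter = new SameTimestampInterrupter();
    List<String> processed = new ArrayList<>();
    for (ChangeRecord record : records) {
      if (interrupter.shouldStopBefore(record.timestamp)) {
        break; // resume from this record's timestamp; no earlier-timestamp record is skipped
      }
      processed.add(record.id);
      if (record.heartbeat) {
        interrupter.requestStop();
      }
    }
    return processed;
  }
}
```

Given records `a@5`, heartbeat `hb@7`, `b@7` and `c@8`, an early return on the heartbeat would resume past timestamp 7 and skip `b`. With the interrupter, `a`, `hb` and `b` are processed, and processing stops before `c`.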



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -387,12 +391,17 @@ private boolean isTimestampOutOfRange(SpannerException e) {
         && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
   }
 
-  // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
-  // users want to run the connector forever. If the end timestamp is reached, we will resume
+  // Return (now + config duration) as the end timestamp for reading change
+  // streams. This is only

Review Comment:
   nit: wrap better



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1790,6 +1800,17 @@ abstract static class Builder {
 
       abstract Builder setPlainText(ValueProvider<Boolean> plainText);
 
+      abstract Builder setCdcTimeIncrement(Duration cdcTimeIncrement);

Review Comment:
   I think we should have a better name than CdcTimeIncrement.
   
   realTimeCheckpointInterval?
   
   It would also be good to have a comment, something like:
    // When caught up to real time, checkpoint processing of the change stream this often. This bounds processing latency when a steady trickle of elements prevents the heartbeat interval from triggering.
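A sketch of how the proposed `realTimeCheckpointInterval` could feed the "now + config duration" end timestamp from `QueryChangeStreamAction`. The helper class `RealTimeCheckpointing` and its method are hypothetical. The idea shown is that bounding each unbounded query to end at `now + interval` makes processing checkpoint and resume roughly that often, even when a steady trickle of records keeps the heartbeat from firing.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper for the proposed realTimeCheckpointInterval: while caught up to real
 * time, each change stream query is bounded to end at now + interval, so processing is
 * checkpointed and resumed roughly this often.
 */
final class RealTimeCheckpointing {
  private final Duration realTimeCheckpointInterval;

  RealTimeCheckpointing(Duration realTimeCheckpointInterval) {
    if (realTimeCheckpointInterval.isZero() || realTimeCheckpointInterval.isNegative()) {
      throw new IllegalArgumentException("realTimeCheckpointInterval must be positive");
    }
    this.realTimeCheckpointInterval = realTimeCheckpointInterval;
  }

  /** End timestamp for an unbounded read whose query starts at {@code now}. */
  Instant queryEndTimestamp(Instant now) {
    return now.plus(realTimeCheckpointInterval);
  }
}
```

With a 2-minute interval, this reproduces the previous hard-coded behavior. A shorter interval tightens the latency bound at the cost of more frequent queries.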



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
