gemini-code-assist[bot] commented on code in PR #39084:
URL: https://github.com/apache/beam/pull/39084#discussion_r3465990135


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -395,6 +414,7 @@ public int hashCode() {
   private static class UnboundedCountingReader extends UnboundedReader<Long> {
     private UnboundedCountingSource source;
     private long current;
+    private boolean done = false;
 
     // Initialized on first advance()

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The mutable `done` flag is not checkpointed, so a restored reader would start 
with `done = false` and its watermark would regress from 
`BoundedWindow.TIMESTAMP_MAX_VALUE`. Whether the reader is done can instead be 
derived deterministically from `current` and `source.end`, which lets us remove 
the `done` field entirely.
   
   ```suggestion
       private long current;
   
       // Initialized on first advance()
   ```
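
To illustrate the regression, here is a minimal standalone sketch, not Beam code. It assumes the checkpoint carries only `current` (as the comment above implies), models `timestampFn` as the identity, and uses `Long.MAX_VALUE` as a stand-in for `BoundedWindow.TIMESTAMP_MAX_VALUE`. The class and method names are hypothetical.

```java
final class DoneFlagRestoreSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE (assumption for this sketch).
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  static final class Reader {
    final long end;
    final long stride;
    long current;
    boolean done = false; // Lives only in memory; never written to the checkpoint.

    Reader(long current, long end, long stride) {
      this.current = current;
      this.end = end;
      this.stride = stride;
    }

    boolean advance() {
      long nextValue = current + stride;
      if (nextValue >= end) {
        done = true;
        return false;
      }
      current = nextValue;
      return true;
    }

    // Watermark driven by the in-memory flag (the approach in the diff).
    long flagWatermark() {
      return done ? MAX_WATERMARK : current;
    }

    // Watermark derived from checkpointed state only (the suggested approach).
    long derivedWatermark() {
      return current >= end - stride ? MAX_WATERMARK : current;
    }

    // The checkpoint is assumed to hold just the last emitted value.
    long checkpoint() {
      return current;
    }
  }

  // Rebuilds a reader from a checkpoint; `done` starts out false again.
  static Reader restore(long checkpoint, long end, long stride) {
    return new Reader(checkpoint, end, stride);
  }

  public static void main(String[] args) {
    Reader reader = new Reader(0, 10, 5);
    while (reader.advance()) {
      // Drain the reader until it reports no more elements.
    }
    Reader restored = restore(reader.checkpoint(), 10, 5);
    System.out.println("flag-based after restore:    " + restored.flagWatermark());
    System.out.println("derived after restore:       " + restored.derivedWatermark());
  }
}
```

In this model the flag-based watermark falls back to the timestamp of `current` after the restore, while the derived one stays at the maximum.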



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -453,7 +477,7 @@ private long expectedValue() {
 
     @Override
     public Instant getWatermark() {
-      return source.timestampFn.apply(current);
+      return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : source.timestampFn.apply(current);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `advance()` computes the next value as `current + source.stride`, so the 
reader is done once `current >= source.end - source.stride`: any subsequent 
value would be `>= source.end`, meaning no more elements can be produced. 
Because this check depends only on `current` and the source's fields, it avoids 
the watermark regression when restoring from a checkpoint.
   
   ```java
         return (current >= source.end - source.stride)
             ? BoundedWindow.TIMESTAMP_MAX_VALUE
             : source.timestampFn.apply(current);
   ```
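
As a sanity check on the rearranged condition, the following standalone sketch compares it with the `nextValue >= source.end` test from `advance()` over a small grid of values. It assumes a positive stride and values far from `long` overflow; the class and method names are hypothetical and not part of `CountingSource`.

```java
final class DerivedDoneCheck {
  // Condition used by advance() in the diff: does the next value reach the end?
  static boolean nextReachesEnd(long current, long end, long stride) {
    return current + stride >= end;
  }

  // Condition suggested for getWatermark(): the same inequality, rearranged.
  static boolean isDone(long current, long end, long stride) {
    return current >= end - stride;
  }

  // Brute-force comparison of the two forms for small, overflow-free inputs.
  static boolean formsAgree(long limit) {
    for (long end = 1; end <= limit; end++) {
      for (long stride = 1; stride <= limit; stride++) {
        for (long current = -limit; current <= limit; current++) {
          if (nextReachesEnd(current, end, stride) != isDone(current, end, stride)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println("forms agree: " + formsAgree(20));
  }
}
```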



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -431,6 +451,10 @@ public boolean advance() throws IOException {
         return false;
       }
       long nextValue = current + source.stride;
+      if (nextValue >= source.end) {
+        done = true;
+        return false;
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Since we are removing the `done` field, we don't need to set `done = true` 
here. We can just return `false` directly when the next value reaches or 
exceeds the end limit.
   
   ```java
         if (nextValue >= source.end) {
           return false;
         }
   ```
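
For reference, a standalone sketch of how `advance()` behaves without the flag. It is a simplified model, not the actual `UnboundedCountingReader`: the hypothetical `start` argument is the value held before the first `advance()`, and rate limiting and overflow handling are omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class StatelessAdvanceSketch {
  final long end;
  final long stride;
  long current;

  StatelessAdvanceSketch(long start, long end, long stride) {
    this.current = start;
    this.end = end;
    this.stride = stride;
  }

  // Returns false without touching any state once the next value reaches the end.
  boolean advance() {
    long nextValue = current + stride;
    if (nextValue >= end) {
      return false;
    }
    current = nextValue;
    return true;
  }

  // Collects every value the reader produces before advance() first returns false.
  static List<Long> drain(long start, long end, long stride) {
    StatelessAdvanceSketch reader = new StatelessAdvanceSketch(start, end, stride);
    List<Long> values = new ArrayList<>();
    while (reader.advance()) {
      values.add(reader.current);
    }
    return values;
  }

  public static void main(String[] args) {
    System.out.println(drain(0, 10, 3));
  }
}
```

Since `current` stops changing once the limit is reached, every later call to `advance()` returns `false` again without any extra bookkeeping.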



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
