boyuanzz commented on a change in pull request #14020:
URL: https://github.com/apache/beam/pull/14020#discussion_r579435256



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +537,7 @@ public ProcessContinuation processElement(
       while (tracker.tryClaim(out) && out[0] != null) {
         receiver.outputWithTimestamp(
             new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+        watermarkEstimator.setWatermark(tracker.currentRestriction().getWatermark());

Review comment:
       >  I think we should leave the call to setWatermark only outside of this 
while loop.
   
   The tricky part is that we exit the `tryClaim()` loop either because 
`trySplit()` has been called or because the source reader returned false from 
`advance()`. When `trySplit()` is called, the `currentRestriction` is changed to 
`EmptyUnboundedSource`. So outside of the `tryClaim()` loop, the watermark we 
get from `currentRestriction` is actually from `EmptyUnboundedSource`, which is 
MAX_TIMESTAMP.
   
   Besides, the SDK gets the watermark for `DelayedBundleApplication` from 
`WatermarkEstimator` before we call trySplit [1]. So we actually need to update 
the WatermarkEstimator with the correct watermark every time we do tryClaim.
   [1] 
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java#L330-L331
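
   To make the ordering problem concrete, here is a minimal toy sketch. It does not use Beam classes: `ToyTracker`, `estimateInsideLoop`, and `estimateAfterLoop` are hypothetical names for illustration only, and the tracker just mimics the behavior described above (after a split, the current restriction reports MAX_TIMESTAMP).

```java
public class WatermarkSplitSketch {
    // Stand-in for the MAX_TIMESTAMP reported by an empty source (assumption for this toy).
    static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    /** Hypothetical tracker: claims records in order and reports a watermark. */
    static final class ToyTracker {
        private final long[] recordTimestamps;
        private int next = 0;
        private boolean split = false;
        private long watermark = Long.MIN_VALUE;

        ToyTracker(long... recordTimestamps) {
            this.recordTimestamps = recordTimestamps;
        }

        /** Returns false once a split happened or the records are exhausted. */
        boolean tryClaim() {
            if (split || next >= recordTimestamps.length) {
                return false;
            }
            watermark = recordTimestamps[next++];
            return true;
        }

        /** After a split the primary restriction is "empty". */
        void trySplit() {
            split = true;
        }

        /** Mimics currentRestriction().getWatermark(): MAX_TIMESTAMP once split. */
        long currentWatermark() {
            return split ? MAX_TIMESTAMP : watermark;
        }
    }

    /** Updates the estimate after every successful claim (the approach in this PR). */
    static long estimateInsideLoop(int splitAfter, long... timestamps) {
        ToyTracker tracker = new ToyTracker(timestamps);
        long estimate = Long.MIN_VALUE;
        int claimed = 0;
        while (tracker.tryClaim()) {
            estimate = tracker.currentWatermark();
            if (++claimed == splitAfter) {
                tracker.trySplit(); // simulated runner-initiated split
            }
        }
        return estimate;
    }

    /** Updates the estimate only once, after the loop has exited. */
    static long estimateAfterLoop(int splitAfter, long... timestamps) {
        ToyTracker tracker = new ToyTracker(timestamps);
        int claimed = 0;
        while (tracker.tryClaim()) {
            if (++claimed == splitAfter) {
                tracker.trySplit(); // simulated runner-initiated split
            }
        }
        return tracker.currentWatermark();
    }

    public static void main(String[] args) {
        System.out.println(estimateInsideLoop(2, 10L, 20L, 30L));
        System.out.println(estimateAfterLoop(2, 10L, 20L, 30L));
    }
}
```

   With a split after the second record, the in-loop version keeps the timestamp of the last claimed record (20), while the after-loop version reads MAX_TIMESTAMP from the emptied restriction.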




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

