shunping commented on code in PR #35089:
URL: https://github.com/apache/beam/pull/35089#discussion_r2116221911
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1648,6 +1648,10 @@ keysPerBundle:
continue
}
holdsInBundle[e.holdTimestamp]++
+ if e.timestamp > watermark {
Review Comment:
> Looking again, as written this is causing data loss: You're losing the
> value that was popped from the heap on line 1637.
>
> The idea feels sound though (only processing what's beneath the current
> input watermark.)
No, there won't be any data loss there. If the heap has data elements
earlier than the timer element, the if statement at line 1632 guarantees
that all of the data elements before the timer go into one bundle of their
own. As a result, the timer element always ends up at the beginning of a
bundle.
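
To illustrate the invariant described above, here is a minimal, simplified sketch. It is not the code in elementmanager.go: the `element` type, the `splitBundles` function, the use of a sorted slice instead of the heap, and the choice to leave a not-yet-eligible timer in the queue are all assumptions made for illustration. It only shows that when a timer always starts a new bundle, the watermark check runs while the current bundle is empty, so no previously collected data elements can be dropped at that point.

```go
package main

import (
	"fmt"
	"sort"
)

// element is a hypothetical stand-in for a pending element of one key.
type element struct {
	timestamp int64
	isTimer   bool
}

// splitBundles is a hypothetical helper, not Prism's implementation.
// It orders pending elements by timestamp and cuts them into bundles so
// that a timer always sits at the start of its bundle. A timer whose
// timestamp is past the watermark stops the scan (assumption: it and
// everything after it stay queued and are returned as deferred).
func splitBundles(pending []element, watermark int64) (bundles [][]element, deferred []element) {
	// Copy so the caller's slice is not reordered.
	queue := append([]element(nil), pending...)
	sort.SliceStable(queue, func(i, j int) bool {
		return queue[i].timestamp < queue[j].timestamp
	})

	var bundle []element
	for i, e := range queue {
		if e.isTimer {
			// Close the bundle of earlier elements first, so the
			// timer begins a fresh bundle.
			if len(bundle) > 0 {
				bundles = append(bundles, bundle)
				bundle = nil
			}
			if e.timestamp > watermark {
				// The current bundle is empty here, so stopping
				// discards nothing that was already collected.
				return bundles, queue[i:]
			}
		}
		bundle = append(bundle, e)
	}
	if len(bundle) > 0 {
		bundles = append(bundles, bundle)
	}
	return bundles, nil
}

func main() {
	pending := []element{
		{timestamp: 1}, {timestamp: 2},
		{timestamp: 5, isTimer: true}, {timestamp: 6},
	}
	bundles, deferred := splitBundles(pending, 3)
	fmt.Println(len(bundles), len(deferred))
}
```

With a watermark of 3, the two data elements form one bundle and the timer at timestamp 5 is deferred together with the element after it. With a watermark of 10, the same input yields two bundles, the second of which starts with the timer.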