shunping commented on code in PR #35089:
URL: https://github.com/apache/beam/pull/35089#discussion_r2116221911
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1648,6 +1648,10 @@ keysPerBundle:
 				continue
 			}
 			holdsInBundle[e.holdTimestamp]++
+			if e.timestamp > watermark {

Review Comment:
   > Looking again, as written this is causing data loss: You're losing the value that was popped from the heap on line 1637.
   >
   > The idea feels sound though (only processing what's beneath the current input watermark.)

   No, there won't be data loss there. If the heap has data elements earlier than the timer element, the if statement at line 1632 guarantees that only the data elements before the timer are put into that bundle. As a result, the timer element always ends up at the beginning of a bundle.