lostluck commented on code in PR #30492:
URL: https://github.com/apache/beam/pull/30492#discussion_r1618123753
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
for {
em.refreshCond.L.Lock()
+ // Check if processing time has advanced before the wait loop.
+ emNow := em.ProcessingTimeNow()
+ ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
+ em.watermarkRefreshes.merge(ptRefreshed)
+
// If there are no watermark refreshes available, we wait until there are.
- for len(em.watermarkRefreshes) == 0 {
+ for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 {
// TODO Add processing time event condition instead of piggybacking on watermarks?
Review Comment:
In this case, it's a leftover comment. The "ProcessingTime event condition"
is basically that list of stages that have been triggered by the processing
time queue. I originally kept everything in the watermark refreshes
instead, but that didn't work as cleanly as I'd hoped.
Old comment removed, other comment clarified.
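
For readers following along, here is a rough sketch of the idea described above: a processing-time queue is advanced to "now", and the stages it triggers come back as a separate set that can then be merged into the pending refreshes. This is not the prism implementation. The names `stageSet`, `ptQueue`, `schedule`, and `advanceTo` are hypothetical stand-ins, and times are plain millisecond integers for simplicity.

```go
package main

import (
	"fmt"
	"sort"
)

// stageSet is a hypothetical set of stage IDs awaiting a refresh.
type stageSet map[string]struct{}

// merge adds every stage in other into s.
func (s stageSet) merge(other stageSet) {
	for id := range other {
		s[id] = struct{}{}
	}
}

// sorted returns the stage IDs in a stable order for printing.
func (s stageSet) sorted() []string {
	ids := make([]string, 0, len(s))
	for id := range s {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

// ptQueue is a hypothetical processing-time queue. For each stage it
// records the earliest time (in ms) at which that stage has an event.
type ptQueue struct {
	pending map[string]int64
}

// schedule registers an event for stage, keeping the earliest time.
func (q *ptQueue) schedule(stage string, at int64) {
	if cur, ok := q.pending[stage]; !ok || at < cur {
		q.pending[stage] = at
	}
}

// advanceTo removes and returns the stages whose events are due at or
// before now. The result is the "processing time event condition".
func (q *ptQueue) advanceTo(now int64) stageSet {
	fired := stageSet{}
	for stage, at := range q.pending {
		if at <= now {
			fired[stage] = struct{}{}
			delete(q.pending, stage)
		}
	}
	return fired
}

func main() {
	q := &ptQueue{pending: map[string]int64{}}
	q.schedule("stageA", 100)
	q.schedule("stageB", 250)

	// stageC already needs a refresh for watermark reasons.
	refreshes := stageSet{"stageC": {}}

	// Advance processing time, then fold the triggered stages in.
	refreshes.merge(q.advanceTo(150))
	fmt.Println(refreshes.sorted()) // prints [stageA stageC]
}
```

Keeping the triggered stages as their own returned set, rather than writing them straight into the refresh set, lets the caller inspect them separately before merging.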
--
This is an automated message from the Apache Git Service.