lostluck commented on code in PR #30492:
URL: https://github.com/apache/beam/pull/30492#discussion_r1618123753


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, 
nextBundID func() string)
 
                for {
                        em.refreshCond.L.Lock()
+                       // Check if processing time has advanced before the 
wait loop.
+                       emNow := em.ProcessingTimeNow()
+                       ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
+                       em.watermarkRefreshes.merge(ptRefreshed)
+
                        // If there are no watermark refreshes available, we 
wait until there are.
-                       for len(em.watermarkRefreshes) == 0 {
+                       for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 { 
// TODO Add processing time event condition instead of piggybacking on 
watermarks?

Review Comment:
   In this case, it's a leftover comment. The "ProcessingTime event condition" 
is basically rthat list of stages that have been triggered by the Processing 
time queue. I originally was keeping everything in the watermark refreshes 
instead, but that didn't work as cleanly as I hoped. 
   
   Old comment removed, other comment clarified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to