lostluck commented on code in PR #33881:
URL: https://github.com/apache/beam/pull/33881#discussion_r1951326789


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1246,6 +1305,113 @@ func (ss *stageState) AddPending(newPending []element) 
int {
        return len(newPending)
 }
 
+// computeNextTriggeredPane produces the correct pane relative to the previous 
pane,
+// and the end of window state.
+func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool) 
typex.PaneInfo {
+       // This is the first firing, since index and first are both
+       // set to their zero values.
+       if pane.Index == 0 && !pane.IsFirst {
+               pane.IsFirst = true
+       } else {
+               pane.Index++
+       }
+       if endOfWindowReached {
+               pane.Timing = typex.PaneLate
+               pane.NonSpeculativeIndex++
+       } else {
+               pane.Timing = typex.PaneEarly
+               pane.NonSpeculativeIndex = -1
+       }
+       return pane
+}
+
+// computeNextWatermarkPane computes the next pane given the previous pane,
+// when the watermark passes either the End of Window, or End of Window plus
+// the allowed lateness.
+func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo {
+       // The pane state is still early: this is the OnTime firing.
+       switch pane.Timing {
+       case typex.PaneEarly:
+               // We haven't fired ontime yet.
+               pane.Timing = typex.PaneOnTime
+               pane.NonSpeculativeIndex = 0
+
+       case typex.PaneOnTime:
+               // This must be the closing pane after an ontime pane.
+               pane.Timing = typex.PaneLate
+               pane.NonSpeculativeIndex++
+       case typex.PaneLate:
+               // We have had some other late pane.
+               pane.NonSpeculativeIndex++
+       }
+       // This is the first firing, since index and first are both
+       // set to their zero values.
+       if pane.Index == 0 && !pane.IsFirst {
+               pane.IsFirst = true
+       } else {
+               pane.Index++
+       }
+       return pane
+}
+
+// buildTriggeredBundle must be called with the stage.mu lock held.
+// When in discarding mode, returns 0.

Review Comment:
   Done.



##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -188,6 +172,56 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (_ *
                                check("Splittable+Stateful DoFn", "See 
https://github.com/apache/beam/issues/32139 for information.", "")
                        }
 
+                       // Validate whether the triggers on side inputs are 
required for
+                       // expedient data processing.
+                       //
+                       // Currently triggered side inputs are not supported by 
prism, and will
+                       // not have early or late firings.
+                       //
+                       // This feature is required when the Side Input 
PCollection is unbounded
+                       // and is in the Global Window. This can cause the 
pipeline to fully
+                       // stall while the input is being computed, and may 
never terminate.
+                       //
+                       // Other situations may not have desired results, but 
are valid behaviors
+                       // within the model.
+                       //
+                       // See https://github.com/apache/beam/issues/31438 for 
implementation tracking.
+                       for sideID := range pardo.GetSideInputs() {
+                               pcolID := t.GetInputs()[sideID]
+                               pcol := 
job.Pipeline.GetComponents().GetPcollections()[pcolID]
+                               wsID := pcol.GetWindowingStrategyId()
+                               ws := 
job.Pipeline.GetComponents().GetWindowingStrategies()[wsID]
+
+                               if pcol.GetIsBounded() == 
pipepb.IsBounded_BOUNDED ||
+                                       ws.GetWindowFn().GetUrn() != 
urns.WindowFnGlobal {
+                                       continue
+                               }
+
+                               // Within the Unbounded GlobalWindow space is a 
niche of expressed
+                               // user intent that they *do* want to wait for 
the end of the global
+                               // window for output. We should permit these 
pipelines, as there
+                               // is utility for this in testing situations 
anyway.
+                               switch trig := 
ws.GetTrigger().GetTrigger().(type) {
+                               case *pipepb.Trigger_Never_, 
*pipepb.Trigger_Default_:
+                                       // Only one firing, at the end of the 
global window, and is
+                                       // compatible with Prism's current 
execution.
+                                       continue
+                               case *pipepb.Trigger_AfterEndOfWindow_:
+                                       if early := 
trig.AfterEndOfWindow.GetEarlyFirings(); early == nil || early.GetNever() != 
nil {
+                                               if ws.GetAllowedLateness() == 0 
{
+                                                       // Late configuration 
doesn't matter, and there are no early firings.
+                                                       continue
+                                               }
+                                               if late := 
trig.AfterEndOfWindow.GetLateFirings(); late == nil || late.GetNever() != nil {
+                                                       // Lateness allowed, 
but no firings anyway.
+                                                       continue
+                                               }
+                                       }
+                               }
+
+                               check("Unbounded GlobalWindow Triggered 
SideInput", prototext.Format(ws), "See 
https://github.com/apache/beam/issues/31438 for information.")

Review Comment:
   You're right, I had it slightly reversed. Moved context to the primary 
feature string, and emptied the "want" targets. Leaving the explicit windowing 
strategy as the "got".



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1246,6 +1305,113 @@ func (ss *stageState) AddPending(newPending []element) 
int {
        return len(newPending)
 }
 
+// computeNextTriggeredPane produces the correct pane relative to the previous 
pane,
+// and the end of window state.
+func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool) 
typex.PaneInfo {
+       // This is the first firing, since index and first are both
+       // set to their zero values.
+       if pane.Index == 0 && !pane.IsFirst {
+               pane.IsFirst = true
+       } else {
+               pane.Index++

Review Comment:
   Done. No reason not to be explicit here and below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to