lostluck commented on code in PR #33881:
URL: https://github.com/apache/beam/pull/33881#discussion_r1951326789
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1246,6 +1305,113 @@ func (ss *stageState) AddPending(newPending []element)
int {
return len(newPending)
}
+// computeNextTriggeredPane produces the correct pane relative to the previous
pane,
+// and the end of window state.
+func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool)
typex.PaneInfo {
+ // This is the first firing, since index and first are both
+ // set to their zero values.
+ if pane.Index == 0 && !pane.IsFirst {
+ pane.IsFirst = true
+ } else {
+ pane.Index++
+ }
+ if endOfWindowReached {
+ pane.Timing = typex.PaneLate
+ pane.NonSpeculativeIndex++
+ } else {
+ pane.Timing = typex.PaneEarly
+ pane.NonSpeculativeIndex = -1
+ }
+ return pane
+}
+
+// computeNextWatermarkPane computes the next pane given the previous pane,
+// when the watermark passes either the End of Window, or End of Window plus
+// the allowed lateness.
+func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo {
+ // The pane state is still early: this is the OnTime firing.
+ switch pane.Timing {
+ case typex.PaneEarly:
+ // We haven't fired ontime yet.
+ pane.Timing = typex.PaneOnTime
+ pane.NonSpeculativeIndex = 0
+
+ case typex.PaneOnTime:
+ // This must be the closing pane after an ontime pane.
+ pane.Timing = typex.PaneLate
+ pane.NonSpeculativeIndex++
+ case typex.PaneLate:
+ // We have had some other late pane.
+ pane.NonSpeculativeIndex++
+ }
+ // This is the first firing, since index and first are both
+ // set to their zero values.
+ if pane.Index == 0 && !pane.IsFirst {
+ pane.IsFirst = true
+ } else {
+ pane.Index++
+ }
+ return pane
+}
+
+// buildTriggeredBundle must be called with the stage.mu lock held.
+// When in discaring mode, returns 0.
Review Comment:
Done.
##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -188,6 +172,56 @@ func (s *Server) Prepare(ctx context.Context, req
*jobpb.PrepareJobRequest) (_ *
check("Splittable+Stateful DoFn", "See
https://github.com/apache/beam/issues/32139 for information.", "")
}
+ // Validate whether the triggers on side inputs for are
required for
+ // expedient data processing..
+ //
+ // Currently triggered side inputs are not supported by
prism, and will
+ // not have early or late firings.
+ //
+ // This feature is required when the Side Input
PCollection is unbounded
+ // and is in the a Global Window. This can cause the
pipeline to fully
+ // stall while the input is being computed, and may
never terminate.
+ //
+ // Other situations may not have desired results, but
are valid behaviors
+ // within the model.
+ //
+ // See https://github.com/apache/beam/issues/31438 for
implementation tracking.
+ for sideID := range pardo.GetSideInputs() {
+ pcolID := t.GetInputs()[sideID]
+ pcol :=
job.Pipeline.GetComponents().GetPcollections()[pcolID]
+ wsID := pcol.GetWindowingStrategyId()
+ ws :=
job.Pipeline.GetComponents().GetWindowingStrategies()[wsID]
+
+ if pcol.GetIsBounded() ==
pipepb.IsBounded_BOUNDED ||
+ ws.GetWindowFn().GetUrn() !=
urns.WindowFnGlobal {
+ continue
+ }
+
+ // Within the Unbounded GlobalWindow space is a
nich of expressed
+ // user intent that they *do* want to wait for
the end of the global
+ // window for output. We should permit these
pipelines, as there
+ // is utility for this in testing situations
anyway.
+ switch trig :=
ws.GetTrigger().GetTrigger().(type) {
+ case *pipepb.Trigger_Never_,
*pipepb.Trigger_Default_:
+ // Only one firing, at the end of the
global window, and is
+ // compatible with Prism's current
execution.
+ continue
+ case *pipepb.Trigger_AfterEndOfWindow_:
+ if early :=
trig.AfterEndOfWindow.GetEarlyFirings(); early == nil || early.GetNever() !=
nil {
+ if ws.GetAllowedLateness() == 0
{
+ // Late configuration
doesn't matter, and there are no early firings.
+ continue
+ }
+ if late :=
trig.AfterEndOfWindow.GetLateFirings(); late == nil || late.GetNever() != nil {
+ // Lateness allowed,
but but no firings anyway.
+ continue
+ }
+ }
+ }
+
+ check("Unbounded GlobalWindow Triggered
SideInput", prototext.Format(ws), "See
https://github.com/apache/beam/issues/31438 for information.")
Review Comment:
You're right, I had it slightly reversed. Moved context to the primary
feature string, and emptied the "want" targets. Leaving the explicit windowing
strategy as the "got".
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1246,6 +1305,113 @@ func (ss *stageState) AddPending(newPending []element)
int {
return len(newPending)
}
+// computeNextTriggeredPane produces the correct pane relative to the previous
pane,
+// and the end of window state.
+func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool)
typex.PaneInfo {
+ // This is the first firing, since index and first are both
+ // set to their zero values.
+ if pane.Index == 0 && !pane.IsFirst {
+ pane.IsFirst = true
+ } else {
+ pane.Index++
Review Comment:
Done. No reason not to be explicit here and below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]