This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d5059c3edda [Prism] Support AfterProcessingTime triggers - part 1
(#36126)
d5059c3edda is described below
commit d5059c3eddae856a6e37a8b7911acb05d12f95ab
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 24 17:59:57 2025 -0400
[Prism] Support AfterProcessingTime triggers - part 1 (#36126)
* Construct after-processing-time trigger from proto and define trigger
callbacks.
* Add some comments to tests.
* Handle the case when the after-processing-time trigger is called repeatedly.
* Fix a bug when computing next trigger time and add a composite trigger
test.
---
.../beam/runners/prism/internal/engine/strategy.go | 108 ++++++++++++++++-
.../runners/prism/internal/engine/strategy_test.go | 129 +++++++++++++++++++++
sdks/go/pkg/beam/runners/prism/internal/execute.go | 22 +++-
3 files changed, 255 insertions(+), 4 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 5ccc4a51366..044b9806c1b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -79,8 +79,9 @@ func (ws WinStrat) String() string {
// triggerInput represents a Key + window + stage's trigger conditions.
type triggerInput struct {
- newElementCount int // The number of new elements since the last
check.
- endOfWindowReached bool // Whether or not the end of the window has
been reached.
+ newElementCount int // The number of new elements since the
last check.
+ endOfWindowReached bool // Whether or not the end of the window
has been reached.
+ emNow mtime.Time // The current processing time in the
runner.
}
// Trigger represents a trigger for a windowing strategy. A trigger
determines when
@@ -581,4 +582,105 @@ func (t *TriggerDefault) String() string {
return "Default"
}
-// TODO https://github.com/apache/beam/issues/31438 Handle
TriggerAfterProcessingTime
+// TimestampTransform is the engine's representation of a processing time transform.
+type TimestampTransform struct {
+	Delay         time.Duration // Added to the timestamp before any alignment takes place.
+	AlignToPeriod time.Duration // Period to align the timestamp to; alignment is skipped unless it is positive.
+	AlignToOffset time.Duration // Shifts the period boundaries used when aligning.
+}
+
+// TriggerAfterProcessingTime fires once after a specified amount of processing time
+// has passed since an element was first seen.
+// The trigger state's extra field holds an afterProcessingTimeState carrying the
+// most recent processing time and the computed firing time.
+type TriggerAfterProcessingTime struct {
+	// Transforms are applied in order to a processing time to derive the firing time.
+	Transforms []TimestampTransform
+}
+
+// afterProcessingTimeState is the value TriggerAfterProcessingTime keeps in the
+// trigger state's extra field.
+type afterProcessingTimeState struct {
+	// emNow is the processing time taken from the most recent input; firingTime
+	// is the processing time at or after which the trigger should fire.
+	emNow, firingTime mtime.Time
+	// endOfWindowReached mirrors the flag carried by the most recent input.
+	endOfWindowReached bool
+}
+
+// onElement records the input's processing time and end-of-window flag in the
+// trigger state. When no state has been stored yet it also computes the firing
+// time by running the input's processing time through the timestamp transforms;
+// afterwards the stored firing time is left untouched. It does nothing once the
+// trigger is marked finished.
+func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) {
+	ts := state.getTriggerState(t)
+	if ts.finished {
+		return
+	}
+
+	if ts.extra == nil {
+		ts.extra = afterProcessingTimeState{
+			emNow:              input.emNow,
+			firingTime:         t.applyTimestampTransforms(input.emNow),
+			endOfWindowReached: input.endOfWindowReached,
+		}
+	} else {
+		// The methods of this trigger only ever store an afterProcessingTimeState
+		// in extra, so the assertion result is not checked here.
+		s, _ := ts.extra.(afterProcessingTimeState)
+		s.emNow = input.emNow
+		s.endOfWindowReached = input.endOfWindowReached
+		ts.extra = s
+	}
+
+	state.setTriggerState(t, ts)
+}
+
+// applyTimestampTransforms runs start through every transform in order and
+// returns the resulting time. Each transform first adds its delay and then, if
+// it has an alignment period of at least one millisecond, advances the result to
+// the next period boundary, with boundaries shifted by the offset. Durations are
+// truncated to whole milliseconds before use.
+func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time {
+	ret := start
+	for _, transform := range t.Transforms {
+		ret += mtime.Time(transform.Delay / time.Millisecond)
+
+		// Test the period after truncating it to milliseconds: a positive
+		// sub-millisecond period becomes zero and must not be used as a modulus.
+		periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond)
+		if periodMs <= 0 {
+			continue
+		}
+		offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond)
+
+		// timestamp - (timestamp % period) + period
+		// And with an offset, we adjust before and after.
+		adjustedMs := ret - offsetMs
+		remMs := adjustedMs % periodMs
+		if remMs < 0 {
+			// Go's % takes the sign of the dividend. Normalize to a floored
+			// remainder so that timestamps earlier than the offset still
+			// align to the nearest following boundary.
+			remMs += periodMs
+		}
+		ret = adjustedMs - remMs + periodMs + offsetMs
+	}
+	return ret
+}
+
+// shouldFire reports whether the recorded processing time has reached the
+// stored firing time. It is false while no state has been recorded or once the
+// trigger is marked finished.
+func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool {
+	ts := state.getTriggerState(t)
+	if ts.finished || ts.extra == nil {
+		return false
+	}
+	timing := ts.extra.(afterProcessingTimeState)
+	return timing.firingTime <= timing.emNow
+}
+
+// onFire marks the trigger as finished. Calling it on an already finished
+// trigger changes nothing.
+func (t *TriggerAfterProcessingTime) onFire(state *StateData) {
+	ts := state.getTriggerState(t)
+	if !ts.finished {
+		// The extra state is deliberately left in place; only the flag changes.
+		ts.finished = true
+		state.setTriggerState(t, ts)
+	}
+}
+
+// reset prepares the trigger for another round.
+//
+// If no processing-time state was ever recorded, or the last input had reached
+// the end of the window, the trigger's entry is removed from state.Trigger.
+// Otherwise the finished flag is cleared and the next firing time is recomputed
+// from the most recently recorded processing time.
+func (t *TriggerAfterProcessingTime) reset(state *StateData) {
+	ts := state.getTriggerState(t)
+
+	// Use the checked form of the assertion: extra is nil when reset is called
+	// before any element was recorded, and an unchecked assertion on a nil
+	// interface panics.
+	s, ok := ts.extra.(afterProcessingTimeState)
+	if !ok || s.endOfWindowReached {
+		delete(state.Trigger, t)
+		return
+	}
+
+	// Not reaching the end of window yet.
+	// We keep the state (especially the next possible firing time) in case the trigger is called again
+	ts.finished = false
+	s.firingTime = t.applyTimestampTransforms(s.emNow) // compute next possible firing time
+	ts.extra = s
+	state.setTriggerState(t, ts)
+}
+
+// String renders the trigger name followed by its transforms in fmt's default format.
+func (t *TriggerAfterProcessingTime) String() string {
+	return "AfterProcessingTime[" + fmt.Sprint(t.Transforms) + "]"
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
index 86393d1c193..3b928be278f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
@@ -420,6 +420,135 @@ func TestTriggers_isReady(t *testing.T) {
{triggerInput{newElementCount: 1,
endOfWindowReached: true}, false},
{triggerInput{newElementCount: 1,
endOfWindowReached: true}, true}, // Late
},
+ }, {
+ name: "afterProcessingTime_Delay_Exact",
+ trig: &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {Delay: 3 * time.Second},
+ },
+ },
+ inputs: []io{
+ {triggerInput{emNow: 0}, false}, // the trigger
is set to fire at 3s after 0
+ {triggerInput{emNow: 1000}, false},
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 3000}, true}, // fire
+ {triggerInput{emNow: 4000}, false},
+ {triggerInput{emNow: 5000}, false},
+ {triggerInput{emNow: 6000}, false},
+ {triggerInput{emNow: 7000}, false},
+ },
+ }, {
+ name: "afterProcessingTime_Delay_Late",
+ trig: &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {Delay: 3 * time.Second},
+ },
+ },
+ inputs: []io{
+ {triggerInput{emNow: 0}, false}, // the trigger
is set to fire at 3s after 0
+ {triggerInput{emNow: 1000}, false},
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 3001}, true}, // fire a
little after the preset time
+ {triggerInput{emNow: 4000}, false},
+ },
+ }, {
+ name: "afterProcessingTime_AlignToPeriodOnly",
+ trig: &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {AlignToPeriod: 5 * time.Second},
+ },
+ },
+ inputs: []io{
+ {triggerInput{emNow: 1500}, false}, // align
1.5s to 5s
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 4999}, false},
+ {triggerInput{emNow: 5000}, true}, // fire at 5
+ {triggerInput{emNow: 5001}, false},
+ },
+ }, {
+ name: "afterProcessingTime_AlignToPeriodAndOffset",
+ trig: &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {AlignToPeriod: 5 * time.Second,
AlignToOffset: 200 * time.Millisecond},
+ },
+ },
+ inputs: []io{
+ {triggerInput{emNow: 1500}, false}, // align
1.5s to 5s plus an 0.2 offset
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 5119}, false},
+ {triggerInput{emNow: 5200}, true}, // fire at
5.2s
+ {triggerInput{emNow: 5201}, false},
+ },
+ }, {
+ name: "afterProcessingTime_TwoTransforms",
+ trig: &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {AlignToPeriod: 5 * time.Second,
AlignToOffset: 200 * time.Millisecond},
+ {Delay: 1 * time.Second},
+ },
+ },
+ inputs: []io{
+ {triggerInput{emNow: 1500}, false}, // align
1.5s to 5s plus an 0.2 offset and a 1s delay
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 5119}, false},
+ {triggerInput{emNow: 5200}, false},
+ {triggerInput{emNow: 5201}, false},
+ {triggerInput{emNow: 6119}, false},
+ {triggerInput{emNow: 6200}, true}, // fire
+ {triggerInput{emNow: 6201}, false},
+ },
+ }, {
+ name: "afterProcessingTime_Repeated", trig:
&TriggerRepeatedly{
+ &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {Delay: 3 * time.Second},
+ }}},
+ inputs: []io{
+ {triggerInput{emNow: 0}, false},
+ {triggerInput{emNow: 1000}, false},
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 3000}, true}, // firing
the first time, trigger set again
+ {triggerInput{emNow: 4000}, false},
+ {triggerInput{emNow: 5000}, false},
+ {triggerInput{emNow: 6000}, true}, // firing
the second time
+ },
+ }, {
+ name: "afterProcessingTime_Repeated_AcrossWindows",
trig: &TriggerRepeatedly{
+ &TriggerAfterProcessingTime{
+ Transforms: []TimestampTransform{
+ {Delay: 3 * time.Second},
+ }}},
+ inputs: []io{
+ {triggerInput{emNow: 0}, false},
+ {triggerInput{emNow: 1000}, false},
+ {triggerInput{emNow: 2000}, false},
+ {triggerInput{emNow: 3000}, true}, // fire the
first time, trigger is set again
+ {triggerInput{emNow: 4000}, false},
+ {triggerInput{emNow: 5000}, false},
+ {triggerInput{emNow: 6000,
+ endOfWindowReached: true}, true}, //
fire the second time, reach end of window and start over
+ {triggerInput{emNow: 7000}, false}, // trigger
firing time is set to 7s + 3s = 10s
+ {triggerInput{emNow: 8000}, false},
+ {triggerInput{emNow: 9000}, false},
+ {triggerInput{emNow: 10000}, true}, // fire in
the new window
+ },
+ }, {
+ name: "afterProcessingTime_Repeated_Composite", trig:
&TriggerRepeatedly{
+ &TriggerAfterAny{SubTriggers: []Trigger{
+ &TriggerAfterProcessingTime{
+ Transforms:
[]TimestampTransform{
+ {Delay: 3 *
time.Second},
+ },
+ },
+ &TriggerElementCount{ElementCount: 2},
+ }}},
+ inputs: []io{
+ {triggerInput{emNow: 0, newElementCount: 1},
false}, // ElmCount = 1, set AfterProcessingTime trigger firing time to 3s
+ {triggerInput{emNow: 1000, newElementCount: 1},
true}, // ElmCount = 2, fire ElmCount trigger and reset ElmCount and
AfterProcessingTime firing time (4s)
+ {triggerInput{emNow: 4000, newElementCount: 1},
true}, // ElmCount = 1, fire AfterProcessingTime trigger and reset ElmCount
and AfterProcessingTime firing time (7s)
+ {triggerInput{emNow: 5000, newElementCount: 1},
false}, // ElmCount = 1
+ {triggerInput{emNow: 5500, newElementCount: 1},
true}, // ElmCount = 2, fire ElmCount trigger
+ },
}, {
name: "default",
trig: &TriggerDefault{},
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 307ebee5664..9d23a89d458 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -477,7 +477,27 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
}
case *pipepb.Trigger_Repeat_:
return &engine.TriggerRepeatedly{Repeated:
buildTrigger(at.Repeat.GetSubtrigger())}
- case *pipepb.Trigger_AfterProcessingTime_,
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+ case *pipepb.Trigger_AfterProcessingTime_:
+ var transforms []engine.TimestampTransform
+ for _, ts := range
at.AfterProcessingTime.GetTimestampTransforms() {
+ var delay, period, offset time.Duration
+ if d := ts.GetDelay(); d != nil {
+ delay = time.Duration(d.GetDelayMillis()) *
time.Millisecond
+ }
+ if align := ts.GetAlignTo(); align != nil {
+ period = time.Duration(align.GetPeriod()) *
time.Millisecond
+ offset = time.Duration(align.GetOffset()) *
time.Millisecond
+ }
+ transforms = append(transforms,
engine.TimestampTransform{
+ Delay: delay,
+ AlignToPeriod: period,
+ AlignToOffset: offset,
+ })
+ }
+ return &engine.TriggerAfterProcessingTime{
+ Transforms: transforms,
+ }
+ case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
panic(fmt.Sprintf("unsupported trigger: %v",
prototext.Format(tpb)))
default:
return &engine.TriggerDefault{}