This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d5059c3edda [Prism] Support AfterProcessingTime triggers - part 1 
(#36126)
d5059c3edda is described below

commit d5059c3eddae856a6e37a8b7911acb05d12f95ab
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 24 17:59:57 2025 -0400

    [Prism] Support AfterProcessingTime triggers - part 1 (#36126)
    
    * Construct after-processing-time trigger from proto and define trigger 
callbacks.
    
    * Add some comments to tests.
    
    * Handle the case when the after-processing-time trigger is called repeatedly.
    
    * Fix a bug when computing next trigger time and add a composite trigger 
test.
---
 .../beam/runners/prism/internal/engine/strategy.go | 108 ++++++++++++++++-
 .../runners/prism/internal/engine/strategy_test.go | 129 +++++++++++++++++++++
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  22 +++-
 3 files changed, 255 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 5ccc4a51366..044b9806c1b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -79,8 +79,9 @@ func (ws WinStrat) String() string {
 
 // triggerInput represents a Key + window + stage's trigger conditions.
 type triggerInput struct {
-       newElementCount    int  // The number of new elements since the last 
check.
-       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+       newElementCount    int        // The number of new elements since the 
last check.
+       endOfWindowReached bool       // Whether or not the end of the window 
has been reached.
+       emNow              mtime.Time // The current processing time in the 
runner.
 }
 
 // Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
@@ -581,4 +582,105 @@ func (t *TriggerDefault) String() string {
        return "Default"
 }
 
-// TODO https://github.com/apache/beam/issues/31438 Handle 
TriggerAfterProcessingTime
+// TimestampTransform is the engine's representation of a processing time 
transform.
+type TimestampTransform struct {
+       Delay         time.Duration // Added to the timestamp, truncated to whole milliseconds.
+       AlignToPeriod time.Duration // When positive, the delayed timestamp is moved up to the next period boundary.
+       AlignToOffset time.Duration // Shift applied to the period boundaries; only read when AlignToPeriod is positive.
+}
+
+// TriggerAfterProcessingTime fires once after a specified amount of 
processing time
+// has passed since an element was first seen.
+// Uses the extra state field to track the processing time of the first 
element.
+type TriggerAfterProcessingTime struct {
+       Transforms []TimestampTransform // Applied in slice order to a processing time to compute the firing time.
+}
+
+type afterProcessingTimeState struct {
+       emNow              mtime.Time // Processing time carried by the most recent onElement input.
+       firingTime         mtime.Time // shouldFire reports true once emNow is at or past this time.
+       endOfWindowReached bool       // Copied from the most recent onElement input; checked by reset.
+}
+
+func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state 
*StateData) {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return
+       }
+
+       if ts.extra == nil {
+               ts.extra = afterProcessingTimeState{
+                       emNow:              input.emNow,
+                       firingTime:         
t.applyTimestampTransforms(input.emNow),
+                       endOfWindowReached: input.endOfWindowReached,
+               }
+       } else {
+               s, _ := ts.extra.(afterProcessingTimeState)
+               s.emNow = input.emNow
+               s.endOfWindowReached = input.endOfWindowReached
+               ts.extra = s
+       }
+
+       state.setTriggerState(t, ts)
+}
+
+// applyTimestampTransforms runs every configured transform, in order, over
+// start and returns the resulting time. All arithmetic is in whole
+// milliseconds.
+//
+// For each transform the delay is added first. If the transform has an
+// alignment period of at least one millisecond, the result is then moved to
+// the smallest value of the form offset + k*period that is strictly greater
+// than it. Periods that truncate to zero milliseconds are ignored rather than
+// used as a divisor.
+func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time {
+       ret := start
+       for _, transform := range t.Transforms {
+               ret += mtime.Time(transform.Delay / time.Millisecond)
+
+               periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond)
+               if periodMs <= 0 {
+                       // No alignment requested, or a sub-millisecond period that
+                       // would otherwise cause a division by zero below.
+                       continue
+               }
+               offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond)
+
+               // Distance past the previous boundary. Go's % keeps the sign of the
+               // dividend, so normalize into [0, periodMs) for timestamps that lie
+               // before the offset.
+               rem := (ret - offsetMs) % periodMs
+               if rem < 0 {
+                       rem += periodMs
+               }
+               // timestamp - (timestamp % period) + period, relative to the offset.
+               ret = ret - rem + periodMs
+       }
+       return ret
+}
+
+// shouldFire reports whether the last observed processing time has reached
+// the firing time. It is false when no extra state has been recorded yet or
+// when the trigger is already finished.
+func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished || ts.extra == nil {
+               return false
+       }
+       observed := ts.extra.(afterProcessingTimeState)
+       return observed.firingTime <= observed.emNow
+}
+
+// onFire marks the trigger as finished. The extra state is deliberately left
+// in place; only the finished flag changes.
+func (t *TriggerAfterProcessingTime) onFire(state *StateData) {
+       ts := state.getTriggerState(t)
+       if !ts.finished {
+               ts.finished = true
+               state.setTriggerState(t, ts)
+       }
+}
+
+// reset re-arms the trigger so it can be evaluated again.
+//
+// If onElement has not recorded any state yet, or the recorded state says the
+// end of the window was reached, the stored trigger state is deleted.
+// Otherwise the trigger is marked unfinished and the next firing time is
+// recomputed from the last observed processing time.
+func (t *TriggerAfterProcessingTime) reset(state *StateData) {
+       ts := state.getTriggerState(t)
+
+       // The checked assertion also covers a nil extra, which would make an
+       // unchecked assertion panic when reset runs before any onElement call.
+       s, ok := ts.extra.(afterProcessingTimeState)
+       if !ok || s.endOfWindowReached {
+               delete(state.Trigger, t)
+               return
+       }
+
+       // The end of the window has not been reached yet.
+       // Keep the state and compute the next possible firing time in case the
+       // trigger is called again.
+       ts.finished = false
+       s.firingTime = t.applyTimestampTransforms(s.emNow)
+       ts.extra = s
+       state.setTriggerState(t, ts)
+}
+
+// String returns a printable name for the trigger that includes its
+// configured transforms.
+func (t *TriggerAfterProcessingTime) String() string {
+       transforms := t.Transforms
+       return fmt.Sprintf("AfterProcessingTime[%v]", transforms)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
index 86393d1c193..3b928be278f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
@@ -420,6 +420,135 @@ func TestTriggers_isReady(t *testing.T) {
                                {triggerInput{newElementCount: 1, 
endOfWindowReached: true}, false},
                                {triggerInput{newElementCount: 1, 
endOfWindowReached: true}, true}, // Late
                        },
+               }, {
+                       name: "afterProcessingTime_Delay_Exact",
+                       trig: &TriggerAfterProcessingTime{
+                               Transforms: []TimestampTransform{
+                                       {Delay: 3 * time.Second},
+                               },
+                       },
+                       inputs: []io{
+                               {triggerInput{emNow: 0}, false}, // the trigger 
is set to fire at 3s after 0
+                               {triggerInput{emNow: 1000}, false},
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 3000}, true}, // fire
+                               {triggerInput{emNow: 4000}, false},
+                               {triggerInput{emNow: 5000}, false},
+                               {triggerInput{emNow: 6000}, false},
+                               {triggerInput{emNow: 7000}, false},
+                       },
+               }, {
+                       name: "afterProcessingTime_Delay_Late",
+                       trig: &TriggerAfterProcessingTime{
+                               Transforms: []TimestampTransform{
+                                       {Delay: 3 * time.Second},
+                               },
+                       },
+                       inputs: []io{
+                               {triggerInput{emNow: 0}, false}, // the trigger 
is set to fire at 3s after 0
+                               {triggerInput{emNow: 1000}, false},
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 3001}, true}, // fire a 
little after the preset time
+                               {triggerInput{emNow: 4000}, false},
+                       },
+               }, {
+                       name: "afterProcessingTime_AlignToPeriodOnly",
+                       trig: &TriggerAfterProcessingTime{
+                               Transforms: []TimestampTransform{
+                                       {AlignToPeriod: 5 * time.Second},
+                               },
+                       },
+                       inputs: []io{
+                               {triggerInput{emNow: 1500}, false}, // align 
1.5s to 5s
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 4999}, false},
+                               {triggerInput{emNow: 5000}, true}, // fire at 5
+                               {triggerInput{emNow: 5001}, false},
+                       },
+               }, {
+                       name: "afterProcessingTime_AlignToPeriodAndOffset",
+                       trig: &TriggerAfterProcessingTime{
+                               Transforms: []TimestampTransform{
+                                       {AlignToPeriod: 5 * time.Second, 
AlignToOffset: 200 * time.Millisecond},
+                               },
+                       },
+                       inputs: []io{
+                               {triggerInput{emNow: 1500}, false}, // align 
1.5s to 5s plus an 0.2 offset
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 5119}, false},
+                               {triggerInput{emNow: 5200}, true}, // fire at 
5.2s
+                               {triggerInput{emNow: 5201}, false},
+                       },
+               }, {
+                       name: "afterProcessingTime_TwoTransforms",
+                       trig: &TriggerAfterProcessingTime{
+                               Transforms: []TimestampTransform{
+                                       {AlignToPeriod: 5 * time.Second, 
AlignToOffset: 200 * time.Millisecond},
+                                       {Delay: 1 * time.Second},
+                               },
+                       },
+                       inputs: []io{
+                               {triggerInput{emNow: 1500}, false}, // align 
1.5s to 5s plus an 0.2 offset and a 1s delay
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 5119}, false},
+                               {triggerInput{emNow: 5200}, false},
+                               {triggerInput{emNow: 5201}, false},
+                               {triggerInput{emNow: 6119}, false},
+                               {triggerInput{emNow: 6200}, true}, // fire
+                               {triggerInput{emNow: 6201}, false},
+                       },
+               }, {
+                       name: "afterProcessingTime_Repeated", trig: 
&TriggerRepeatedly{
+                               &TriggerAfterProcessingTime{
+                                       Transforms: []TimestampTransform{
+                                               {Delay: 3 * time.Second},
+                                       }}},
+                       inputs: []io{
+                               {triggerInput{emNow: 0}, false},
+                               {triggerInput{emNow: 1000}, false},
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 3000}, true}, // firing 
the first time, trigger set again
+                               {triggerInput{emNow: 4000}, false},
+                               {triggerInput{emNow: 5000}, false},
+                               {triggerInput{emNow: 6000}, true}, // firing 
the second time
+                       },
+               }, {
+                       name: "afterProcessingTime_Repeated_AcrossWindows", 
trig: &TriggerRepeatedly{
+                               &TriggerAfterProcessingTime{
+                                       Transforms: []TimestampTransform{
+                                               {Delay: 3 * time.Second},
+                                       }}},
+                       inputs: []io{
+                               {triggerInput{emNow: 0}, false},
+                               {triggerInput{emNow: 1000}, false},
+                               {triggerInput{emNow: 2000}, false},
+                               {triggerInput{emNow: 3000}, true}, // fire the 
first time, trigger is set again
+                               {triggerInput{emNow: 4000}, false},
+                               {triggerInput{emNow: 5000}, false},
+                               {triggerInput{emNow: 6000,
+                                       endOfWindowReached: true}, true}, // 
fire the second time, reach end of window and start over
+                               {triggerInput{emNow: 7000}, false}, // trigger 
firing time is set to 7s + 3s = 10s
+                               {triggerInput{emNow: 8000}, false},
+                               {triggerInput{emNow: 9000}, false},
+                               {triggerInput{emNow: 10000}, true}, // fire in 
the new window
+                       },
+               }, {
+                       name: "afterProcessingTime_Repeated_Composite", trig: 
&TriggerRepeatedly{
+                               &TriggerAfterAny{SubTriggers: []Trigger{
+                                       &TriggerAfterProcessingTime{
+                                               Transforms: 
[]TimestampTransform{
+                                                       {Delay: 3 * 
time.Second},
+                                               },
+                                       },
+                                       &TriggerElementCount{ElementCount: 2},
+                               }}},
+                       inputs: []io{
+                               {triggerInput{emNow: 0, newElementCount: 1}, 
false},    // ElmCount = 1, set AfterProcessingTime trigger firing time to 3s
+                               {triggerInput{emNow: 1000, newElementCount: 1}, 
true},  // ElmCount = 2, fire ElmCount trigger and reset ElmCount and 
AfterProcessingTime firing time (4s)
+                               {triggerInput{emNow: 4000, newElementCount: 1}, 
true},  // ElmCount = 1, fire AfterProcessingTime trigger and reset ElmCount 
and AfterProcessingTime firing time (7s)
+                               {triggerInput{emNow: 5000, newElementCount: 1}, 
false}, // ElmCount = 1
+                               {triggerInput{emNow: 5500, newElementCount: 1}, 
true},  // ElmCount = 2, fire ElmCount trigger
+                       },
                }, {
                        name: "default",
                        trig: &TriggerDefault{},
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 307ebee5664..9d23a89d458 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -477,7 +477,27 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
                }
        case *pipepb.Trigger_Repeat_:
                return &engine.TriggerRepeatedly{Repeated: 
buildTrigger(at.Repeat.GetSubtrigger())}
-       case *pipepb.Trigger_AfterProcessingTime_, 
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+       case *pipepb.Trigger_AfterProcessingTime_:
+               var transforms []engine.TimestampTransform
+               for _, ts := range 
at.AfterProcessingTime.GetTimestampTransforms() {
+                       var delay, period, offset time.Duration
+                       if d := ts.GetDelay(); d != nil {
+                               delay = time.Duration(d.GetDelayMillis()) * 
time.Millisecond
+                       }
+                       if align := ts.GetAlignTo(); align != nil {
+                               period = time.Duration(align.GetPeriod()) * 
time.Millisecond
+                               offset = time.Duration(align.GetOffset()) * 
time.Millisecond
+                       }
+                       transforms = append(transforms, 
engine.TimestampTransform{
+                               Delay:         delay,
+                               AlignToPeriod: period,
+                               AlignToOffset: offset,
+                       })
+               }
+               return &engine.TriggerAfterProcessingTime{
+                       Transforms: transforms,
+               }
+       case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
                panic(fmt.Sprintf("unsupported trigger: %v", 
prototext.Format(tpb)))
        default:
                return &engine.TriggerDefault{}

Reply via email to