This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3534960b9c8 [Prism] Support AfterProcessingTime triggers - part 2 
(#36333)
3534960b9c8 is described below

commit 3534960b9c8b99276853151825f233fad9d28513
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Oct 2 19:14:11 2025 -0400

    [Prism] Support AfterProcessingTime triggers - part 2 (#36333)
    
    * Handle after-processing-time trigger with processing-time timer.
    
    * Consolidate the buildProcessingTimeBundle for stateful and aggregate 
kinds into one code path.
    
    * Set correct pane info.
    
    * Save panes when handling after-processing-time triggers
    
    * Add comments
    
    * Change existing after processing time trigger test and add one more.
    
    * Change the bundleReady criterion for ordinary stage so it does not depend 
on watermark.
    
    * Remove advance watermark steps in tests.
    
    * Fix failed vr tests in java related to statelss dofn with side input
    
    * Sickbay a test.
    
    * Add comments to explain the sickbay test.
    
    * Address reviewer feedback.
    
    * Add test filter for flink.
---
 runners/prism/java/build.gradle                    |  10 ++
 .../prism/internal/engine/elementmanager.go        | 166 ++++++++++++++++++---
 .../beam/runners/prism/internal/engine/strategy.go |  43 ++++++
 .../prism/internal/jobservices/management.go       |   2 +-
 .../runners/prism/internal/unimplemented_test.go   |   3 +-
 sdks/go/test/integration/integration.go            |   1 +
 sdks/go/test/integration/primitives/windowinto.go  |  24 ++-
 .../test/integration/primitives/windowinto_test.go |   6 +
 8 files changed, 230 insertions(+), 25 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index e75fda999e1..fd3631fd4a7 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -94,6 +94,16 @@ def sickbayTests = [
     'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
     'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // 
Uses processing time trigger for early firings.
 
+    // A regression introduced when we use number of pending elements rather 
than watermark to determine
+    // the bundle readiness of a stateless stage.
+    // Currently, Prism processes a bundle of [100, ..., 1000] when watermark 
is set to 100,
+    // and then a second bundle of [1, ... 99] when the watermark is set to 
+inf.
+    // As a result, it yields an output of [-999, 1, 1...], where -999 comes 
from the difference between 1000 and 1.
+    // According to 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html,
+    // the stateful dofn with `RequiresTimeSortedInput` annotation should 
buffer an element until the element's timestamp + allowed_lateness.
+    // This stateful dofn feature is not yet supported in Prism.
+    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
+
     // Triggered Side Inputs not yet implemented in Prism.
     // https://github.com/apache/beam/issues/31438
     'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 6af030f3622..d03d906e47d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1215,7 +1215,9 @@ type stageKind interface {
        // buildEventTimeBundle handles building bundles for the stage per it's 
kind.
        buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess 
elementHeap, minTs mtime.Time, newKeys set[string],
                holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool, pendingAdjustment int)
-
+       // buildProcessingTimeBundle handles building processing-time bundles 
for the stage per it's kind.
+       buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow 
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
+               holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool)
        // getPaneOrDefault based on the stage state, element metadata, and 
bundle id.
        getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w 
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
 }
@@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em 
*ElementManager, window t
        ready := ss.strat.IsTriggerReady(triggerInput{
                newElementCount:    1,
                endOfWindowReached: endOfWindowReached,
+               emNow:              em.ProcessingTimeNow(),
        }, &state)
 
        if ready {
                state.Pane = computeNextTriggeredPane(state.Pane, 
endOfWindowReached)
+       } else {
+               if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil 
{
+                       for _, t := range pts {
+                               ts := (&state).getTriggerState(t)
+                               if ts.extra == nil || t.shouldFire((&state)) {
+                                       // Skipping inserting a processing time 
timer if the firing time
+                                       // is not set or it already should fire.
+                                       // When the after processing time 
triggers should fire, there are
+                                       // two scenarios:
+                                       // (1) the entire trigger of this 
window is ready to fire. In this
+                                       //     case, `ready` should be true and 
we won't reach here.
+                                       // (2) we are still waiting for other 
triggers (subtriggers) to
+                                       //     fire (e.g. AfterAll).
+                                       continue
+                               }
+                               firingTime := 
ts.extra.(afterProcessingTimeState).firingTime
+                               notYetHolds := map[mtime.Time]int{}
+                               timer := element{
+                                       window:        window,
+                                       timestamp:     firingTime,
+                                       holdTimestamp: window.MaxTimestamp(),
+                                       pane:          typex.NoFiringPane(),
+                                       transform:     ss.ID, // Use stage id 
to fake transform id
+                                       family:        "AfterProcessingTime",
+                                       tag:           "",
+                                       sequence:      1,
+                                       elmBytes:      nil,
+                                       keyBytes:      []byte(key),
+                               }
+                               // TODO: how to deal with watermark holds for 
this implicit processing time timer
+                               // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
+                               ss.processingTimeTimers.Persist(firingTime, 
timer, notYetHolds)
+                               em.processTimeEvents.Schedule(firingTime, ss.ID)
+                               em.wakeUpAt(firingTime)
+                       }
+               }
        }
        // Store the state as triggers may have changed it.
        ss.state[LinkID{}][window][key] = state
 
        // If we're ready, it's time to fire!
        if ready {
-               count += ss.buildTriggeredBundle(em, key, window)
+               count += ss.startTriggeredBundle(em, key, window)
        }
        return count
 }
@@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string, 
panesInBundle []bundlePane) {
        }
 }
 
-// buildTriggeredBundle must be called with the stage.mu lock held.
-// When in discarding mode, returns 0.
-// When in accumulating mode, returns the number of fired elements to maintain 
a correct pending count.
-func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win 
typex.Window) int {
+func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win 
typex.Window) ([]element, int) {
        var toProcess []element
        dnt := ss.pendingByKeys[key]
        var notYet []element
 
-       rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), 
Watermark: ss.input}
-
        // Look at all elements for this key, and only for this window.
        for dnt.elements.Len() > 0 {
                e := heap.Pop(&dnt.elements).(element)
@@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key string, win t
                heap.Init(&dnt.elements)
        }
 
+       return toProcess, accumulationDiff
+}
+
+// startTriggeredBundle must be called with the stage.mu lock held.
+// Returns the accumulation diff that the pending work needs to be adjusted 
by, as completed work is subtracted from the pending count.
+// When in discarding mode, returns 0, as the pending work already includes 
these elements.
+// When in accumulating mode, returns the number of fired elements, since 
those elements remain pending even after this bundle is fired.
+func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win 
typex.Window) int {
+       toProcess, accumulationDiff := ss.buildTriggeredBundle(em, key, win)
+       if len(toProcess) == 0 {
+               return accumulationDiff
+       }
+
        if ss.inprogressKeys == nil {
                ss.inprogressKeys = set[string]{}
        }
@@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key string, win t
                },
        }
 
+       rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), 
Watermark: ss.input}
        ss.makeInProgressBundle(
                func() string { return rb.BundleID },
                toProcess,
@@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key string, win t
        )
        slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID", 
rb.BundleID, "size", len(toProcess))
 
-       ss.bundlesToInject = append(ss.bundlesToInject, rb)
+       // TODO: Use ss.bundlesToInject rather than em.injectedBundles
+       // ss.bundlesToInject = append(ss.bundlesToInject, rb)
        // Bundle is marked in progress here to prevent a race condition.
        em.refreshCond.L.Lock()
+       em.injectedBundles = append(em.injectedBundles, rb)
        em.inprogressBundles.insert(rb.BundleID)
        em.refreshCond.L.Unlock()
        return accumulationDiff
@@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        ss.mu.Lock()
        defer ss.mu.Unlock()
 
+       toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+
+       if len(toProcess) == 0 {
+               // If we have nothing
+               return "", false, stillSchedulable
+       }
+       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, panesInBundle)
+       slog.Debug("started a processing time bundle", "stageID", ss.ID, 
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
+       return bundID, true, stillSchedulable
+}
+
+// handleProcessingTimeTimer contains the common code for handling 
processing-time timers for aggregation stages and stateful stages.
+func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow 
mtime.Time,
+       processTimerFn func(e element, toProcess []element, holdsInBundle 
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane)) 
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
        // TODO: Determine if it's possible and a good idea to treat all 
EventTime processing as a MinTime
        // Special Case for ProcessingTime handling.
        // Eg. Always queue EventTime elements at minTime.
@@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        // Potentially puts too much work on the scheduling thread though.
 
        var toProcess []element
+       var panesInBundle []bundlePane
        minTs := mtime.MaxTimestamp
        holdsInBundle := map[mtime.Time]int{}
 
@@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
                        if e.timestamp < minTs {
                                minTs = e.timestamp
                        }
-                       holdsInBundle[e.holdTimestamp]++
 
-                       // We're going to process this timer!
-                       toProcess = append(toProcess, e)
+                       toProcess, panesInBundle = processTimerFn(e, toProcess, 
holdsInBundle, panesInBundle)
                }
 
                nextTime = ss.processingTimeTimers.Peek()
@@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        for _, v := range notYet {
                ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
                em.processTimeEvents.Schedule(v.firing, ss.ID)
+               em.wakeUpAt(v.firing)
        }
 
        // Add a refresh if there are still processing time events to process.
        stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp 
|| len(notYet) > 0)
 
-       if len(toProcess) == 0 {
-               // If we have nothing
-               return "", false, stillSchedulable
-       }
-       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, nil)
+       return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable
+}
 
-       slog.Debug("started a processing time bundle", "stageID", ss.ID, 
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
-       return bundID, true, stillSchedulable
+// buildProcessingTimeBundle for stateful stages prepares bundles for 
processing-time timers
+func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
+       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane) {
+               holdsInBundle[e.holdTimestamp]++
+               // We're going to process this timer!
+               toProcess = append(toProcess, e)
+               return toProcess, nil
+       })
+}
+
+// buildProcessingTimeBundle for aggregation stages prepares bundles for 
after-processing-time triggers
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
+       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane) {
+               // Different from `buildProcessingTimeBundle` for stateful 
stage,
+               // triggers don't hold back the watermark, so no holds are in 
the triggered bundle.
+               state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
+               endOfWindowReached := e.window.MaxTimestamp() < ss.input
+               ready := ss.strat.IsTriggerReady(triggerInput{
+                       newElementCount:    0,
+                       endOfWindowReached: endOfWindowReached,
+                       emNow:              emNow,
+               }, &state)
+
+               if ready {
+                       state.Pane = computeNextTriggeredPane(state.Pane, 
endOfWindowReached)
+
+                       // We're going to process this trigger!
+                       elems, _ := ss.buildTriggeredBundle(em, 
string(e.keyBytes), e.window)
+                       toProcess = append(toProcess, elems...)
+
+                       ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
+
+                       panesInBundle = append(panesInBundle, bundlePane{})
+               }
+
+               return toProcess, panesInBundle
+       })
+}
+
+// buildProcessingTimeBundle for stateless stages is not supposed to be called 
currently
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
+       slog.Error("ordinary stages can't have processing time elements")
+       return nil, mtime.MinTimestamp, nil, nil, nil, false
 }
 
 // makeInProgressBundle is common code to store a set of elements as a bundle 
in progress.
@@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager, 
emNow mtime.Time) (mtime.T
        inputW := ss.input
        _, upstreamW := ss.UpstreamWatermark()
        previousInputW := ss.previousInput
-       if inputW == upstreamW && previousInputW == inputW {
+
+       _, isOrdinaryStage := ss.kind.(*ordinaryStageKind)
+       if isOrdinaryStage && len(ss.sides) == 0 {
+               // For ordinary stage with no side inputs, we use whether there 
are pending elements to determine
+               // whether a bundle is ready or not.
+               if len(ss.pending) == 0 {
+                       return mtime.MinTimestamp, false, ptimeEventsReady, 
injectedReady
+               }
+       } else if inputW == upstreamW && previousInputW == inputW {
+               // Otherwise, use the progression of watermark to determine the 
bundle readiness.
                slog.Debug("bundleReady: unchanged upstream watermark",
                        slog.String("stage", ss.ID),
                        slog.Group("watermark",
                                slog.Any("upstream == input == previousInput", 
inputW)))
                return mtime.MinTimestamp, false, ptimeEventsReady, 
injectedReady
        }
+
        ready := true
        for _, side := range ss.sides {
                pID, ok := em.pcolParents[side.Global]
@@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret 
mtime.Time) {
 func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
        return localNow + (scheduled - mtime.Now())
 }
+
+// wakeUpAt schedules a wakeup signal for the bundle processing loop.
+// This is used for processing time timers to ensure the loop re-evaluates
+// stages when a processing time timer is expected to fire.
+func (em *ElementManager) wakeUpAt(t mtime.Time) {
+       if em.testStreamHandler == nil && em.config.EnableRTC {
+               // only create this goroutine if we have real-time clock 
enabled and the pipeline does not have TestStream.
+               go func(fireAt time.Time) {
+                       time.AfterFunc(time.Until(fireAt), func() {
+                               em.refreshCond.Broadcast()
+                       })
+               }(t.ToTime())
+       }
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 044b9806c1b..2aef5fcf332 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -73,6 +73,49 @@ func (ws WinStrat) IsNeverTrigger() bool {
        return ok
 }
 
+func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime {
+       if t == nil {
+               return nil
+       }
+       var triggers []*TriggerAfterProcessingTime
+       switch at := t.(type) {
+       case *TriggerAfterProcessingTime:
+               return []*TriggerAfterProcessingTime{at}
+       case *TriggerAfterAll:
+               for _, st := range at.SubTriggers {
+                       triggers = append(triggers, 
getAfterProcessingTimeTriggers(st)...)
+               }
+               return triggers
+       case *TriggerAfterAny:
+               for _, st := range at.SubTriggers {
+                       triggers = append(triggers, 
getAfterProcessingTimeTriggers(st)...)
+               }
+               return triggers
+       case *TriggerAfterEach:
+               for _, st := range at.SubTriggers {
+                       triggers = append(triggers, 
getAfterProcessingTimeTriggers(st)...)
+               }
+               return triggers
+       case *TriggerAfterEndOfWindow:
+               triggers = append(triggers, 
getAfterProcessingTimeTriggers(at.Early)...)
+               triggers = append(triggers, 
getAfterProcessingTimeTriggers(at.Late)...)
+               return triggers
+       case *TriggerOrFinally:
+               triggers = append(triggers, 
getAfterProcessingTimeTriggers(at.Main)...)
+               triggers = append(triggers, 
getAfterProcessingTimeTriggers(at.Finally)...)
+               return triggers
+       case *TriggerRepeatedly:
+               return getAfterProcessingTimeTriggers(at.Repeated)
+       default:
+               return nil
+       }
+}
+
+// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers 
within the trigger.
+func (ws WinStrat) GetAfterProcessingTimeTriggers() 
[]*TriggerAfterProcessingTime {
+       return getAfterProcessingTimeTriggers(ws.Trigger)
+}
+
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", 
ws.AllowedLateness, ws.Trigger)
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index f0083815211..12c3c42c2e9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (_ *
 func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
        unsupported := false
        switch at := tpb.GetTrigger().(type) {
-       case *pipepb.Trigger_AfterProcessingTime_, 
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+       case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
                return true
        case *pipepb.Trigger_AfterAll_:
                for _, st := range at.AfterAll.GetSubtriggers() {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index 7a742c22d0f..d54955f43d4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -53,7 +53,6 @@ func TestUnimplemented(t *testing.T) {
                // Currently unimplemented triggers.
                // https://github.com/apache/beam/issues/31438
                {pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
-               {pipeline: primitives.TriggerAfterProcessingTime},
        }
 
        for _, test := range tests {
@@ -93,6 +92,8 @@ func TestImplemented(t *testing.T) {
                {pipeline: primitives.TriggerAfterEach},
                {pipeline: primitives.TriggerAfterEndOfWindow},
                {pipeline: primitives.TriggerRepeat},
+               {pipeline: primitives.TriggerAfterProcessingTime},
+               {pipeline: primitives.TriggerAfterProcessingTimeNotTriggered},
        }
 
        for _, test := range tests {
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index 8d951fe8ce9..eae64dcb205 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -171,6 +171,7 @@ var flinkFilters = []string{
        "TestBigQueryIO.*",
        "TestBigtableIO.*",
        "TestSpannerIO.*",
+       "TestTriggerAfterProcessingTime",
        // The number of produced outputs in AfterSynchronizedProcessingTime 
varies in different runs.
        "TestTriggerAfterSynchronizedProcessingTime",
        // The flink runner does not support pipeline drain for SDF.
diff --git a/sdks/go/test/integration/primitives/windowinto.go 
b/sdks/go/test/integration/primitives/windowinto.go
index d33e464b76f..f5d01bdfbba 100644
--- a/sdks/go/test/integration/primitives/windowinto.go
+++ b/sdks/go/test/integration/primitives/windowinto.go
@@ -217,14 +217,32 @@ func TriggerElementCount(s beam.Scope) {
                }, 2)
 }
 
-// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires 
output panes once 't' processing time has passed
+// TriggerAfterProcessingTimeNotTriggered tests the AfterProcessingTime 
Trigger. It won't fire because 't' processing time is not reached
+// Not yet supported by the flink runner:
+// java.lang.UnsupportedOperationException: Advancing Processing time is not 
supported by the Flink Runner.
+func TriggerAfterProcessingTimeNotTriggered(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AdvanceProcessingTime(100)
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceProcessingTime(4999) // advance processing time but not 
enough to fire the trigger
+       con.AddElements(22000, 4.0)
+
+       col := teststream.Create(s, con)
+
+       validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col,
+               []beam.WindowIntoOption{
+                       beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5 
* time.Second)),
+               }, 10.0)
+}
+
+// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger. It fires 
output panes once 't' processing time has passed
 // Not yet supported by the flink runner:
 // java.lang.UnsupportedOperationException: Advancing Processing time is not 
supported by the Flink Runner.
 func TriggerAfterProcessingTime(s beam.Scope) {
        con := teststream.NewConfig()
        con.AdvanceProcessingTime(100)
        con.AddElements(1000, 1.0, 2.0, 3.0)
-       con.AdvanceProcessingTime(2000)
+       con.AdvanceProcessingTime(5000) // advance processing time to fire the 
trigger
        con.AddElements(22000, 4.0)
 
        col := teststream.Create(s, con)
@@ -232,7 +250,7 @@ func TriggerAfterProcessingTime(s beam.Scope) {
        validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col,
                []beam.WindowIntoOption{
                        beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5 
* time.Second)),
-               }, 6.0)
+               }, 6.0, 4.0)
 }
 
 // TriggerRepeat tests the repeat trigger. As of now is it is configure to 
take only one trigger as a subtrigger.
diff --git a/sdks/go/test/integration/primitives/windowinto_test.go 
b/sdks/go/test/integration/primitives/windowinto_test.go
index 0f2cff5d8f2..39a1df6e9e7 100644
--- a/sdks/go/test/integration/primitives/windowinto_test.go
+++ b/sdks/go/test/integration/primitives/windowinto_test.go
@@ -77,6 +77,12 @@ func TestTriggerAfterAny(t *testing.T) {
        ptest.BuildAndRun(t, TriggerAfterAny)
 }
 
+func TestTriggerAfterProcessingTime(t *testing.T) {
+       integration.CheckFilters(t)
+       ptest.BuildAndRun(t, TriggerAfterProcessingTime)
+       ptest.BuildAndRun(t, TriggerAfterProcessingTimeNotTriggered)
+}
+
 func TestTriggerAfterSynchronizedProcessingTime(t *testing.T) {
        integration.CheckFilters(t)
        ptest.BuildAndRun(t, TriggerAfterSynchronizedProcessingTime)

Reply via email to