This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3534960b9c8 [Prism] Support AfterProcessingTime triggers - part 2
(#36333)
3534960b9c8 is described below
commit 3534960b9c8b99276853151825f233fad9d28513
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Oct 2 19:14:11 2025 -0400
[Prism] Support AfterProcessingTime triggers - part 2 (#36333)
* Handle after-processing-time trigger with processing-time timer.
* Consolidate the buildProcessingTimeBundle for stateful and aggregate
kinds into one code path.
* Set correct pane info.
* Save panes when handling after-processing-time triggers
* Add comments
* Change existing after processing time trigger test and add one more.
* Change the bundleReady criterion for ordinary stage so it does not depend
on watermark.
* Remove advance watermark steps in tests.
* Fix failed VR tests in Java related to stateless DoFn with side input
* Sickbay a test.
* Add comments to explain the sickbay test.
* Address reviewer feedback.
* Add test filter for flink.
---
runners/prism/java/build.gradle | 10 ++
.../prism/internal/engine/elementmanager.go | 166 ++++++++++++++++++---
.../beam/runners/prism/internal/engine/strategy.go | 43 ++++++
.../prism/internal/jobservices/management.go | 2 +-
.../runners/prism/internal/unimplemented_test.go | 3 +-
sdks/go/test/integration/integration.go | 1 +
sdks/go/test/integration/primitives/windowinto.go | 24 ++-
.../test/integration/primitives/windowinto_test.go | 6 +
8 files changed, 230 insertions(+), 25 deletions(-)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index e75fda999e1..fd3631fd4a7 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -94,6 +94,16 @@ def sickbayTests = [
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', //
Uses processing time trigger for early firings.
+ // A regression introduced when we use number of pending elements rather
than watermark to determine
+ // the bundle readiness of a stateless stage.
+ // Currently, Prism processes a bundle of [100, ..., 1000] when watermark
is set to 100,
+ // and then a second bundle of [1, ... 99] when the watermark is set to
+inf.
+ // As a result, it yields an output of [-999, 1, 1...], where -999 comes
from the difference between 1000 and 1.
+ // According to
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html,
+ // the stateful dofn with `RequiresTimeSortedInput` annotation should
buffer an element until the element's timestamp + allowed_lateness.
+ // This stateful dofn feature is not yet supported in Prism.
+
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
+
// Triggered Side Inputs not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 6af030f3622..d03d906e47d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1215,7 +1215,9 @@ type stageKind interface {
// buildEventTimeBundle handles building bundles for the stage per it's
kind.
buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess
elementHeap, minTs mtime.Time, newKeys set[string],
holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool, pendingAdjustment int)
-
+ // buildProcessingTimeBundle handles building processing-time bundles
for the stage per it's kind.
+ buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
+ holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool)
// getPaneOrDefault based on the stage state, element metadata, and
bundle id.
getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
}
@@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em
*ElementManager, window t
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 1,
endOfWindowReached: endOfWindowReached,
+ emNow: em.ProcessingTimeNow(),
}, &state)
if ready {
state.Pane = computeNextTriggeredPane(state.Pane,
endOfWindowReached)
+ } else {
+ if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil
{
+ for _, t := range pts {
+ ts := (&state).getTriggerState(t)
+ if ts.extra == nil || t.shouldFire((&state)) {
+ // Skipping inserting a processing time
timer if the firing time
+ // is not set or it already should fire.
+ // When the after processing time
triggers should fire, there are
+ // two scenarios:
+ // (1) the entire trigger of this
window is ready to fire. In this
+ // case, `ready` should be true and
we won't reach here.
+ // (2) we are still waiting for other
triggers (subtriggers) to
+ // fire (e.g. AfterAll).
+ continue
+ }
+ firingTime :=
ts.extra.(afterProcessingTimeState).firingTime
+ notYetHolds := map[mtime.Time]int{}
+ timer := element{
+ window: window,
+ timestamp: firingTime,
+ holdTimestamp: window.MaxTimestamp(),
+ pane: typex.NoFiringPane(),
+ transform: ss.ID, // Use stage id
to fake transform id
+ family: "AfterProcessingTime",
+ tag: "",
+ sequence: 1,
+ elmBytes: nil,
+ keyBytes: []byte(key),
+ }
+ // TODO: how to deal with watermark holds for
this implicit processing time timer
+ // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
+ ss.processingTimeTimers.Persist(firingTime,
timer, notYetHolds)
+ em.processTimeEvents.Schedule(firingTime, ss.ID)
+ em.wakeUpAt(firingTime)
+ }
+ }
}
// Store the state as triggers may have changed it.
ss.state[LinkID{}][window][key] = state
// If we're ready, it's time to fire!
if ready {
- count += ss.buildTriggeredBundle(em, key, window)
+ count += ss.startTriggeredBundle(em, key, window)
}
return count
}
@@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string,
panesInBundle []bundlePane) {
}
}
-// buildTriggeredBundle must be called with the stage.mu lock held.
-// When in discarding mode, returns 0.
-// When in accumulating mode, returns the number of fired elements to maintain
a correct pending count.
-func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win
typex.Window) int {
+func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win
typex.Window) ([]element, int) {
var toProcess []element
dnt := ss.pendingByKeys[key]
var notYet []element
- rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(),
Watermark: ss.input}
-
// Look at all elements for this key, and only for this window.
for dnt.elements.Len() > 0 {
e := heap.Pop(&dnt.elements).(element)
@@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em
*ElementManager, key string, win t
heap.Init(&dnt.elements)
}
+ return toProcess, accumulationDiff
+}
+
+// startTriggeredBundle must be called with the stage.mu lock held.
+// Returns the accumulation diff that the pending work needs to be adjusted
by, as completed work is subtracted from the pending count.
+// When in discarding mode, returns 0, as the pending work already includes
these elements.
+// When in accumulating mode, returns the number of fired elements, since
those elements remain pending even after this bundle is fired.
+func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win
typex.Window) int {
+ toProcess, accumulationDiff := ss.buildTriggeredBundle(em, key, win)
+ if len(toProcess) == 0 {
+ return accumulationDiff
+ }
+
if ss.inprogressKeys == nil {
ss.inprogressKeys = set[string]{}
}
@@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em
*ElementManager, key string, win t
},
}
+ rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(),
Watermark: ss.input}
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
@@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em
*ElementManager, key string, win t
)
slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID",
rb.BundleID, "size", len(toProcess))
- ss.bundlesToInject = append(ss.bundlesToInject, rb)
+ // TODO: Use ss.bundlesToInject rather than em.injectedBundles
+ // ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
+ em.injectedBundles = append(em.injectedBundles, rb)
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
return accumulationDiff
@@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
ss.mu.Lock()
defer ss.mu.Unlock()
+ toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+
+ if len(toProcess) == 0 {
+ // If we have nothing
+ return "", false, stillSchedulable
+ }
+ bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, panesInBundle)
+ slog.Debug("started a processing time bundle", "stageID", ss.ID,
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
+ return bundID, true, stillSchedulable
+}
+
+// handleProcessingTimeTimer contains the common code for handling
processing-time timers for aggregation stages and stateful stages.
+func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow
mtime.Time,
+ processTimerFn func(e element, toProcess []element, holdsInBundle
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane))
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
// TODO: Determine if it's possible and a good idea to treat all
EventTime processing as a MinTime
// Special Case for ProcessingTime handling.
// Eg. Always queue EventTime elements at minTime.
@@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
// Potentially puts too much work on the scheduling thread though.
var toProcess []element
+ var panesInBundle []bundlePane
minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}
@@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
if e.timestamp < minTs {
minTs = e.timestamp
}
- holdsInBundle[e.holdTimestamp]++
- // We're going to process this timer!
- toProcess = append(toProcess, e)
+ toProcess, panesInBundle = processTimerFn(e, toProcess,
holdsInBundle, panesInBundle)
}
nextTime = ss.processingTimeTimers.Peek()
@@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
for _, v := range notYet {
ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
em.processTimeEvents.Schedule(v.firing, ss.ID)
+ em.wakeUpAt(v.firing)
}
// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp
|| len(notYet) > 0)
- if len(toProcess) == 0 {
- // If we have nothing
- return "", false, stillSchedulable
- }
- bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, nil)
+ return toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable
+}
- slog.Debug("started a processing time bundle", "stageID", ss.ID,
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
- return bundID, true, stillSchedulable
+// buildProcessingTimeBundle for stateful stages prepares bundles for
processing-time timers
+func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
+ return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane) {
+ holdsInBundle[e.holdTimestamp]++
+ // We're going to process this timer!
+ toProcess = append(toProcess, e)
+ return toProcess, nil
+ })
+}
+
+// buildProcessingTimeBundle for aggregation stages prepares bundles for
after-processing-time triggers
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
+ return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane) {
+ // Different from `buildProcessingTimeBundle` for stateful
stage,
+ // triggers don't hold back the watermark, so no holds are in
the triggered bundle.
+ state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
+ endOfWindowReached := e.window.MaxTimestamp() < ss.input
+ ready := ss.strat.IsTriggerReady(triggerInput{
+ newElementCount: 0,
+ endOfWindowReached: endOfWindowReached,
+ emNow: emNow,
+ }, &state)
+
+ if ready {
+ state.Pane = computeNextTriggeredPane(state.Pane,
endOfWindowReached)
+
+ // We're going to process this trigger!
+ elems, _ := ss.buildTriggeredBundle(em,
string(e.keyBytes), e.window)
+ toProcess = append(toProcess, elems...)
+
+ ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
+
+ panesInBundle = append(panesInBundle, bundlePane{})
+ }
+
+ return toProcess, panesInBundle
+ })
+}
+
+// buildProcessingTimeBundle for stateless stages is not supposed to be called
currently
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
+ slog.Error("ordinary stages can't have processing time elements")
+ return nil, mtime.MinTimestamp, nil, nil, nil, false
}
// makeInProgressBundle is common code to store a set of elements as a bundle
in progress.
@@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager,
emNow mtime.Time) (mtime.T
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
previousInputW := ss.previousInput
- if inputW == upstreamW && previousInputW == inputW {
+
+ _, isOrdinaryStage := ss.kind.(*ordinaryStageKind)
+ if isOrdinaryStage && len(ss.sides) == 0 {
+ // For ordinary stage with no side inputs, we use whether there
are pending elements to determine
+ // whether a bundle is ready or not.
+ if len(ss.pending) == 0 {
+ return mtime.MinTimestamp, false, ptimeEventsReady,
injectedReady
+ }
+ } else if inputW == upstreamW && previousInputW == inputW {
+ // Otherwise, use the progression of watermark to determine the
bundle readiness.
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
slog.Any("upstream == input == previousInput",
inputW)))
return mtime.MinTimestamp, false, ptimeEventsReady,
injectedReady
}
+
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side.Global]
@@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret
mtime.Time) {
func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
return localNow + (scheduled - mtime.Now())
}
+
+// wakeUpAt schedules a wakeup signal for the bundle processing loop.
+// This is used for processing time timers to ensure the loop re-evaluates
+// stages when a processing time timer is expected to fire.
+func (em *ElementManager) wakeUpAt(t mtime.Time) {
+ if em.testStreamHandler == nil && em.config.EnableRTC {
+ // only create this goroutine if we have real-time clock
enabled and the pipeline does not have TestStream.
+ go func(fireAt time.Time) {
+ time.AfterFunc(time.Until(fireAt), func() {
+ em.refreshCond.Broadcast()
+ })
+ }(t.ToTime())
+ }
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 044b9806c1b..2aef5fcf332 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -73,6 +73,49 @@ func (ws WinStrat) IsNeverTrigger() bool {
return ok
}
+func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime {
+ if t == nil {
+ return nil
+ }
+ var triggers []*TriggerAfterProcessingTime
+ switch at := t.(type) {
+ case *TriggerAfterProcessingTime:
+ return []*TriggerAfterProcessingTime{at}
+ case *TriggerAfterAll:
+ for _, st := range at.SubTriggers {
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(st)...)
+ }
+ return triggers
+ case *TriggerAfterAny:
+ for _, st := range at.SubTriggers {
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(st)...)
+ }
+ return triggers
+ case *TriggerAfterEach:
+ for _, st := range at.SubTriggers {
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(st)...)
+ }
+ return triggers
+ case *TriggerAfterEndOfWindow:
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(at.Early)...)
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(at.Late)...)
+ return triggers
+ case *TriggerOrFinally:
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(at.Main)...)
+ triggers = append(triggers,
getAfterProcessingTimeTriggers(at.Finally)...)
+ return triggers
+ case *TriggerRepeatedly:
+ return getAfterProcessingTimeTriggers(at.Repeated)
+ default:
+ return nil
+ }
+}
+
+// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers
within the trigger.
+func (ws WinStrat) GetAfterProcessingTimeTriggers()
[]*TriggerAfterProcessingTime {
+ return getAfterProcessingTimeTriggers(ws.Trigger)
+}
+
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]",
ws.AllowedLateness, ws.Trigger)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index f0083815211..12c3c42c2e9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req
*jobpb.PrepareJobRequest) (_ *
func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
unsupported := false
switch at := tpb.GetTrigger().(type) {
- case *pipepb.Trigger_AfterProcessingTime_,
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+ case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
return true
case *pipepb.Trigger_AfterAll_:
for _, st := range at.AfterAll.GetSubtriggers() {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index 7a742c22d0f..d54955f43d4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -53,7 +53,6 @@ func TestUnimplemented(t *testing.T) {
// Currently unimplemented triggers.
// https://github.com/apache/beam/issues/31438
{pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
- {pipeline: primitives.TriggerAfterProcessingTime},
}
for _, test := range tests {
@@ -93,6 +92,8 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.TriggerAfterEach},
{pipeline: primitives.TriggerAfterEndOfWindow},
{pipeline: primitives.TriggerRepeat},
+ {pipeline: primitives.TriggerAfterProcessingTime},
+ {pipeline: primitives.TriggerAfterProcessingTimeNotTriggered},
}
for _, test := range tests {
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index 8d951fe8ce9..eae64dcb205 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -171,6 +171,7 @@ var flinkFilters = []string{
"TestBigQueryIO.*",
"TestBigtableIO.*",
"TestSpannerIO.*",
+ "TestTriggerAfterProcessingTime",
// The number of produced outputs in AfterSynchronizedProcessingTime
varies in different runs.
"TestTriggerAfterSynchronizedProcessingTime",
// The flink runner does not support pipeline drain for SDF.
diff --git a/sdks/go/test/integration/primitives/windowinto.go
b/sdks/go/test/integration/primitives/windowinto.go
index d33e464b76f..f5d01bdfbba 100644
--- a/sdks/go/test/integration/primitives/windowinto.go
+++ b/sdks/go/test/integration/primitives/windowinto.go
@@ -217,14 +217,32 @@ func TriggerElementCount(s beam.Scope) {
}, 2)
}
-// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires
output panes once 't' processing time has passed
+// TriggerAfterProcessingTimeNotTriggered tests the AfterProcessingTime
Trigger. It won't fire because 't' processing time is not reached
+// Not yet supported by the flink runner:
+// java.lang.UnsupportedOperationException: Advancing Processing time is not
supported by the Flink Runner.
+func TriggerAfterProcessingTimeNotTriggered(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AdvanceProcessingTime(100)
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceProcessingTime(4999) // advance processing time but not
enough to fire the trigger
+ con.AddElements(22000, 4.0)
+
+ col := teststream.Create(s, con)
+
+ validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col,
+ []beam.WindowIntoOption{
+ beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5
* time.Second)),
+ }, 10.0)
+}
+
+// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger. It fires
output panes once 't' processing time has passed
// Not yet supported by the flink runner:
// java.lang.UnsupportedOperationException: Advancing Processing time is not
supported by the Flink Runner.
func TriggerAfterProcessingTime(s beam.Scope) {
con := teststream.NewConfig()
con.AdvanceProcessingTime(100)
con.AddElements(1000, 1.0, 2.0, 3.0)
- con.AdvanceProcessingTime(2000)
+ con.AdvanceProcessingTime(5000) // advance processing time to fire the
trigger
con.AddElements(22000, 4.0)
col := teststream.Create(s, con)
@@ -232,7 +250,7 @@ func TriggerAfterProcessingTime(s beam.Scope) {
validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col,
[]beam.WindowIntoOption{
beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5
* time.Second)),
- }, 6.0)
+ }, 6.0, 4.0)
}
// TriggerRepeat tests the repeat trigger. As of now is it is configure to
take only one trigger as a subtrigger.
diff --git a/sdks/go/test/integration/primitives/windowinto_test.go
b/sdks/go/test/integration/primitives/windowinto_test.go
index 0f2cff5d8f2..39a1df6e9e7 100644
--- a/sdks/go/test/integration/primitives/windowinto_test.go
+++ b/sdks/go/test/integration/primitives/windowinto_test.go
@@ -77,6 +77,12 @@ func TestTriggerAfterAny(t *testing.T) {
ptest.BuildAndRun(t, TriggerAfterAny)
}
+func TestTriggerAfterProcessingTime(t *testing.T) {
+ integration.CheckFilters(t)
+ ptest.BuildAndRun(t, TriggerAfterProcessingTime)
+ ptest.BuildAndRun(t, TriggerAfterProcessingTimeNotTriggered)
+}
+
func TestTriggerAfterSynchronizedProcessingTime(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TriggerAfterSynchronizedProcessingTime)