This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 586cb119224 [Prism] Support AfterSynchronizedProcessingTime and enable
java processing-time trigger tests (#36379)
586cb119224 is described below
commit 586cb11922445f286501f569514ec2a81d610a7a
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 13:47:50 2025 -0400
[Prism] Support AfterSynchronizedProcessingTime and enable java
processing-time trigger tests (#36379)
* Support the AfterSynchronizedProcessingTime trigger in Prism.
* Minor bug fixes in processing-time triggers.
* Enable the UsesTestStreamWithProcessingTime test category (20+ tests) and
exclude 3 failing tests.
---
runners/prism/java/build.gradle | 16 ++++-----
.../beam/runners/prism/internal/engine/strategy.go | 38 ++++++++++++++++++++--
sdks/go/pkg/beam/runners/prism/internal/execute.go | 4 +--
.../prism/internal/jobservices/management.go | 15 +++++++--
.../runners/prism/internal/unimplemented_test.go | 5 +--
5 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index fd3631fd4a7..dbd7cc6cb5c 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -86,13 +86,9 @@ def sickbayTests = [
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',
- // ProcessingTime triggers not yet implemented in Prism.
- // https://github.com/apache/beam/issues/31438
+ // negative WaitGroup counter when failing bundle
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
-
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
-
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
- 'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
- 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', //
Uses processing time trigger for early firings.
+ 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
// A regression introduced when we use number of pending elements rather
than watermark to determine
// the bundle readiness of a stateless stage.
@@ -107,6 +103,7 @@ def sickbayTests = [
// Triggered Side Inputs not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
+ 'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
// Prism doesn't support multiple TestStreams.
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
@@ -116,6 +113,9 @@ def sickbayTests = [
// GroupIntoBatchesTest tests that fail:
// Teststream has bad KV encodings due to using an outer context.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
+
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
+ // sdk worker disconnected
+
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
@@ -204,10 +204,6 @@ def createPrismValidatesRunnerTask = { name,
environmentType ->
//
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
- // Processing time with TestStream is unreliable without being able to
control
- // SDK side time portably. Ignore these tests.
- excludeCategories
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
-
// Not yet supported in Prism.
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics'
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 2aef5fcf332..691f249a5be 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -718,12 +718,44 @@ func (t *TriggerAfterProcessingTime) reset(state
*StateData) {
// Not reaching the end of window yet.
// We keep the state (especially the next possible firing time) in case
the trigger is called again
ts.finished = false
- s := ts.extra.(afterProcessingTimeState)
- s.firingTime = t.applyTimestampTransforms(s.emNow) // compute next
possible firing time
- ts.extra = s
+ if ts.extra != nil {
+ s := ts.extra.(afterProcessingTimeState)
+ s.firingTime = t.applyTimestampTransforms(s.emNow) // compute
next possible firing time
+ ts.extra = s
+ }
state.setTriggerState(t, ts)
}
func (t *TriggerAfterProcessingTime) String() string {
return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms)
}
+
+// TriggerAfterSynchronizedProcessingTime is supposed to fires once when
processing
+// time across multiple workers synchronizes with the first element's
processing time.
+// It is a no-op in the current prism single-node architecture, because we
only have
+// one worker/machine. Therefore, the trigger just fires once it receives the
data.
+type TriggerAfterSynchronizedProcessingTime struct{}
+
+func (t *TriggerAfterSynchronizedProcessingTime) onElement(triggerInput,
*StateData) {}
+
+func (t *TriggerAfterSynchronizedProcessingTime) shouldFire(state *StateData)
bool {
+ ts := state.getTriggerState(t)
+ return !ts.finished
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) reset(state *StateData) {
+ delete(state.Trigger, t)
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) String() string {
+ return "AfterSynchronizedProcessingTime"
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 9d23a89d458..cad1fb7e547 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -37,7 +37,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
- "google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)
@@ -388,6 +387,7 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, comps, em, rb);
err != nil {
// Ensure we clean up on bundle failure
+ j.Logger.Error("Bundle Failed.",
slog.Any("error", err))
em.FailBundle(rb)
return err
}
@@ -498,7 +498,7 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
Transforms: transforms,
}
case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
- panic(fmt.Sprintf("unsupported trigger: %v",
prototext.Format(tpb)))
+ return &engine.TriggerAfterSynchronizedProcessingTime{}
default:
return &engine.TriggerDefault{}
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 12c3c42c2e9..e3f65078657 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -314,10 +314,19 @@ func (s *Server) Prepare(ctx context.Context, req
*jobpb.PrepareJobRequest) (_ *
}
func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
+ if tpb == nil {
+ return false
+ }
+
unsupported := false
switch at := tpb.GetTrigger().(type) {
- case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
- return true
+ // stateless leaf trigger
+ case *pipepb.Trigger_Never_, *pipepb.Trigger_Always_,
*pipepb.Trigger_Default_:
+ return false
+ // stateful leaf trigger
+ case *pipepb.Trigger_ElementCount_,
*pipepb.Trigger_AfterProcessingTime_,
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+ return false
+ // composite trigger below
case *pipepb.Trigger_AfterAll_:
for _, st := range at.AfterAll.GetSubtriggers() {
unsupported = unsupported || hasUnsupportedTriggers(st)
@@ -342,7 +351,7 @@ func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
case *pipepb.Trigger_Repeat_:
return hasUnsupportedTriggers(at.Repeat.GetSubtrigger())
default:
- return false
+ return true
}
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index d54955f43d4..89cbd2b17f6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -49,10 +49,6 @@ func TestUnimplemented(t *testing.T) {
// See https://github.com/apache/beam/issues/31153.
{pipeline: primitives.TriggerElementCount},
{pipeline: primitives.TriggerOrFinally},
-
- // Currently unimplemented triggers.
- // https://github.com/apache/beam/issues/31438
- {pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
}
for _, test := range tests {
@@ -94,6 +90,7 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.TriggerRepeat},
{pipeline: primitives.TriggerAfterProcessingTime},
{pipeline: primitives.TriggerAfterProcessingTimeNotTriggered},
+ {pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
}
for _, test := range tests {