This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 586cb119224 [Prism] Support AfterSynchronizedProcessingTime and enable 
java processing-time trigger tests (#36379)
586cb119224 is described below

commit 586cb11922445f286501f569514ec2a81d610a7a
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 13:47:50 2025 -0400

    [Prism] Support AfterSynchronizedProcessingTime and enable java 
processing-time trigger tests (#36379)
    
    * Support AfterSynchronizedProcessingTime trigger in prism.
    
    * Some minor bug fix in processing-time triggers.
    
    * Enable UsesTestStreamWithProcessingTime test category (20+ tests) and 
exclude 3 failed tests.
---
 runners/prism/java/build.gradle                    | 16 ++++-----
 .../beam/runners/prism/internal/engine/strategy.go | 38 ++++++++++++++++++++--
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  4 +--
 .../prism/internal/jobservices/management.go       | 15 +++++++--
 .../runners/prism/internal/unimplemented_test.go   |  5 +--
 5 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index fd3631fd4a7..dbd7cc6cb5c 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -86,13 +86,9 @@ def sickbayTests = [
     
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
     
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',
 
-    // ProcessingTime triggers not yet implemented in Prism.
-    // https://github.com/apache/beam/issues/31438
+    // negative WaitGroup counter when failing bundle
     
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
-    
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
-    
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
-    'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
-    'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // 
Uses processing time trigger for early firings.
+    'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
 
     // A regression introduced when we use number of pending elements rather 
than watermark to determine
     // the bundle readiness of a stateless stage.
@@ -107,6 +103,7 @@ def sickbayTests = [
     // Triggered Side Inputs not yet implemented in Prism.
     // https://github.com/apache/beam/issues/31438
     'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
+    'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
 
     // Prism doesn't support multiple TestStreams.
     'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
@@ -116,6 +113,9 @@ def sickbayTests = [
     // GroupIntoBatchesTest tests that fail:
     // Teststream has bad KV encodings due to using an outer context.
     'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
+    
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
+    // sdk worker disconnected
+    
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
     // ShardedKey not yet implemented.
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
 
@@ -204,10 +204,6 @@ def createPrismValidatesRunnerTask = { name, 
environmentType ->
       // 
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
       excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
 
-      // Processing time with TestStream is unreliable without being able to 
control
-      // SDK side time portably. Ignore these tests.
-      excludeCategories 
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
-
       // Not yet supported in Prism.
       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics'
     }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
index 2aef5fcf332..691f249a5be 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -718,12 +718,44 @@ func (t *TriggerAfterProcessingTime) reset(state 
*StateData) {
        // Not reaching the end of window yet.
        // We keep the state (especially the next possible firing time) in case 
the trigger is called again
        ts.finished = false
-       s := ts.extra.(afterProcessingTimeState)
-       s.firingTime = t.applyTimestampTransforms(s.emNow) // compute next 
possible firing time
-       ts.extra = s
+       if ts.extra != nil {
+               s := ts.extra.(afterProcessingTimeState)
+               s.firingTime = t.applyTimestampTransforms(s.emNow) // compute 
next possible firing time
+               ts.extra = s
+       }
        state.setTriggerState(t, ts)
 }
 
 func (t *TriggerAfterProcessingTime) String() string {
        return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms)
 }
+
+// TriggerAfterSynchronizedProcessingTime is supposed to fire once when 
processing
+// time across multiple workers synchronizes with the first element's 
processing time.
+// It is a no-op in the current prism single-node architecture, because we 
only have
+// one worker/machine. Therefore, the trigger just fires once it receives the 
data.
+type TriggerAfterSynchronizedProcessingTime struct{}
+
+func (t *TriggerAfterSynchronizedProcessingTime) onElement(triggerInput, 
*StateData) {}
+
+func (t *TriggerAfterSynchronizedProcessingTime) shouldFire(state *StateData) 
bool {
+       ts := state.getTriggerState(t)
+       return !ts.finished
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) onFire(state *StateData) {
+       if !t.shouldFire(state) {
+               return
+       }
+       ts := state.getTriggerState(t)
+       ts.finished = true
+       state.setTriggerState(t, ts)
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) reset(state *StateData) {
+       delete(state.Trigger, t)
+}
+
+func (t *TriggerAfterSynchronizedProcessingTime) String() string {
+       return "AfterSynchronizedProcessingTime"
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 9d23a89d458..cad1fb7e547 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -37,7 +37,6 @@ import (
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
        "golang.org/x/exp/maps"
        "golang.org/x/sync/errgroup"
-       "google.golang.org/protobuf/encoding/prototext"
        "google.golang.org/protobuf/proto"
 )
 
@@ -388,6 +387,7 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                                wk := wks[s.envID]
                                if err := s.Execute(ctx, j, wk, comps, em, rb); 
err != nil {
                                        // Ensure we clean up on bundle failure
+                                       j.Logger.Error("Bundle Failed.", 
slog.Any("error", err))
                                        em.FailBundle(rb)
                                        return err
                                }
@@ -498,7 +498,7 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
                        Transforms: transforms,
                }
        case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
-               panic(fmt.Sprintf("unsupported trigger: %v", 
prototext.Format(tpb)))
+               return &engine.TriggerAfterSynchronizedProcessingTime{}
        default:
                return &engine.TriggerDefault{}
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 12c3c42c2e9..e3f65078657 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -314,10 +314,19 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (_ *
 }
 
 func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
+       if tpb == nil {
+               return false
+       }
+
        unsupported := false
        switch at := tpb.GetTrigger().(type) {
-       case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
-               return true
+       // stateless leaf trigger
+       case *pipepb.Trigger_Never_, *pipepb.Trigger_Always_, 
*pipepb.Trigger_Default_:
+               return false
+       // stateful leaf trigger
+       case *pipepb.Trigger_ElementCount_, 
*pipepb.Trigger_AfterProcessingTime_, 
*pipepb.Trigger_AfterSynchronizedProcessingTime_:
+               return false
+       // composite trigger below
        case *pipepb.Trigger_AfterAll_:
                for _, st := range at.AfterAll.GetSubtriggers() {
                        unsupported = unsupported || hasUnsupportedTriggers(st)
@@ -342,7 +351,7 @@ func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
        case *pipepb.Trigger_Repeat_:
                return hasUnsupportedTriggers(at.Repeat.GetSubtrigger())
        default:
-               return false
+               return true
        }
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index d54955f43d4..89cbd2b17f6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -49,10 +49,6 @@ func TestUnimplemented(t *testing.T) {
                // See https://github.com/apache/beam/issues/31153.
                {pipeline: primitives.TriggerElementCount},
                {pipeline: primitives.TriggerOrFinally},
-
-               // Currently unimplemented triggers.
-               // https://github.com/apache/beam/issues/31438
-               {pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
        }
 
        for _, test := range tests {
@@ -94,6 +90,7 @@ func TestImplemented(t *testing.T) {
                {pipeline: primitives.TriggerRepeat},
                {pipeline: primitives.TriggerAfterProcessingTime},
                {pipeline: primitives.TriggerAfterProcessingTimeNotTriggered},
+               {pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
        }
 
        for _, test := range tests {

Reply via email to