This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6509e514033 [#33513][prism]Handle Time sorted requirement and drop 
late data. (#33515)
6509e514033 is described below

commit 6509e5140339b75105d150d09a19ecfb09255ff3
Author: Robert Burke <[email protected]>
AuthorDate: Tue Jan 7 13:01:56 2025 -0800

    [#33513][prism]Handle Time sorted requirement and drop late data. (#33515)
---
 runners/prism/java/build.gradle                          | 16 +++++++++-------
 .../beam/runners/prism/internal/engine/elementmanager.go | 12 ++++++++++++
 sdks/go/pkg/beam/runners/prism/internal/handlepardo.go   |  7 +++++--
 .../pkg/beam/runners/prism/internal/jobservices/job.go   |  1 +
 .../java/org/apache/beam/sdk/transforms/ParDoTest.java   |  8 ++++++--
 5 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index a48973f6567..791952c625f 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -98,6 +98,7 @@ def sickbayTests = [
     'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
     // Requires Allowed Lateness, among others.
     
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
+    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
     'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
     'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
     'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -160,6 +161,14 @@ def sickbayTests = [
     // TODO(https://github.com/apache/beam/issues/31231)
     
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',
 
+
+    // These tests started failing once late data began to be precisely dropped.
+    // They set a single element to be late data, and expect it (correctly) to 
be preserved.
+    // Since Reshuffle/Redistribute are presently treated as no-ops, the fix is to disable the
+    // dropping behavior when a stage's input is a Reshuffle/Redistribute 
transform.
+    
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
+    
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',
+
     // Prism isn't handling Java's side input views properly.
     // https://github.com/apache/beam/issues/32932
     // java.lang.IllegalArgumentException: PCollection with more than one 
element accessed as a singleton view.
@@ -177,13 +186,6 @@ def sickbayTests = [
     // java.lang.IllegalStateException: java.io.EOFException
     'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
 
-    // Requires Time Sorted Input
-    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
-    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
-    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
-    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
-    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',
-
     // Missing output due to processing time timer skew.
     
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 3cfcf9ef8c0..bb3c8ceceeb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs 
[]string, sides []LinkID) *st
 func (ss *stageState) AddPending(newPending []element) int {
        ss.mu.Lock()
        defer ss.mu.Unlock()
+       // TODO(https://github.com/apache/beam/issues/31438):
+       // Adjust with AllowedLateness
+       // Data that arrives after the *output* watermark is late.
+       threshold := ss.output
+       origPending := make([]element, 0, ss.pending.Len())
+       for _, e := range newPending {
+               if e.window.MaxTimestamp() < threshold {
+                       continue
+               }
+               origPending = append(origPending, e)
+       }
+       newPending = origPending
        if ss.stateful {
                if ss.pendingByKeys == nil {
                        ss.pendingByKeys = map[string]*dataAndTimers{}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go 
b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
index 13e9b6f1b79..7ac472251f6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t 
*pipepb.PTransform, comps *pipepb
 
                // At their simplest, we don't need to do anything special at 
pre-processing time, and simply pass through as normal.
 
-               // StatefulDoFns need to be marked as being roots.
+               // ForceRoots cause fusion breaks in the optimized graph.
+               // StatefulDoFns need to be marked as being roots, for correct 
per-key state handling.
+               // Prism already sorts input elements for a stage by EventTime, 
so a fusion break enables the sorted behavior.
                var forcedRoots []string
-               if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
+               if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 
||
+                       pdo.GetRequiresTimeSortedInput() {
                        forcedRoots = append(forcedRoots, tid)
                }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4be64e5a9c8..f186b11fd1d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{
        urns.RequirementStatefulProcessing: {},
        urns.RequirementBundleFinalization: {},
        urns.RequirementOnWindowExpiration: {},
+       urns.RequirementTimeSortedInput:    {},
 }
 
 // TODO, move back to main package, and key off of executor handlers?
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 742547b9b6c..8409133772e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3764,7 +3764,9 @@ public class ParDoTest implements Serializable {
         if (stamp == 100) {
           // advance watermark when we have 100 remaining elements
           // all the rest are going to be late elements
-          input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+          input =
+              input.advanceWatermarkTo(
+                  
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
         }
       }
       testTimeSortedInput(
@@ -3796,7 +3798,9 @@ public class ParDoTest implements Serializable {
         if (stamp == 100) {
           // advance watermark when we have 100 remaining elements
           // all the rest are going to be late elements
-          input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+          input =
+              input.advanceWatermarkTo(
+                  
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
         }
       }
       // apply the sorted function for the first time

Reply via email to