This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6509e514033 [#33513][prism]Handle Time sorted requirement and drop late data. (#33515)
6509e514033 is described below
commit 6509e5140339b75105d150d09a19ecfb09255ff3
Author: Robert Burke <[email protected]>
AuthorDate: Tue Jan 7 13:01:56 2025 -0800
[#33513][prism]Handle Time sorted requirement and drop late data. (#33515)
---
runners/prism/java/build.gradle | 16 +++++++++-------
.../beam/runners/prism/internal/engine/elementmanager.go | 12 ++++++++++++
sdks/go/pkg/beam/runners/prism/internal/handlepardo.go | 7 +++++--
.../pkg/beam/runners/prism/internal/jobservices/job.go | 1 +
.../java/org/apache/beam/sdk/transforms/ParDoTest.java | 8 ++++++--
5 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index a48973f6567..791952c625f 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -98,6 +98,7 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
+ 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -160,6 +161,14 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',
+
+ // These tests fail once Late Data was being precisely dropped.
+ // They set a single element to be late data, and expect it (correctly) to be preserved.
+ // Since presently, these are treated as No-ops, the fix is to disable the
+ // dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
+ 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
+ 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',
+
// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
@@ -177,13 +186,6 @@ def sickbayTests = [
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
- // Requires Time Sorted Input
- 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
- 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
- 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
- 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
- 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',
-
// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 3cfcf9ef8c0..bb3c8ceceeb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
+ // TODO(#https://github.com/apache/beam/issues/31438):
+ // Adjust with AllowedLateness
+ // Data that arrives after the *output* watermark is late.
+ threshold := ss.output
+ origPending := make([]element, 0, ss.pending.Len())
+ for _, e := range newPending {
+ if e.window.MaxTimestamp() < threshold {
+ continue
+ }
+ origPending = append(origPending, e)
+ }
+ newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
index 13e9b6f1b79..7ac472251f6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb
// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.
- // StatefulDoFns need to be marked as being roots.
+ // ForceRoots cause fusion breaks in the optimized graph.
+ // StatefulDoFns need to be marked as being roots, for correct per-key state handling.
+ // Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior.
var forcedRoots []string
- if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
+ if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 ||
+ pdo.GetRequiresTimeSortedInput() {
forcedRoots = append(forcedRoots, tid)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4be64e5a9c8..f186b11fd1d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
urns.RequirementOnWindowExpiration: {},
+ urns.RequirementTimeSortedInput: {},
}
// TODO, move back to main package, and key off of executor handlers?
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 742547b9b6c..8409133772e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3764,7 +3764,9 @@ public class ParDoTest implements Serializable {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
- input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+ input =
+ input.advanceWatermarkTo(
+ GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
testTimeSortedInput(
@@ -3796,7 +3798,9 @@ public class ParDoTest implements Serializable {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
- input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+ input =
+ input.advanceWatermarkTo(
+ GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
// apply the sorted function for the first time