This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new da57e58a888 [Prism] Fix an issue on pane info being overwritten by
different bundles. (#36188)
da57e58a888 is described below
commit da57e58a8887f136ff39bf6080974e96113b1819
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Sep 19 15:15:01 2025 -0400
[Prism] Fix an issue on pane info being overwritten by different bundles.
(#36188)
---
.../prism/internal/engine/elementmanager.go | 103 ++++++++++++++++-----
.../portability/fn_api_runner/fn_runner_test.py | 18 ++++
sdks/python/apache_beam/testing/util.py | 29 ++++++
3 files changed, 125 insertions(+), 25 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index d489bcc18c2..1e76d748809 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -851,7 +851,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
element{
window: w,
timestamp: et,
- pane:
stage.kind.updatePane(stage, pn, w, keyBytes),
+ pane:
stage.kind.getPaneOrDefault(stage, pn, w, keyBytes, rb.BundleID),
elmBytes: elmBytes,
keyBytes: keyBytes,
sequence: seq,
@@ -905,6 +905,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
delete(stage.inprogressKeys, k)
}
delete(stage.inprogressKeysByBundle, rb.BundleID)
+ delete(stage.bundlePanes, rb.BundleID)
// Adjust holds as needed.
for h, c := range newHolds {
@@ -1170,12 +1171,13 @@ type stageState struct {
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for
this stage, from {tid, inputID} -> window
// Fields for stateful stages which need to be per key.
- pendingByKeys map[string]*dataAndTimers
// pending input elements by Key, if stateful.
- inprogressKeys set[string]
// all keys that are assigned to bundles.
- inprogressKeysByBundle map[string]set[string]
// bundle to key assignments.
- state map[LinkID]map[typex.Window]map[string]StateData
// state data for this stage, from {tid, stateID} -> window -> userKey
- stateTypeLen map[LinkID]func([]byte) int
// map from state to a function that will produce the total length of a single
value in bytes.
- bundlesToInject []RunBundle
// bundlesToInject are triggered bundles that will be injected by the watermark
loop to avoid premature pipeline termination.
+ pendingByKeys map[string]*dataAndTimers
// pending input elements by Key, if stateful.
+ inprogressKeys set[string]
// all keys that are assigned to bundles.
+ inprogressKeysByBundle map[string]set[string]
// bundle to key assignments.
+ state map[LinkID]map[typex.Window]map[string]StateData
// state data for this stage, from {tid, stateID} -> window -> userKey
+ stateTypeLen map[LinkID]func([]byte) int
// map from state to a function that will produce the total length of a
single value in bytes.
+ bundlesToInject []RunBundle
// bundlesToInject are triggered bundles that will be injected by the
watermark loop to avoid premature pipeline termination.
+ bundlePanes
map[string]map[typex.Window]map[string]typex.PaneInfo // PaneInfo snapshot for
bundles, from BundleID -> window -> userKey
// Accounting for handling watermark holds for timers.
// We track the count of timers with the same hold, and clear it from
@@ -1187,6 +1189,13 @@ type stageState struct {
processingTimeTimers *timerHandler
}
+// bundlePane holds the pane info for a single window and key pair in a bundle.
+type bundlePane struct {
+ win typex.Window
+ key string
+ pane typex.PaneInfo
+}
+
// stageKind handles behavioral differences between ordinary, stateful, and
aggregation stage kinds.
//
// kinds should be stateless, and stageState retains all state for the stage,
@@ -1195,10 +1204,11 @@ type stageKind interface {
// addPending handles adding new pending elements to the stage
appropriate for the kind.
addPending(ss *stageState, em *ElementManager, newPending []element) int
// buildEventTimeBundle handles building bundles for the stage per it's
kind.
- buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess
elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle
map[mtime.Time]int, schedulable bool, pendingAdjustment int)
+ buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess
elementHeap, minTs mtime.Time, newKeys set[string],
+ holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool, pendingAdjustment int)
- // updatePane based on the stage state.
- updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window,
keyBytes []byte) typex.PaneInfo
+ // getPaneOrDefault returns a pane based on the stage state, element metadata, and
bundle id.
+ getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
}
// ordinaryStageKind represents stages that have no special behavior
associated with them.
@@ -1207,8 +1217,8 @@ type ordinaryStageKind struct{}
func (*ordinaryStageKind) String() string { return "OrdinaryStage" }
-func (*ordinaryStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w
typex.Window, keyBytes []byte) typex.PaneInfo {
- return pane
+func (*ordinaryStageKind) getPaneOrDefault(ss *stageState, defaultPane
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
+ return defaultPane
}
// statefulStageKind require keyed elements, and handles stages with stateful
transforms, with state and timers.
@@ -1216,8 +1226,8 @@ type statefulStageKind struct{}
func (*statefulStageKind) String() string { return "StatefulStage" }
-func (*statefulStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w
typex.Window, keyBytes []byte) typex.PaneInfo {
- return pane
+func (*statefulStageKind) getPaneOrDefault(ss *stageState, defaultPane
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
+ return defaultPane
}
// aggregateStageKind handles stages that perform aggregations over their
primary inputs.
@@ -1226,9 +1236,12 @@ type aggregateStageKind struct{}
func (*aggregateStageKind) String() string { return "AggregateStage" }
-func (*aggregateStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w
typex.Window, keyBytes []byte) typex.PaneInfo {
+func (*aggregateStageKind) getPaneOrDefault(ss *stageState, defaultPane
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
ss.mu.Lock()
defer ss.mu.Unlock()
+ if pane, ok := ss.bundlePanes[bundID][w][string(keyBytes)]; ok {
+ return pane
+ }
return ss.state[LinkID{}][w][string(keyBytes)].Pane
}
@@ -1459,6 +1472,24 @@ func computeNextWatermarkPane(pane typex.PaneInfo)
typex.PaneInfo {
return pane
}
+func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) {
+ if len(panesInBundle) == 0 {
+ return
+ }
+ if ss.bundlePanes == nil {
+ ss.bundlePanes =
make(map[string]map[typex.Window]map[string]typex.PaneInfo)
+ }
+ if ss.bundlePanes[bundID] == nil {
+ ss.bundlePanes[bundID] =
make(map[typex.Window]map[string]typex.PaneInfo)
+ }
+ for _, p := range panesInBundle {
+ if ss.bundlePanes[bundID][p.win] == nil {
+ ss.bundlePanes[bundID][p.win] =
make(map[string]typex.PaneInfo)
+ }
+ ss.bundlePanes[bundID][p.win][p.key] = p.pane
+ }
+}
+
// buildTriggeredBundle must be called with the stage.mu lock held.
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain
a correct pending count.
@@ -1502,13 +1533,23 @@ func (ss *stageState) buildTriggeredBundle(em
*ElementManager, key []byte, win t
if ss.inprogressKeys == nil {
ss.inprogressKeys = set[string]{}
}
+ panesInBundle := []bundlePane{
+ {
+ win: win,
+ key: string(key),
+ pane: ss.state[LinkID{}][win][string(key)].Pane,
+ },
+ }
+
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
ss.input,
singleSet(string(key)),
nil,
+ panesInBundle,
)
+
ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
@@ -1612,26 +1653,27 @@ func (ss *stageState) startEventTimeBundle(watermark
mtime.Time, genBundID func(
}()
ss.mu.Lock()
defer ss.mu.Unlock()
- toProcess, minTs, newKeys, holdsInBundle, stillSchedulable,
accumulatingPendingAdjustment := ss.kind.buildEventTimeBundle(ss, watermark)
+ toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable, accumulatingPendingAdjustment :=
ss.kind.buildEventTimeBundle(ss, watermark)
if len(toProcess) == 0 {
// If we have nothing, there's nothing to progress.
return "", false, stillSchedulable,
accumulatingPendingAdjustment
}
- bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle)
+ bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, panesInBundle)
+
return bundID, true, stillSchedulable, accumulatingPendingAdjustment
}
// buildEventTimeBundle for ordinary stages processes all pending elements.
-func (*ordinaryStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*ordinaryStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
holdsInBundle map[mtime.Time]int, _ []bundlePane, schedulable bool,
pendingAdjustment int) {
toProcess = ss.pending
ss.pending = nil
- return toProcess, mtime.MaxTimestamp, nil, nil, true, 0
+ return toProcess, mtime.MaxTimestamp, nil, nil, nil, true, 0
}
// buildEventTimeBundle for stateful stages, processes all elements that are
before the input watermark time.
-func (*statefulStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _
map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*statefulStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _
map[mtime.Time]int, _ []bundlePane, schedulable bool, pendingAdjustment int) {
minTs := mtime.MaxTimestamp
// TODO: Allow configurable limit of keys per bundle, and elements per
key to improve parallelism.
// TODO: when we do, we need to ensure that the stage remains
schedualable for bundle execution, for remaining pending elements and keys.
@@ -1715,11 +1757,11 @@ keysPerBundle:
// If we're out of data, and timers were not cleared then the watermark
is accurate.
stillSchedulable := !(len(ss.pendingByKeys) == 0 && !timerCleared)
- return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, 0
+ return toProcess, minTs, newKeys, holdsInBundle, nil, stillSchedulable, 0
}
// buildEventTimeBundle for aggregation stages, processes all elements that
are within the watermark for completed windows.
-func (*aggregateStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _
map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*aggregateStageKind) buildEventTimeBundle(ss *stageState, watermark
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _
map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool,
pendingAdjustment int) {
minTs := mtime.MaxTimestamp
// TODO: Allow configurable limit of keys per bundle, and elements per
key to improve parallelism.
// TODO: when we do, we need to ensure that the stage remains
schedualable for bundle execution, for remaining pending elements and keys.
@@ -1814,6 +1856,13 @@ keysPerBundle:
}
ss.state[LinkID{}][elm.window][string(elm.keyBytes)] =
state
+ // Save latest PaneInfo for this window + key pair. It
will be used in PersistBundle.
+ panesInBundle = append(panesInBundle, bundlePane{
+ win: elm.window,
+ key: string(elm.keyBytes),
+ pane:
ss.state[LinkID{}][elm.window][string(elm.keyBytes)].Pane,
+ })
+
// The pane is already correct for this key + window +
firing.
if ss.strat.Accumulating && !state.Pane.IsLast {
// If this isn't the last pane, then we must
add the element back to the pending store for subsequent firings.
@@ -1835,7 +1884,7 @@ keysPerBundle:
// If this is an aggregate, we need a watermark change in order to
reschedule
stillSchedulable := false
- return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable,
accumulatingPendingAdjustment
+ return toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable, accumulatingPendingAdjustment
}
func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow
mtime.Time, genBundID func() string) (string, bool, bool) {
@@ -1910,14 +1959,14 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
// If we have nothing
return "", false, stillSchedulable
}
- bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle)
+ bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, nil)
return bundID, true, stillSchedulable
}
// makeInProgressBundle is common code to store a set of elements as a bundle
in progress.
//
// Callers must hold the stage lock.
-func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess
[]element, minTs mtime.Time, newKeys set[string], holdsInBundle
map[mtime.Time]int) string {
+func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess
[]element, minTs mtime.Time, newKeys set[string], holdsInBundle
map[mtime.Time]int, panesInBundle []bundlePane) string {
// Catch the ordinary case for the minimum timestamp.
if toProcess[0].timestamp < minTs {
minTs = toProcess[0].timestamp
@@ -1941,6 +1990,9 @@ func (ss *stageState) makeInProgressBundle(genBundID
func() string, toProcess []
ss.inprogressKeysByBundle[bundID] = newKeys
ss.inprogressKeys.merge(newKeys)
ss.inprogressHoldsByBundle[bundID] = holdsInBundle
+
+ // Save latest PaneInfo for PersistBundle
+ ss.savePanes(bundID, panesInBundle)
return bundID
}
@@ -2156,6 +2208,7 @@ func (ss *stageState)
createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
wm,
usedKeys,
map[mtime.Time]int{wm: 1},
+ nil,
)
ss.expiryWindowsByBundles[rb.BundleID] = win
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 3442b574681..0197733e911 100644
---
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -69,8 +69,10 @@ from apache_beam.testing.synthetic_pipeline import
SyntheticSDFAsSource
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import has_at_least_one
from apache_beam.tools import utils
from apache_beam.transforms import environments
+from apache_beam.transforms import trigger
from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
@@ -1594,6 +1596,22 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.GroupByKey())
assert_that(res, equal_to([]))
+ def test_first_pane(self):
+ with self.create_pipeline() as p:
+ res = (
+ p | beam.Create([1, 2])
+ | beam.WithKeys(0)
+ | beam.WindowInto(
+ window.GlobalWindows(),
+ trigger=trigger.Repeatedly(trigger.AfterCount(1)),
+ accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
+ allowed_lateness=0,
+ )
+ | beam.GroupByKey()
+ | beam.Values())
+ has_at_least_one(res, lambda e, t, w, p: p.is_first)
+ has_at_least_one(res, lambda e, t, w, p: p.index == 0)
+
# These tests are kept in a separate group so that they are
# not ran in the FnApiRunnerTestWithBundleRepeat which repeats
diff --git a/sdks/python/apache_beam/testing/util.py
b/sdks/python/apache_beam/testing/util.py
index c9745abf949..5a7c36fa445 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -32,6 +32,7 @@ from apache_beam import pvalue
from apache_beam.transforms import window
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import DoFn
+from apache_beam.transforms.core import Filter
from apache_beam.transforms.core import Map
from apache_beam.transforms.core import ParDo
from apache_beam.transforms.core import WindowInto
@@ -45,6 +46,7 @@ __all__ = [
'assert_that',
'equal_to',
'equal_to_per_window',
+ 'has_at_least_one',
'is_empty',
'is_not_empty',
'matches_all',
@@ -377,6 +379,33 @@ def AssertThat(pcoll, *args, **kwargs):
return assert_that(pcoll, *args, **kwargs)
+def has_at_least_one(input, criterion, label="has_at_least_one"):
+ pipeline = input.pipeline
+ # Similar to assert_that, we pick a new, suffixed label if the given one already exists.
+ if label in pipeline.applied_labels:
+ label_idx = 2
+ while f"{label}_{label_idx}" in pipeline.applied_labels:
+ label_idx += 1
+ label = f"{label}_{label_idx}"
+
+ def _apply_criterion(
+ e=DoFn.ElementParam,
+ t=DoFn.TimestampParam,
+ w=DoFn.WindowParam,
+ p=DoFn.PaneInfoParam):
+ if criterion(e, t, w, p):
+ return e, t, w, p
+
+ def _not_empty(actual):
+ actual = list(actual)
+ if not actual:
+ raise BeamAssertException('Failed assert: nothing matches the criterion')
+
+ result = input | label >> Map(_apply_criterion) | label + "_filter" >>
Filter(
+ lambda e: e is not None)
+ assert_that(result, _not_empty)
+
+
def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
"""Returns a composite file of all shards matching the given glob pattern.