This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new da57e58a888 [Prism] Fix an issue on pane info being overwritten by different bundles. (#36188)
da57e58a888 is described below

commit da57e58a8887f136ff39bf6080974e96113b1819
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Sep 19 15:15:01 2025 -0400

    [Prism] Fix an issue on pane info being overwritten by different bundles. (#36188)
---
 .../prism/internal/engine/elementmanager.go        | 103 ++++++++++++++++-----
 .../portability/fn_api_runner/fn_runner_test.py    |  18 ++++
 sdks/python/apache_beam/testing/util.py            |  29 ++++++
 3 files changed, 125 insertions(+), 25 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index d489bcc18c2..1e76d748809 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -851,7 +851,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
                                                element{
                                                        window:    w,
                                                        timestamp: et,
-                                                       pane:      
stage.kind.updatePane(stage, pn, w, keyBytes),
+                                                       pane:      
stage.kind.getPaneOrDefault(stage, pn, w, keyBytes, rb.BundleID),
                                                        elmBytes:  elmBytes,
                                                        keyBytes:  keyBytes,
                                                        sequence:  seq,
@@ -905,6 +905,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
                        delete(stage.inprogressKeys, k)
                }
                delete(stage.inprogressKeysByBundle, rb.BundleID)
+               delete(stage.bundlePanes, rb.BundleID)
 
                // Adjust holds as needed.
                for h, c := range newHolds {
@@ -1170,12 +1171,13 @@ type stageState struct {
        sideInputs map[LinkID]map[typex.Window][][]byte // side input data for 
this stage, from {tid, inputID} -> window
 
        // Fields for stateful stages which need to be per key.
-       pendingByKeys          map[string]*dataAndTimers                        
// pending input elements by Key, if stateful.
-       inprogressKeys         set[string]                                      
// all keys that are assigned to bundles.
-       inprogressKeysByBundle map[string]set[string]                           
// bundle to key assignments.
-       state                  map[LinkID]map[typex.Window]map[string]StateData 
// state data for this stage, from {tid, stateID} -> window -> userKey
-       stateTypeLen           map[LinkID]func([]byte) int                      
// map from state to a function that will produce the total length of a single 
value in bytes.
-       bundlesToInject        []RunBundle                                      
// bundlesToInject are triggered bundles that will be injected by the watermark 
loop to avoid premature pipeline termination.
+       pendingByKeys          map[string]*dataAndTimers                        
     // pending input elements by Key, if stateful.
+       inprogressKeys         set[string]                                      
     // all keys that are assigned to bundles.
+       inprogressKeysByBundle map[string]set[string]                           
     // bundle to key assignments.
+       state                  map[LinkID]map[typex.Window]map[string]StateData 
     // state data for this stage, from {tid, stateID} -> window -> userKey
+       stateTypeLen           map[LinkID]func([]byte) int                      
     // map from state to a function that will produce the total length of a 
single value in bytes.
+       bundlesToInject        []RunBundle                                      
     // bundlesToInject are triggered bundles that will be injected by the 
watermark loop to avoid premature pipeline termination.
+       bundlePanes            
map[string]map[typex.Window]map[string]typex.PaneInfo // PaneInfo snapshot for 
bundles, from BundleID -> window -> userKey
 
        // Accounting for handling watermark holds for timers.
        // We track the count of timers with the same hold, and clear it from
@@ -1187,6 +1189,13 @@ type stageState struct {
        processingTimeTimers *timerHandler
 }
 
+// bundlePane holds pane info for a bundle.
+type bundlePane struct {
+       win  typex.Window
+       key  string
+       pane typex.PaneInfo
+}
+
 // stageKind handles behavioral differences between ordinary, stateful, and 
aggregation stage kinds.
 //
 // kinds should be stateless, and stageState retains all state for the stage,
@@ -1195,10 +1204,11 @@ type stageKind interface {
        // addPending handles adding new pending elements to the stage 
appropriate for the kind.
        addPending(ss *stageState, em *ElementManager, newPending []element) int
        // buildEventTimeBundle handles building bundles for the stage per it's 
kind.
-       buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess 
elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle 
map[mtime.Time]int, schedulable bool, pendingAdjustment int)
+       buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess 
elementHeap, minTs mtime.Time, newKeys set[string],
+               holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool, pendingAdjustment int)
 
-       // updatePane based on the stage state.
-       updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, 
keyBytes []byte) typex.PaneInfo
+       // getPaneOrDefault based on the stage state, element metadata, and 
bundle id.
+       getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w 
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
 }
 
 // ordinaryStageKind represents stages that have no special behavior 
associated with them.
@@ -1207,8 +1217,8 @@ type ordinaryStageKind struct{}
 
 func (*ordinaryStageKind) String() string { return "OrdinaryStage" }
 
-func (*ordinaryStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w 
typex.Window, keyBytes []byte) typex.PaneInfo {
-       return pane
+func (*ordinaryStageKind) getPaneOrDefault(ss *stageState, defaultPane 
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
+       return defaultPane
 }
 
 // statefulStageKind require keyed elements, and handles stages with stateful 
transforms, with state and timers.
@@ -1216,8 +1226,8 @@ type statefulStageKind struct{}
 
 func (*statefulStageKind) String() string { return "StatefulStage" }
 
-func (*statefulStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w 
typex.Window, keyBytes []byte) typex.PaneInfo {
-       return pane
+func (*statefulStageKind) getPaneOrDefault(ss *stageState, defaultPane 
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
+       return defaultPane
 }
 
 // aggregateStageKind handles stages that perform aggregations over their 
primary inputs.
@@ -1226,9 +1236,12 @@ type aggregateStageKind struct{}
 
 func (*aggregateStageKind) String() string { return "AggregateStage" }
 
-func (*aggregateStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w 
typex.Window, keyBytes []byte) typex.PaneInfo {
+func (*aggregateStageKind) getPaneOrDefault(ss *stageState, defaultPane 
typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo {
        ss.mu.Lock()
        defer ss.mu.Unlock()
+       if pane, ok := ss.bundlePanes[bundID][w][string(keyBytes)]; ok {
+               return pane
+       }
        return ss.state[LinkID{}][w][string(keyBytes)].Pane
 }
 
@@ -1459,6 +1472,24 @@ func computeNextWatermarkPane(pane typex.PaneInfo) 
typex.PaneInfo {
        return pane
 }
 
+func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) {
+       if len(panesInBundle) == 0 {
+               return
+       }
+       if ss.bundlePanes == nil {
+               ss.bundlePanes = 
make(map[string]map[typex.Window]map[string]typex.PaneInfo)
+       }
+       if ss.bundlePanes[bundID] == nil {
+               ss.bundlePanes[bundID] = 
make(map[typex.Window]map[string]typex.PaneInfo)
+       }
+       for _, p := range panesInBundle {
+               if ss.bundlePanes[bundID][p.win] == nil {
+                       ss.bundlePanes[bundID][p.win] = 
make(map[string]typex.PaneInfo)
+               }
+               ss.bundlePanes[bundID][p.win][p.key] = p.pane
+       }
+}
+
 // buildTriggeredBundle must be called with the stage.mu lock held.
 // When in discarding mode, returns 0.
 // When in accumulating mode, returns the number of fired elements to maintain 
a correct pending count.
@@ -1502,13 +1533,23 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
        if ss.inprogressKeys == nil {
                ss.inprogressKeys = set[string]{}
        }
+       panesInBundle := []bundlePane{
+               {
+                       win:  win,
+                       key:  string(key),
+                       pane: ss.state[LinkID{}][win][string(key)].Pane,
+               },
+       }
+
        ss.makeInProgressBundle(
                func() string { return rb.BundleID },
                toProcess,
                ss.input,
                singleSet(string(key)),
                nil,
+               panesInBundle,
        )
+
        ss.bundlesToInject = append(ss.bundlesToInject, rb)
        // Bundle is marked in progress here to prevent a race condition.
        em.refreshCond.L.Lock()
@@ -1612,26 +1653,27 @@ func (ss *stageState) startEventTimeBundle(watermark 
mtime.Time, genBundID func(
        }()
        ss.mu.Lock()
        defer ss.mu.Unlock()
-       toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, 
accumulatingPendingAdjustment := ss.kind.buildEventTimeBundle(ss, watermark)
+       toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable, accumulatingPendingAdjustment := 
ss.kind.buildEventTimeBundle(ss, watermark)
 
        if len(toProcess) == 0 {
                // If we have nothing, there's nothing to progress.
                return "", false, stillSchedulable, 
accumulatingPendingAdjustment
        }
 
-       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle)
+       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, panesInBundle)
+
        return bundID, true, stillSchedulable, accumulatingPendingAdjustment
 }
 
 // buildEventTimeBundle for ordinary stages processes all pending elements.
-func (*ordinaryStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], 
holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*ordinaryStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], 
holdsInBundle map[mtime.Time]int, _ []bundlePane, schedulable bool, 
pendingAdjustment int) {
        toProcess = ss.pending
        ss.pending = nil
-       return toProcess, mtime.MaxTimestamp, nil, nil, true, 0
+       return toProcess, mtime.MaxTimestamp, nil, nil, nil, true, 0
 }
 
 // buildEventTimeBundle for stateful stages, processes all elements that are 
before the input watermark time.
-func (*statefulStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ 
map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*statefulStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ 
map[mtime.Time]int, _ []bundlePane, schedulable bool, pendingAdjustment int) {
        minTs := mtime.MaxTimestamp
        // TODO: Allow configurable limit of keys per bundle, and elements per 
key to improve parallelism.
        // TODO: when we do, we need to ensure that the stage remains 
schedualable for bundle execution, for remaining pending elements and keys.
@@ -1715,11 +1757,11 @@ keysPerBundle:
        // If we're out of data, and timers were not cleared then the watermark 
is accurate.
        stillSchedulable := !(len(ss.pendingByKeys) == 0 && !timerCleared)
 
-       return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, 0
+       return toProcess, minTs, newKeys, holdsInBundle, nil, stillSchedulable, 0
 }
 
 // buildEventTimeBundle for aggregation stages, processes all elements that 
are within the watermark for completed windows.
-func (*aggregateStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ 
map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
+func (*aggregateStageKind) buildEventTimeBundle(ss *stageState, watermark 
mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ 
map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool, 
pendingAdjustment int) {
        minTs := mtime.MaxTimestamp
        // TODO: Allow configurable limit of keys per bundle, and elements per 
key to improve parallelism.
        // TODO: when we do, we need to ensure that the stage remains 
schedualable for bundle execution, for remaining pending elements and keys.
@@ -1814,6 +1856,13 @@ keysPerBundle:
                        }
                        ss.state[LinkID{}][elm.window][string(elm.keyBytes)] = 
state
 
+                       // Save latest PaneInfo for this window + key pair. It 
will be used in PersistBundle.
+                       panesInBundle = append(panesInBundle, bundlePane{
+                               win:  elm.window,
+                               key:  string(elm.keyBytes),
+                               pane: 
ss.state[LinkID{}][elm.window][string(elm.keyBytes)].Pane,
+                       })
+
                        // The pane is already correct for this key + window + 
firing.
                        if ss.strat.Accumulating && !state.Pane.IsLast {
                                // If this isn't the last pane, then we must 
add the element back to the pending store for subsequent firings.
@@ -1835,7 +1884,7 @@ keysPerBundle:
        // If this is an aggregate, we need a watermark change in order to 
reschedule
        stillSchedulable := false
 
-       return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, 
accumulatingPendingAdjustment
+       return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable, accumulatingPendingAdjustment
 }
 
 func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow 
mtime.Time, genBundID func() string) (string, bool, bool) {
@@ -1910,14 +1959,14 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
                // If we have nothing
                return "", false, stillSchedulable
        }
-       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle)
+       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, nil)
        return bundID, true, stillSchedulable
 }
 
 // makeInProgressBundle is common code to store a set of elements as a bundle 
in progress.
 //
 // Callers must hold the stage lock.
-func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess 
[]element, minTs mtime.Time, newKeys set[string], holdsInBundle 
map[mtime.Time]int) string {
+func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess 
[]element, minTs mtime.Time, newKeys set[string], holdsInBundle 
map[mtime.Time]int, panesInBundle []bundlePane) string {
        // Catch the ordinary case for the minimum timestamp.
        if toProcess[0].timestamp < minTs {
                minTs = toProcess[0].timestamp
@@ -1941,6 +1990,9 @@ func (ss *stageState) makeInProgressBundle(genBundID 
func() string, toProcess []
        ss.inprogressKeysByBundle[bundID] = newKeys
        ss.inprogressKeys.merge(newKeys)
        ss.inprogressHoldsByBundle[bundID] = holdsInBundle
+
+       // Save latest PaneInfo for PersistBundle
+       ss.savePanes(bundID, panesInBundle)
        return bundID
 }
 
@@ -2156,6 +2208,7 @@ func (ss *stageState) 
createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
                        wm,
                        usedKeys,
                        map[mtime.Time]int{wm: 1},
+                       nil,
                )
                ss.expiryWindowsByBundles[rb.BundleID] = win
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 3442b574681..0197733e911 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -69,8 +69,10 @@ from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import has_at_least_one
 from apache_beam.tools import utils
 from apache_beam.transforms import environments
+from apache_beam.transforms import trigger
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 from apache_beam.transforms.periodicsequence import PeriodicImpulse
@@ -1594,6 +1596,22 @@ class FnApiRunnerTest(unittest.TestCase):
           | beam.GroupByKey())
       assert_that(res, equal_to([]))
 
+  def test_first_pane(self):
+    with self.create_pipeline() as p:
+      res = (
+          p | beam.Create([1, 2])
+          | beam.WithKeys(0)
+          | beam.WindowInto(
+              window.GlobalWindows(),
+              trigger=trigger.Repeatedly(trigger.AfterCount(1)),
+              accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
+              allowed_lateness=0,
+          )
+          | beam.GroupByKey()
+          | beam.Values())
+      has_at_least_one(res, lambda e, t, w, p: p.is_first)
+      has_at_least_one(res, lambda e, t, w, p: p.index == 0)
+
 
 # These tests are kept in a separate group so that they are
 # not ran in the FnApiRunnerTestWithBundleRepeat which repeats
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index c9745abf949..5a7c36fa445 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -32,6 +32,7 @@ from apache_beam import pvalue
 from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import DoFn
+from apache_beam.transforms.core import Filter
 from apache_beam.transforms.core import Map
 from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.core import WindowInto
@@ -45,6 +46,7 @@ __all__ = [
     'assert_that',
     'equal_to',
     'equal_to_per_window',
+    'has_at_least_one',
     'is_empty',
     'is_not_empty',
     'matches_all',
@@ -377,6 +379,33 @@ def AssertThat(pcoll, *args, **kwargs):
   return assert_that(pcoll, *args, **kwargs)
 
 
+def has_at_least_one(input, criterion, label="has_at_least_one"):
+  pipeline = input.pipeline
+  # similar to assert_that, we choose a label if it already exists.
+  if label in pipeline.applied_labels:
+    label_idx = 2
+    while f"{label}_{label_idx}" in pipeline.applied_labels:
+      label_idx += 1
+    label = f"{label}_{label_idx}"
+
+  def _apply_criterion(
+      e=DoFn.ElementParam,
+      t=DoFn.TimestampParam,
+      w=DoFn.WindowParam,
+      p=DoFn.PaneInfoParam):
+    if criterion(e, t, w, p):
+      return e, t, w, p
+
+  def _not_empty(actual):
+    actual = list(actual)
+    if not actual:
+      raise BeamAssertException('Failed assert: nothing matches the criterion')
+
+  result = input | label >> Map(_apply_criterion) | label + "_filter" >> Filter(
+      lambda e: e is not None)
+  assert_that(result, _not_empty)
+
+
 def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
   """Returns a composite file of all shards matching the given glob pattern.
 

Reply via email to