This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ed992508c1d Fix a split bundle bug when using timers in stateful dofn (#35770)
ed992508c1d is described below

commit ed992508c1dd51ea79408f44e7a571b624842227
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Aug 5 12:05:07 2025 -0400

    Fix a split bundle bug when using timers in stateful dofn (#35770)
    
    * Re-queue split residuals via the stage kind's addPending instead of appending them directly to the pending heap.
    
    * Release the watermark holds of timer residuals returned by a split.
    
    * Remove residual elements' keys from the in-progress key sets when a bundle is split.
    
    * Count only data elements in EstimatedInputElements.
    
    As a result, bundles consisting entirely of timers are never split.
    
    * Log a warning when a bundle containing timers is split.
---
 .../prism/internal/engine/elementmanager.go        | 34 +++++++++++++++-------
 sdks/go/pkg/beam/runners/prism/internal/stage.go   |  2 +-
 .../beam/runners/prism/internal/worker/bundle.go   |  4 +--
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 767dc325fd3..2ddd7bbc5c1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -588,7 +588,7 @@ func (em *ElementManager) InputForBundle(rb RunBundle, info 
PColInfo) [][]byte {
        return es.ToData(info)
 }
 
-// DataAndTimerInputForBundle returns pre-allocated data for the given bundle 
and the estimated number of elements.
+// DataAndTimerInputForBundle returns pre-allocated data for the given bundle 
and the estimated number of data elements.
 // Elements are encoded with the PCollection's coders.
 func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info 
PColInfo) ([]*Block, int) {
        ss := em.stages[rb.StageID]
@@ -596,14 +596,16 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb 
RunBundle, info PColInfo
        defer ss.mu.Unlock()
        es := ss.inprogress[rb.BundleID]
 
-       var total int
+       var total_data int
 
        var ret []*Block
        cur := &Block{}
        for _, e := range es.es {
                switch {
                case e.IsTimer() && (cur.Kind != BlockTimer || e.family != 
cur.Family || cur.Transform != e.transform):
-                       total += len(cur.Bytes)
+                       if cur.Kind == BlockData {
+                               total_data += len(cur.Bytes)
+                       }
                        cur = &Block{
                                Kind:      BlockTimer,
                                Transform: e.transform,
@@ -631,7 +633,6 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb 
RunBundle, info PColInfo
 
                        cur.Bytes = append(cur.Bytes, buf.Bytes())
                case cur.Kind != BlockData:
-                       total += len(cur.Bytes)
                        cur = &Block{
                                Kind: BlockData,
                        }
@@ -644,8 +645,10 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb 
RunBundle, info PColInfo
                        cur.Bytes = append(cur.Bytes, buf.Bytes())
                }
        }
-       total += len(cur.Bytes)
-       return ret, total
+       if cur.Kind == BlockData {
+               total_data += len(cur.Bytes)
+       }
+       return ret, total_data
 }
 
 // BlockKind indicates how the block is to be handled.
@@ -1021,7 +1024,7 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
 func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, 
inputInfo PColInfo, residuals Residuals) {
        stage := em.stages[rb.StageID]
 
-       stage.splitBundle(rb, firstRsIndex)
+       stage.splitBundle(rb, firstRsIndex, em)
        unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
        if len(unprocessedElements) > 0 {
                slog.Debug("ReturnResiduals: unprocessed elements", "bundle", 
rb, "count", len(unprocessedElements))
@@ -1909,7 +1912,7 @@ func (ss *stageState) makeInProgressBundle(genBundID 
func() string, toProcess []
        return bundID
 }
 
-func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
+func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em 
*ElementManager) {
        ss.mu.Lock()
        defer ss.mu.Unlock()
 
@@ -1920,8 +1923,19 @@ func (ss *stageState) splitBundle(rb RunBundle, 
firstResidual int) {
        res := es.es[firstResidual:]
 
        es.es = prim
-       ss.pending = append(ss.pending, res...)
-       heap.Init(&ss.pending)
+
+       for _, e := range res {
+               delete(ss.inprogressKeysByBundle[rb.BundleID], 
string(e.keyBytes))
+               delete(ss.inprogressKeys, string(e.keyBytes))
+
+               if e.IsTimer() {
+                       slog.Warn("Unexpected split on a bundle with timers. 
See https://github.com/apache/beam/issues/35771 for information.")
+                       ss.watermarkHolds.Drop(e.holdTimestamp, 1)
+                       
ss.inprogressHoldsByBundle[rb.BundleID][e.holdTimestamp]--
+               }
+       }
+       // we don't need to increment pending count in em, since it is already 
pending
+       ss.kind.addPending(ss, em, res)
        ss.inprogress[rb.BundleID] = es
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 1b30cf31bff..97300cb1122 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -221,7 +221,7 @@ progress:
 
                        // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
-                       if slow && unsplit {
+                       if slow && unsplit && b.EstimatedInputElements > 0 {
                                slog.Debug("splitting report", "bundle", rb, 
"index", index)
                                sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
                                if err != nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 14cd84aef82..15023a1b0bd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -40,8 +40,8 @@ type B struct {
 
        // InputTransformID is where data is being sent to in the SDK.
        InputTransformID       string
-       Input                  []*engine.Block // Data and Timers for this 
bundle.
-       EstimatedInputElements int
+       Input                  []*engine.Block        // Data and Timers for 
this bundle.
+       EstimatedInputElements int                    // Estimated number of 
Data elements for this bundle
        HasTimers              []engine.StaticTimerID // Timer streams to 
terminate.
 
        // IterableSideInputData is a map from transformID + inputID, to 
window, to data.

Reply via email to