This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ed992508c1d Fix a split bundle bug when using timers in stateful dofn
(#35770)
ed992508c1d is described below
commit ed992508c1dd51ea79408f44e7a571b624842227
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Aug 5 12:05:07 2025 -0400
Fix a split bundle bug when using timers in stateful dofn (#35770)
* Requeue split residuals through the stage kind's addPending instead of appending them to the pending heap directly.
* Fix watermark holds: release the holds of timer elements that are moved into a split residual.
* Remove the residual elements' keys from the in-progress key sets when a split happens.
* Count only data elements in EstimatedInputElements, and split only when that estimate is non-zero, so bundles consisting entirely of timers are never split.
* Add a warning message when a split occurs on a bundle that contains timers.
---
.../prism/internal/engine/elementmanager.go | 34 +++++++++++++++-------
sdks/go/pkg/beam/runners/prism/internal/stage.go | 2 +-
.../beam/runners/prism/internal/worker/bundle.go | 4 +--
3 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 767dc325fd3..2ddd7bbc5c1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -588,7 +588,7 @@ func (em *ElementManager) InputForBundle(rb RunBundle, info
PColInfo) [][]byte {
return es.ToData(info)
}
-// DataAndTimerInputForBundle returns pre-allocated data for the given bundle
and the estimated number of elements.
+// DataAndTimerInputForBundle returns pre-allocated data for the given bundle
and the estimated number of data elements.
// Elements are encoded with the PCollection's coders.
func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info
PColInfo) ([]*Block, int) {
ss := em.stages[rb.StageID]
@@ -596,14 +596,16 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb
RunBundle, info PColInfo
defer ss.mu.Unlock()
es := ss.inprogress[rb.BundleID]
- var total int
+ var total_data int
var ret []*Block
cur := &Block{}
for _, e := range es.es {
switch {
case e.IsTimer() && (cur.Kind != BlockTimer || e.family !=
cur.Family || cur.Transform != e.transform):
- total += len(cur.Bytes)
+ if cur.Kind == BlockData {
+ total_data += len(cur.Bytes)
+ }
cur = &Block{
Kind: BlockTimer,
Transform: e.transform,
@@ -631,7 +633,6 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb
RunBundle, info PColInfo
cur.Bytes = append(cur.Bytes, buf.Bytes())
case cur.Kind != BlockData:
- total += len(cur.Bytes)
cur = &Block{
Kind: BlockData,
}
@@ -644,8 +645,10 @@ func (em *ElementManager) DataAndTimerInputForBundle(rb
RunBundle, info PColInfo
cur.Bytes = append(cur.Bytes, buf.Bytes())
}
}
- total += len(cur.Bytes)
- return ret, total
+ if cur.Kind == BlockData {
+ total_data += len(cur.Bytes)
+ }
+ return ret, total_data
}
// BlockKind indicates how the block is to be handled.
@@ -1021,7 +1024,7 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int,
inputInfo PColInfo, residuals Residuals) {
stage := em.stages[rb.StageID]
- stage.splitBundle(rb, firstRsIndex)
+ stage.splitBundle(rb, firstRsIndex, em)
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
if len(unprocessedElements) > 0 {
slog.Debug("ReturnResiduals: unprocessed elements", "bundle",
rb, "count", len(unprocessedElements))
@@ -1909,7 +1912,7 @@ func (ss *stageState) makeInProgressBundle(genBundID
func() string, toProcess []
return bundID
}
-func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
+func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em
*ElementManager) {
ss.mu.Lock()
defer ss.mu.Unlock()
@@ -1920,8 +1923,19 @@ func (ss *stageState) splitBundle(rb RunBundle,
firstResidual int) {
res := es.es[firstResidual:]
es.es = prim
- ss.pending = append(ss.pending, res...)
- heap.Init(&ss.pending)
+
+ for _, e := range res {
+ delete(ss.inprogressKeysByBundle[rb.BundleID],
string(e.keyBytes))
+ delete(ss.inprogressKeys, string(e.keyBytes))
+
+ if e.IsTimer() {
+ slog.Warn("Unexpected split on a bundle with timers.
See https://github.com/apache/beam/issues/35771 for information.")
+ ss.watermarkHolds.Drop(e.holdTimestamp, 1)
+
ss.inprogressHoldsByBundle[rb.BundleID][e.holdTimestamp]--
+ }
+ }
+ // we don't need to increment pending count in em, since it is already
pending
+ ss.kind.addPending(ss, em, res)
ss.inprogress[rb.BundleID] = es
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 1b30cf31bff..97300cb1122 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -221,7 +221,7 @@ progress:
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
- if slow && unsplit {
+ if slow && unsplit && b.EstimatedInputElements > 0 {
slog.Debug("splitting report", "bundle", rb,
"index", index)
sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
if err != nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 14cd84aef82..15023a1b0bd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -40,8 +40,8 @@ type B struct {
// InputTransformID is where data is being sent to in the SDK.
InputTransformID string
- Input []*engine.Block // Data and Timers for this
bundle.
- EstimatedInputElements int
+ Input []*engine.Block // Data and Timers for
this bundle.
+ EstimatedInputElements int // Estimated number of
Data elements for this bundle
HasTimers []engine.StaticTimerID // Timer streams to
terminate.
// IterableSideInputData is a map from transformID + inputID, to
window, to data.