lostluck commented on code in PR #36188:
URL: https://github.com/apache/beam/pull/36188#discussion_r2361052076
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1620,6 +1641,13 @@ func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func(
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
+ if _, ok := ss.kind.(*aggregateStageKind); ok {
+ // For aggregate stage, buildEventTimeBundle may have saved bundle panes with empty string as key.
+ // Move it under the correct BundleID now.
+ ss.bundlePanes[bundID] = ss.bundlePanes[""]
Review Comment:
We could probably avoid the sneaky empty-string handling by returning a
`panesInBundle` map from `buildEventTimeBundle`, like we already do for
`holdsInBundle`. It would be a similar "refused bequest" to non-stateful stages
needing to return an empty holds map or empty `newKeys`: non-aggregate stages
would simply return an empty panes map.
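
A rough sketch of that shape, to make the suggestion concrete. Everything here is a simplified stand-in and not the actual prism code: `stage`, `window`, `paneInfo`, `buildBundle`, and `startBundle` are hypothetical names, and the real `buildEventTimeBundle` returns more values than shown. The only point is that the builder hands the panes back to the caller, which files them under the real bundle ID once it exists, so nothing is ever stored under `""`.

```go
package main

import "fmt"

// Hypothetical stand-ins for typex.Window and typex.PaneInfo.
type window string
type paneInfo struct{ Index int64 }

// panesByWindowAndKey mirrors the inner shape of bundlePanes: window -> userKey -> pane.
type panesByWindowAndKey map[window]map[string]paneInfo

// stage is a hypothetical, heavily reduced stand-in for stageState.
type stage struct {
	aggregate   bool
	nextBundle  int
	bundlePanes map[string]panesByWindowAndKey // BundleID -> window -> userKey
}

// buildBundle stands in for buildEventTimeBundle. Instead of writing panes
// into s.bundlePanes[""], it returns them, the same way holds are returned.
// Non-aggregate stages return an empty map.
func (s *stage) buildBundle(keys []string, w window) (toProcess []string, panesInBundle panesByWindowAndKey) {
	panesInBundle = panesByWindowAndKey{}
	if s.aggregate {
		panesInBundle[w] = map[string]paneInfo{}
		for _, k := range keys {
			panesInBundle[w][k] = paneInfo{Index: 0} // placeholder pane snapshot
		}
	}
	return keys, panesInBundle
}

// startBundle stands in for startEventTimeBundle: the bundle ID is only
// known here, so this is where the panes get stored.
func (s *stage) startBundle(keys []string, w window) string {
	_, panesInBundle := s.buildBundle(keys, w)
	s.nextBundle++
	bundID := fmt.Sprintf("bundle-%d", s.nextBundle)
	if len(panesInBundle) > 0 {
		s.bundlePanes[bundID] = panesInBundle
	}
	return bundID
}

func main() {
	s := &stage{aggregate: true, bundlePanes: map[string]panesByWindowAndKey{}}
	id := s.startBundle([]string{"a", "b"}, "w1")
	_, hasEmptyKey := s.bundlePanes[""]
	fmt.Println(id, len(s.bundlePanes[id]["w1"]), hasEmptyKey)
}
```

With this shape the type assertion on `ss.kind` in the caller also goes away, since an empty result covers the non-aggregate case.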
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1170,12 +1171,13 @@ type stageState struct {
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for this stage, from {tid, inputID} -> window
// Fields for stateful stages which need to be per key.
- pendingByKeys map[string]*dataAndTimers // pending input elements by Key, if stateful.
- inprogressKeys set[string] // all keys that are assigned to bundles.
- inprogressKeysByBundle map[string]set[string] // bundle to key assignments.
- state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey
- stateTypeLen map[LinkID]func([]byte) int // map from state to a function that will produce the total length of a single value in bytes.
- bundlesToInject []RunBundle // bundlesToInject are triggered bundles that will be injected by the watermark loop to avoid premature pipeline termination.
+ pendingByKeys map[string]*dataAndTimers // pending input elements by Key, if stateful.
+ inprogressKeys set[string] // all keys that are assigned to bundles.
+ inprogressKeysByBundle map[string]set[string] // bundle to key assignments.
+ state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey
+ stateTypeLen map[LinkID]func([]byte) int // map from state to a function that will produce the total length of a single value in bytes.
+ bundlesToInject []RunBundle // bundlesToInject are triggered bundles that will be injected by the watermark loop to avoid premature pipeline termination.
+ bundlePanes map[string]map[typex.Window]map[string]typex.PaneInfo // PanInfo snapshot for bundles, from BundleID -> window -> userKey
Review Comment:
```suggestion
bundlePanes map[string]map[typex.Window]map[string]typex.PaneInfo // PaneInfo snapshot for bundles, from BundleID -> window -> userKey
```