shunping commented on code in PR #36188:
URL: https://github.com/apache/beam/pull/36188#discussion_r2361593447
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1620,6 +1641,13 @@ func (ss *stageState) startEventTimeBundle(watermark
mtime.Time, genBundID func(
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle)
+ if _, ok := ss.kind.(*aggregateStageKind); ok {
+ // For aggregate stage, buildEventTimeBundle may have saved bundle panes with empty string as key.
+ // Move it under the correct BundleID now.
+ ss.bundlePanes[bundID] = ss.bundlePanes[""]
Review Comment:
Fixed.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1170,12 +1171,13 @@ type stageState struct {
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for
this stage, from {tid, inputID} -> window
// Fields for stateful stages which need to be per key.
- pendingByKeys map[string]*dataAndTimers
// pending input elements by Key, if stateful.
- inprogressKeys set[string]
// all keys that are assigned to bundles.
- inprogressKeysByBundle map[string]set[string]
// bundle to key assignments.
- state map[LinkID]map[typex.Window]map[string]StateData
// state data for this stage, from {tid, stateID} -> window -> userKey
- stateTypeLen map[LinkID]func([]byte) int
// map from state to a function that will produce the total length of a single
value in bytes.
- bundlesToInject []RunBundle
// bundlesToInject are triggered bundles that will be injected by the watermark
loop to avoid premature pipeline termination.
+ pendingByKeys map[string]*dataAndTimers
// pending input elements by Key, if stateful.
+ inprogressKeys set[string]
// all keys that are assigned to bundles.
+ inprogressKeysByBundle map[string]set[string]
// bundle to key assignments.
+ state map[LinkID]map[typex.Window]map[string]StateData
// state data for this stage, from {tid, stateID} -> window -> userKey
+ stateTypeLen map[LinkID]func([]byte) int
// map from state to a function that will produce the total length of a
single value in bytes.
+ bundlesToInject []RunBundle
// bundlesToInject are triggered bundles that will be injected by the
watermark loop to avoid premature pipeline termination.
+ bundlePanes
map[string]map[typex.Window]map[string]typex.PaneInfo // PaneInfo snapshot for
bundles, from BundleID -> window -> userKey
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]