shunping commented on code in PR #36188:
URL: https://github.com/apache/beam/pull/36188#discussion_r2361593447


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1620,6 +1641,13 @@ func (ss *stageState) startEventTimeBundle(watermark 
mtime.Time, genBundID func(
        }
 
        bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle)
+       if _, ok := ss.kind.(*aggregateStageKind); ok {
+               // For aggregate stage, buildEventTimeBundle may have saved 
bundle panes with empty string as key.
+               // Move it under the correct BundleID now.
+               ss.bundlePanes[bundID] = ss.bundlePanes[""]

Review Comment:
   Fixed.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1170,12 +1171,13 @@ type stageState struct {
        sideInputs map[LinkID]map[typex.Window][][]byte // side input data for 
this stage, from {tid, inputID} -> window
 
        // Fields for stateful stages which need to be per key.
-       pendingByKeys          map[string]*dataAndTimers                        
// pending input elements by Key, if stateful.
-       inprogressKeys         set[string]                                      
// all keys that are assigned to bundles.
-       inprogressKeysByBundle map[string]set[string]                           
// bundle to key assignments.
-       state                  map[LinkID]map[typex.Window]map[string]StateData 
// state data for this stage, from {tid, stateID} -> window -> userKey
-       stateTypeLen           map[LinkID]func([]byte) int                      
// map from state to a function that will produce the total length of a single 
value in bytes.
-       bundlesToInject        []RunBundle                                      
// bundlesToInject are triggered bundles that will be injected by the watermark 
loop to avoid premature pipeline termination.
+       pendingByKeys          map[string]*dataAndTimers                        
     // pending input elements by Key, if stateful.
+       inprogressKeys         set[string]                                      
     // all keys that are assigned to bundles.
+       inprogressKeysByBundle map[string]set[string]                           
     // bundle to key assignments.
+       state                  map[LinkID]map[typex.Window]map[string]StateData 
     // state data for this stage, from {tid, stateID} -> window -> userKey
+       stateTypeLen           map[LinkID]func([]byte) int                      
     // map from state to a function that will produce the total length of a 
single value in bytes.
+       bundlesToInject        []RunBundle                                      
     // bundlesToInject are triggered bundles that will be injected by the 
watermark loop to avoid premature pipeline termination.
+       bundlePanes            
map[string]map[typex.Window]map[string]typex.PaneInfo // PanInfo snapshot for 
bundles, from BundleID -> window -> userKey

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to