lostluck commented on code in PR #31599:
URL: https://github.com/apache/beam/pull/31599#discussion_r1639078797


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -290,9 +291,9 @@ func (rb RunBundle) LogValue() slog.Value {
 // Bundles is the core execution loop. It produces a sequences of bundles able 
to be executed.
 // The returned channel is closed when the context is canceled, or there are 
no pending elements
 // remaining.
-func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() 
string) <-chan RunBundle {
-       runStageCh := make(chan RunBundle)
+func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn 
context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
        ctx, cancelFn := context.WithCancelCause(ctx)
+       runStageCh := make(chan RunBundle)

Review Comment:
   Move this back before the cancel line above to minimize the diff.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to