shunping commented on code in PR #38523:
URL: https://github.com/apache/beam/pull/38523#discussion_r3283266226


##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -61,8 +62,18 @@ type B struct {
        dataSema   atomic.Int32
        OutputData engine.TentativeData
 
-       Resp      chan *fnpb.ProcessBundleResponse
-       Done      chan struct{}
+       Resp chan *fnpb.ProcessBundleResponse
+       // DataAbort is closed when the worker responds to the bundle instruction
+       // (with success or failure), signaling ProcessOn to stop streaming data.
+       //
+       // This prevents a deadlock where a worker fails mid-bundle and stops reading
+       // from the data channel while the runner blocks indefinitely attempting to
+       // write remaining elements. Other signals are insufficient to abort immediately:
+       // - ctx.Done() only triggers on global timeouts/cancellations, which is too late.
+       // - wk.StoppedChan is only closed when tearing down the worker pool, which does
+       //   not happen while the runner is waiting on the current bundle to finish.
+       DataAbort chan struct{}
+       mu        sync.Mutex
        BundleErr error

Review Comment:
   Done.
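
For readers following the thread, the deadlock described in the DataAbort comment can be illustrated with a small standalone sketch. It is not the code from this PR: `streamData`, its parameters, and the simulated worker are hypothetical names used only for illustration. The idea, as I read the comment, is that every blocking send is paired with a receive on the abort channel, so the sender stops as soon as that channel is closed.

```go
package main

import "fmt"

// streamData is a hypothetical stand-in for a data-streaming loop.
// Each send on out is raced against abort, so a reader that has
// stopped reading cannot block the sender forever.
// It reports how many elements were sent and whether it was aborted.
func streamData(elems [][]byte, out chan<- []byte, abort <-chan struct{}) (sent int, aborted bool) {
	for _, e := range elems {
		select {
		case out <- e:
			sent++
		case <-abort:
			// The worker already responded; stop streaming.
			return sent, true
		}
	}
	return sent, false
}

func main() {
	out := make(chan []byte) // unbuffered: a send blocks until it is read
	abort := make(chan struct{})
	elems := [][]byte{[]byte("a"), []byte("b"), []byte("c")}

	// Simulated worker: reads one element, then "fails" and stops reading.
	// Closing abort models the worker responding to the bundle instruction.
	go func() {
		<-out
		close(abort)
	}()

	sent, aborted := streamData(elems, out, abort)
	fmt.Println(sent, aborted) // prints "1 true"
}
```

Without the `case <-abort` branch, the second send would block indefinitely in this sketch, which mirrors the situation the comment describes.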



##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -98,18 +109,38 @@ func (b *B) LogValue() slog.Value {
                slog.String("stage", b.PBDID))
 }
 
+// SetErr sets the bundle error if it is not already set.
+func (b *B) SetErr(err error) {

Review Comment:
   Done.


