shunping commented on code in PR #38523:
URL: https://github.com/apache/beam/pull/38523#discussion_r3283266226
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -61,8 +62,18 @@ type B struct {
dataSema atomic.Int32
OutputData engine.TentativeData
- Resp chan *fnpb.ProcessBundleResponse
- Done chan struct{}
+ Resp chan *fnpb.ProcessBundleResponse
+ // DataAbort is closed when the worker responds to the bundle instruction
+ // (with success or failure), signaling ProcessOn to stop streaming data.
+ //
+ // This prevents a deadlock where a worker fails mid-bundle and stops reading
+ // from the data channel while the runner blocks indefinitely attempting to
+ // write remaining elements. Other signals are insufficient to abort immediately:
+ // - ctx.Done() only triggers on global timeouts/cancellations, which is too late.
+ // - wk.StoppedChan is only closed when tearing down the worker pool, which does
+ // not happen while the runner is waiting on the current bundle to finish.
+ DataAbort chan struct{}
+ mu sync.Mutex
+ mu sync.Mutex
BundleErr error
Review Comment:
Done.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -98,18 +109,38 @@ func (b *B) LogValue() slog.Value {
slog.String("stage", b.PBDID))
}
+// SetErr sets the bundle error if it is not already set.
+func (b *B) SetErr(err error) {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]