lostluck commented on a change in pull request #15815:
URL: https://github.com/apache/beam/pull/15815#discussion_r737801496
##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -324,6 +324,11 @@ func (n *ParDo) postInvoke() error {
func (n *ParDo) fail(err error) error {
Review comment:
`fail` is only called if the *local* DoFn fails. We need to protect against
panics with `defer`.
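To illustrate why reset logic on the error path alone is not enough, here is a minimal sketch. The `node` type, its `inProgress` field, `invokeNoDefer`, `invokeWithDefer`, and `callRecovered` are all hypothetical stand-ins, not the actual `ParDo` code. The first method resets state only when an error is *returned*, as a `fail`-style path would. The second resets it in a `defer`, which also runs while a panic unwinds the stack.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical stand-in for an exec node; it is not Beam's ParDo.
type node struct {
	inProgress bool // hypothetical per-element state that must be reset
}

// invokeNoDefer resets state only when fn returns an error (like calling a
// fail(err) helper). If fn panics, neither reset line is reached.
func (n *node) invokeNoDefer(fn func() error) error {
	n.inProgress = true
	if err := fn(); err != nil {
		n.inProgress = false
		return err
	}
	n.inProgress = false
	return nil
}

// invokeWithDefer resets state in a defer, so it runs on normal return,
// on a returned error, and while a panic unwinds through this frame.
func (n *node) invokeWithDefer(fn func() error) error {
	n.inProgress = true
	defer func() { n.inProgress = false }()
	return fn()
}

// callRecovered runs f and converts any panic into an error so the demo
// can inspect state afterwards.
func callRecovered(f func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered panic: %v", r)
		}
	}()
	return f()
}

func main() {
	panicky := func() error { panic(errors.New("downstream failure")) }

	a := &node{}
	errA := callRecovered(func() error { return a.invokeNoDefer(panicky) })
	// prints: without defer: recovered panic: downstream failure | inProgress: true
	fmt.Println("without defer:", errA, "| inProgress:", a.inProgress)

	b := &node{}
	errB := callRecovered(func() error { return b.invokeWithDefer(panicky) })
	// prints: with defer: recovered panic: downstream failure | inProgress: false
	fmt.Println("with defer:", errB, "| inProgress:", b.inProgress)
}
```

Only the deferred version leaves the node in a clean state after a panic.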
Consider a pipeline with DoFns `A -> B -> C`. Let's say `C` has an error and
is able to return it.
If `B` and `A` are the simplest 1:1 DoFns, then the error is returned back up
through each of them and eventually gets caught. Great.
If `B` uses an emitter, though, then the emitter code is what receives the
error, because the emitter is what calls the downstream
`ParDo.ProcessElement` for `C`.
Emitters don't pass out errors (by design), so the only way to send the
error back up is to panic. This is true for both reflectively generated and
code-generated emitters; you can see the reflective one here, along with
another explanation of this:
https://github.com/apache/beam/blob/e373d92f3e89396971038072e0e0d2489764ea30/sdks/go/pkg/beam/core/runtime/exec/emit.go#L119
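The emitter case can be sketched as follows. This is a simplified model, not the code in `emit.go`. `downstream`, `makeEmitter`, `userDoFnB`, and `invokeB` are hypothetical names. The key point is that a `func(int)` emitter has no error result, so a downstream failure can only escape as a panic, which the calling layer recovers and turns back into an error.

```go
package main

import "fmt"

// downstream is a hypothetical stand-in for C's ProcessElement.
func downstream(elem int) error {
	if elem < 0 {
		return fmt.Errorf("C rejected %d", elem)
	}
	return nil
}

// makeEmitter returns a func(int) emitter, the shape a user DoFn sees.
// The signature has no error result, so the only way to report a
// downstream failure to the caller is to panic with it.
func makeEmitter(next func(int) error) func(int) {
	return func(elem int) {
		if err := next(elem); err != nil {
			panic(err)
		}
	}
}

// userDoFnB models DoFn B: it emits outputs and never sees downstream
// errors directly.
func userDoFnB(elem int, emit func(int)) {
	emit(elem)
	emit(-elem)
}

// invokeB runs B and converts a panic raised by the emitter back into an
// error, as an execution layer has to.
func invokeB(elem int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			if e, ok := r.(error); ok {
				err = fmt.Errorf("B: %w", e)
				return
			}
			err = fmt.Errorf("B: panic: %v", r)
		}
	}()
	userDoFnB(elem, makeEmitter(downstream))
	return nil
}

func main() {
	fmt.Println(invokeB(3)) // prints: B: C rejected -3
	fmt.Println(invokeB(0)) // prints: <nil>
}
```

Note that `userDoFnB` itself contains no error handling at all; the error skips straight past it via the panic.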
Any reset behavior *has* to happen in a `defer` to be safe during panics.
Just as with errors, we recover the panic and report it upstream as a
failed bundle, which the runner may retry.
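Putting reset and recovery together at the bundle level might look like the sketch below. `bundleState`, `reset`, and `processBundle` are hypothetical names, not Beam APIs. The deferred function always resets state and converts a panic into an ordinary error that describes a failed bundle.

```go
package main

import "fmt"

// bundleState is hypothetical per-bundle state that must be reset whether
// the bundle succeeds, returns an error, or panics.
type bundleState struct {
	active  bool
	pending []int
}

// reset is a hypothetical cleanup step.
func (s *bundleState) reset() {
	s.active = false
	s.pending = nil
}

// processBundle runs process over elems. The deferred func always resets
// state and turns a panic into an error describing a failed bundle, which a
// runner could choose to retry.
func processBundle(s *bundleState, elems []int, process func(int) error) (err error) {
	s.active = true
	defer func() {
		s.reset()
		if r := recover(); r != nil {
			err = fmt.Errorf("bundle failed: panic: %v", r)
		}
	}()
	for _, e := range elems {
		s.pending = append(s.pending, e)
		if perr := process(e); perr != nil {
			return fmt.Errorf("bundle failed: %w", perr)
		}
	}
	return nil
}

func main() {
	s := &bundleState{}
	err := processBundle(s, []int{1, 2, 3}, func(e int) error {
		if e == 2 {
			panic("emitter propagated a downstream error")
		}
		return nil
	})
	fmt.Println(err)                                        // prints: bundle failed: panic: emitter propagated a downstream error
	fmt.Println("active:", s.active, "pending:", len(s.pending)) // prints: active: false pending: 0
}
```

Because `err` is a named result, the deferred function can overwrite it after `recover`, so callers see a regular error instead of a crash.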
--
This is an automated message from the Apache Git Service.