lostluck commented on a change in pull request #15815:
URL: https://github.com/apache/beam/pull/15815#discussion_r737801496



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -324,6 +324,11 @@ func (n *ParDo) postInvoke() error {
 
 func (n *ParDo) fail(err error) error {

Review comment:
   `fail` is only called if the *local* DoFn fails. We need to protect against panics with `defer`.
   
   Consider a pipeline with DoFns `A -> B -> C`. Let's say `C` has an error and is able to return it.
   
   If `B` and `A` are the simplest 1:1 DoFns, then each one simply returns the error to its caller, so it is passed back up the chain and eventually gets caught. Great.
   
   If `B` uses an emitter, though, then the emitter code is what receives the error, because the emitter is what calls the downstream `ParDo.ProcessElement` for `C`.
   
   Emitters don't return errors (by design), so the only way to send the error back up is to panic. This is true for both reflectively generated and code-generated emitters. You can see the reflective one here, along with another explanation of this:
https://github.com/apache/beam/blob/e373d92f3e89396971038072e0e0d2489764ea30/sdks/go/pkg/beam/core/runtime/exec/emit.go#L119
 
   
   Any reset behavior *has* to happen in a `defer` to be safe during panics. Just as with errors, we recover the panic and report it upstream as a failed bundle, which the runner may then retry.
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.