lostluck commented on issue #27630:
URL: https://github.com/apache/beam/issues/27630#issuecomment-1655884299

   At the framework level, retries are dictated by the runner, and generally 
happen in response to a failed bundle. There's no part of the framework or 
model where retries are handled separately.
   
   You're welcome to use goroutines and channels to move some work off to the 
side. However, for normal processing to proceed correctly, emits downstream 
should happen on the main goroutine (the one the framework calls your DoFn 
methods on), not on side goroutines you spawn. Importantly, FinishBundle is 
called at the end of every bundle, which gives you an opportunity to block 
until all preceding work is complete.
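   The following is a Beam-free sketch of that lifecycle. The `sideWorker` type 
and its `Start`, `Process`, and `Finish` methods are hypothetical stand-ins for 
a DoFn's StartBundle, ProcessElement, and FinishBundle. Every `emit` call runs 
on the caller's goroutine, and `Finish` blocks until the side goroutine has 
drained:

```go
package main

// sideWorker is a hypothetical, Beam-free stand-in for a DoFn that hands
// elements to one side goroutine but only emits on the caller's goroutine.
type sideWorker struct {
	work   func(int) int // the side computation
	input  chan int
	output chan int
}

// Start mirrors StartBundle: it launches the side goroutine.
func (w *sideWorker) Start() {
	w.input = make(chan int, 8) // small buffers bound the work in flight
	w.output = make(chan int, 8)
	go func(work func(int) int, in <-chan int, out chan<- int) {
		defer close(out) // tells Finish that all work is done
		for v := range in {
			out <- work(v)
		}
	}(w.work, w.input, w.output)
}

// Process mirrors ProcessElement: it hands off v, emitting any ready
// results while it waits, so the side goroutine can never stall forever
// on a full output buffer.
func (w *sideWorker) Process(v int, emit func(int)) {
	for {
		select {
		case w.input <- v:
			return
		case r := <-w.output:
			emit(r)
		}
	}
}

// Finish mirrors FinishBundle: it closes the input, then blocks until the
// side goroutine closes the output, emitting every remaining result.
func (w *sideWorker) Finish(emit func(int)) {
	close(w.input)
	for r := range w.output {
		emit(r)
	}
}
```

With a single side goroutine, results come out in input order.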
   
   You are responsible for managing your own concurrency down this path. The 
framework can't help you, since goroutines escape the framework. Perhaps 
someday, but not at this time. (Contributions are welcome, of course!)
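   One concrete consequence is that Go's `recover` only stops a panic on the 
goroutine where it runs, so the framework cannot catch a panic on a goroutine 
you spawn. An unrecovered panic there terminates the whole worker process 
instead of failing just the bundle. The following is a minimal sketch of 
surfacing side-goroutine failures yourself. It uses a hypothetical `errGroup` 
type that is loosely modeled on the Go/Wait shape of golang.org/x/sync/errgroup:

```go
package main

import (
	"fmt"
	"sync"
)

// errGroup is a hypothetical minimal helper: it runs side goroutines,
// converts their panics into errors, and reports the first failure.
type errGroup struct {
	wg   sync.WaitGroup
	once sync.Once
	err  error
}

// Go runs f on a new goroutine. A panic inside f is recovered and recorded
// as an error instead of crashing the process.
func (g *errGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done() // runs last, after any panic is recorded
		defer func() {
			if r := recover(); r != nil {
				g.setErr(fmt.Errorf("side goroutine panicked: %v", r))
			}
		}()
		if err := f(); err != nil {
			g.setErr(err)
		}
	}()
}

// setErr keeps only the first error reported by any goroutine.
func (g *errGroup) setErr(err error) {
	g.once.Do(func() { g.err = err })
}

// Wait blocks until every goroutine started with Go has returned, then
// reports the first error, if any.
func (g *errGroup) Wait() error {
	g.wg.Wait()
	return g.err
}
```

You can call `Wait` from FinishBundle and return its error, assuming a 
FinishBundle signature that returns `error`. The bundle then fails through the 
normal path, and the runner's bundle retry applies.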
   
   Example code, not intended for direct use, only for illustration:
   
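   ```go
   import "github.com/apache/beam/sdks/v2/go/pkg/beam/register"

   type mySideProcessingDoFn[I, O any] struct {
   	input  chan I
   	output chan O
   }

   func (fn *mySideProcessingDoFn[I, O]) StartBundle() {
   	// Small buffers bound how much work is in flight for the bundle.
   	fn.input = make(chan I, 8)
   	fn.output = make(chan O, 8)

   	go func(inputs <-chan I, outputs chan<- O) {
   		defer close(outputs) // Tells FinishBundle that all work is done.
   		for in := range inputs {
   			// Side processing, retries, other goroutines for processing etc.
   			// go here; this placeholder just sends the zero value of O.
   			var out O
   			_ = in
   			outputs <- out
   		}
   	}(fn.input, fn.output)
   }

   func (fn *mySideProcessingDoFn[I, O]) ProcessElement(in I, emit func(O)) {
   	// Hand off the element. Emit ready outputs while waiting, so the side
   	// goroutine never blocks forever on a full output channel.
   	for sent := false; !sent; {
   		select {
   		case fn.input <- in:
   			sent = true
   		case out := <-fn.output:
   			emit(out)
   		}
   	}

   	// Emit whatever else has already finished, without blocking.
   	for {
   		select {
   		case out := <-fn.output:
   			emit(out)
   		default:
   			return
   		}
   	}
   }

   func (fn *mySideProcessingDoFn[I, O]) FinishBundle(emit func(O)) {
   	close(fn.input) // Signals the bundle is done.
   	fn.input = nil

   	// Drain the output queue: blocks until the side goroutine closes it.
   	for out := range fn.output {
   		emit(out)
   	}
   	fn.output = nil
   }

   // Register the actual concrete types you want. Embedding promotes the
   // generic type's methods; `type concreteGenDoFn mySideProcessingDoFn[string, string]`
   // would define a new type without any of them.
   type concreteGenDoFn struct {
   	mySideProcessingDoFn[string, string]
   }

   func init() {
   	register.DoFn2x0[string, func(string)](&concreteGenDoFn{})
   	register.Emitter1[string]()
   }
   ```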
   
   Note that if an entire bundle's worth of elements is queued up waiting, the 
framework will report that progress has moved very quickly. The bundle will 
then appear stalled at 100% while everything is still processing. I recommend 
limiting additional parallel side processing to something like 8 elements. 
This keeps progress from being faked and keeps memory and CPU consumption for 
the bundle bounded.


-- 
This is an automated message from the Apache Git Service.