lostluck commented on issue #27630:
URL: https://github.com/apache/beam/issues/27630#issuecomment-1655884299
At the framework level, retries are dictated by the runner, but generally in
response to a failed bundle. There's no part of the framework or model where
retries are handled separately.
You're welcome to use goroutines and channels to move some work off to the
side, but for normal processing to proceed correctly, emits downstream
should happen on the main goroutine, and not on side goroutines spawned by
the user. Critically, FinishBundle will be called at the end of a bundle,
which provides an opportunity to block until preceding work is complete.
You are also responsible for managing your own concurrency down this
path. The framework can't help you, since goroutines escape the framework.
Perhaps someday, but not at this time. (Contributions are welcome, of course!)
Example code, not intended for direct use, only for illustration:
```
import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

type mySideProcessingDoFn[I, O any] struct {
	input  chan I
	output chan O
}

func (fn *mySideProcessingDoFn[I, O]) StartBundle() {
	fn.input = make(chan I)
	fn.output = make(chan O)
	go func(inputs chan I, outputs chan O) {
		for in := range inputs {
			// Side processing, retries, other goroutines for processing etc.
			var out O // placeholder: compute out from in here
			_ = in
			outputs <- out
		}
		close(outputs)
	}(fn.input, fn.output)
}

func (fn *mySideProcessingDoFn[I, O]) ProcessElement(in I, emit func(O)) {
	// Hand the element to the side goroutine. Keep emitting finished outputs
	// while waiting, so the two unbuffered channels can't deadlock each other.
	for sent := false; !sent; {
		select {
		case fn.input <- in:
			sent = true
		case out := <-fn.output:
			emit(out)
		}
	}
	// Emit whatever finishes within the next second, then move on.
	moveOn := time.After(time.Second)
	for {
		select {
		case out, ok := <-fn.output:
			if !ok {
				// channel closed for some reason
				return
			}
			emit(out)
		case <-moveOn:
			return
		}
	}
}

func (fn *mySideProcessingDoFn[I, O]) FinishBundle(emit func(O)) {
	close(fn.input) // signals the bundle is done.
	fn.input = nil
	// Drain the output queue, blocking until the side goroutine closes it.
	for out := range fn.output {
		emit(out)
	}
}

// register the actual concrete types you want
// Embedding keeps the methods; `type X mySideProcessingDoFn[...]` would drop them.
type concreteGenDoFn struct {
	mySideProcessingDoFn[string, string]
}

func init() {
	register.DoFn2x0[string, func(string)]((*concreteGenDoFn)(nil))
	register.Emitter1[string]()
}
```
Note that if an entire bundle's worth of elements is queued up waiting, the
framework will report that progress has moved very quickly, and it will appear
to be stalled at 100% while everything is processing. I recommend limiting
additional parallel side processing to something like 8 elements, so progress
isn't faked, and to bound memory and CPU consumption for the bundle.