damccorm opened a new issue, #20510:
URL: https://github.com/apache/beam/issues/20510
There's presently no mechanism to specify timers in the Go SDK, or to use them
at all. The work would be designing the user-facing code and mechanisms, and
plumbing timers through the SDK properly. For example, they must not conflict
with other user-facing constructs such as emitter functions and iterator
functions. However, there's an abundance of work to handle before starting to
deal with state and timers.
While timers should work in batch, they're commonly more appropriate for
streaming, which the SDK doesn't support very well at the moment. DoFns need to
be able to self-checkpoint in order to behave as a streaming source (early
checkpointing allows a bundle to terminate itself so that the remaining work can
be rescheduled later, and also serves as a minor way to split work across
multiple workers). We should also implement Triggers and advanced/custom
WindowFns first, as those are simpler ways to get some of the advanced
functionality that timers allow for. We also need to be able to set and
propagate the watermark correctly through the SDK (and validate that we do).
See the programming guide for a fuller description of State and Timers
[https://beam.apache.org/documentation/programming-guide/#state-and-timers](https://beam.apache.org/documentation/programming-guide/#state-and-timers)
* Design an idiomatic Go approach to Timers and State processing for DoFns
** Go doesn’t support annotation-like constructs, with the exception of
struct field tags.
** The design likely requires new framework-side marker types.
** The design likely requires using field tags.
** Needs to allow customization for state types (easier once Go has
generics, but a design that doesn’t require them would be viable sooner).
* State concerns:
** Should support deferred batch reads of multiple states
** Needs to be extensible to handle ValueState, CombiningState, and
BagState
* Timer concerns:
** Needs to handle Event and Processing Time timers.
** Dynamic timer tags (likely the only way to handle timers in the Go SDK).
** Needs to introduce an “OnTimer” method and associated validation.
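As a rough illustration of the marker-type and field-tag bullets, the Go sketch below shows how a framework could discover state and timer declarations on a DoFn struct using reflection. `ValueState`, `TimerFamily`, the `marker` interface, and the `beam:"kind,id"` tag format are all hypothetical names invented for this sketch, not existing Beam Go SDK APIs, and the generic `ValueState[T]` assumes the post-generics option.

```go
package main

import (
	"fmt"
	"reflect"
)

// marker is implemented by every hypothetical framework-side marker type, so
// the framework can recognize them without knowing their type parameters.
type marker interface{ beamMarker() }

// ValueState is a hypothetical marker type for a single value of state.
type ValueState[T any] struct{}

// TimerFamily is a hypothetical marker type for a family of dynamically
// tagged timers.
type TimerFamily struct{}

func (ValueState[T]) beamMarker() {}

func (TimerFamily) beamMarker() {}

// expiryFn is a user DoFn declaring one state cell and one timer family. The
// field tag carries the kind and ID that an annotation would in other SDKs.
type expiryFn struct {
	LastSeen ValueState[int64] `beam:"state,lastSeen"`
	Expiry   TimerFamily       `beam:"timer,expiry"`
	Timeout  int64             // ordinary configuration field, not a marker
}

// markerFields returns the beam tag of every marker-typed field, keyed by
// field name. fn may be a struct or a pointer to one.
func markerFields(fn any) map[string]string {
	markerType := reflect.TypeOf((*marker)(nil)).Elem()
	t := reflect.TypeOf(fn)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	out := map[string]string{}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Type.Implements(markerType) {
			out[f.Name] = f.Tag.Get("beam")
		}
	}
	return out
}

func main() {
	fmt.Println(markerFields(&expiryFn{})) // prints map[Expiry:timer,expiry LastSeen:state,lastSeen]
}
```

Using a marker interface rather than a fixed list of types keeps the scan open to new state kinds, which fits the "customization for state types" bullet.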
Changes are needed in locations similar to those touched for Map Side Inputs
(https://issues.apache.org/jira/browse/BEAM-3293).
In the execution layer, the new forms would need to be added, as was done for
side inputs in exec/sideinput.go:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go)
The same applies to the inputs layer, which provides the actual abstraction
using reflection:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go)
Unlike side inputs, these new forms would specifically handle State (which uses
the State API in a more sophisticated way than side inputs do) and Timers. The
State API manager is implemented in the harness:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go)
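A minimal sketch of the "deferred batch reads" concern, assuming a hypothetical in-memory `fakeStateAPI` in place of the real State API client: reads for several state cells are queued, resolved with one request, and then interpreted as value, bag, or combining state. None of these types exist in the SDK, and state is fixed to `int` to keep the example short.

```go
package main

import "fmt"

// fakeStateAPI stands in for the runner side of the State API. It counts
// requests so the sketch can show several reads sharing one round trip.
type fakeStateAPI struct {
	data       map[string][]int
	roundTrips int
}

func (s *fakeStateAPI) fetch(keys []string) map[string][]int {
	s.roundTrips++
	out := make(map[string][]int, len(keys))
	for _, k := range keys {
		out[k] = s.data[k]
	}
	return out
}

// batchReader queues state reads and resolves them with a single request.
type batchReader struct {
	api     *fakeStateAPI
	pending []string
	results map[string][]int
}

func (b *batchReader) enqueue(key string) { b.pending = append(b.pending, key) }

func (b *batchReader) resolve() {
	b.results = b.api.fetch(b.pending)
	b.pending = nil
}

// valueState holds at most one value; the last element stored wins.
type valueState struct{ key string }

func (v valueState) Read(b *batchReader) (int, bool) {
	vs := b.results[v.key]
	if len(vs) == 0 {
		return 0, false
	}
	return vs[len(vs)-1], true
}

// bagState returns every element appended under its key.
type bagState struct{ key string }

func (s bagState) Read(b *batchReader) []int { return b.results[s.key] }

// combiningState folds the stored elements with a user-supplied combiner.
type combiningState struct {
	key     string
	zero    int
	combine func(x, y int) int
}

func (c combiningState) Read(b *batchReader) int {
	acc := c.zero
	for _, x := range b.results[c.key] {
		acc = c.combine(acc, x)
	}
	return acc
}

func main() {
	api := &fakeStateAPI{data: map[string][]int{
		"last": {7}, "seen": {1, 2, 3}, "total": {4, 5},
	}}
	b := &batchReader{api: api}
	last := valueState{key: "last"}
	seen := bagState{key: "seen"}
	total := combiningState{key: "total", combine: func(x, y int) int { return x + y }}

	// Queue all three reads first, then resolve them together.
	for _, k := range []string{last.key, seen.key, total.key} {
		b.enqueue(k)
	}
	b.resolve()

	v, _ := last.Read(b)
	fmt.Println(v, seen.Read(b), total.Read(b), api.roundTrips) // prints 7 [1 2 3] 9 1
}
```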
The funcx package would need to be updated to detect the new parameter forms
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go)
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go)
as well as the DoFn graph validation code:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566)
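To make "detect the new parameter forms" concrete, here is a hypothetical sketch of a reflection-based parameter check, recognizing two invented parameter types, `StateProvider` and `TimerProvider`. These names, the `paramKind` labels, and the duplicate-parameter rule are assumptions for illustration only; the real funcx code classifies parameters with its own types and rules.

```go
package main

import (
	"fmt"
	"reflect"
)

// StateProvider is a hypothetical framework-provided parameter type.
type StateProvider interface {
	ReadValue(id string) (any, bool)
}

// TimerProvider is a hypothetical framework-provided parameter type.
type TimerProvider interface {
	Set(family, tag string, fireAt int64)
}

type paramKind string

const (
	kindState paramKind = "State"
	kindTimer paramKind = "Timer"
	kindOther paramKind = "Other" // elements, side inputs, emitters, and so on
)

var (
	stateType = reflect.TypeOf((*StateProvider)(nil)).Elem()
	timerType = reflect.TypeOf((*TimerProvider)(nil)).Elem()
)

// classify labels each parameter of fn and rejects a second State or Timer
// parameter, as one example of the validation a real design would need.
func classify(fn any) ([]paramKind, error) {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return nil, fmt.Errorf("expected a function, got %v", t)
	}
	seen := map[paramKind]bool{}
	kinds := make([]paramKind, 0, t.NumIn())
	for i := 0; i < t.NumIn(); i++ {
		k := kindOther
		switch t.In(i) {
		case stateType:
			k = kindState
		case timerType:
			k = kindTimer
		}
		if k != kindOther {
			if seen[k] {
				return nil, fmt.Errorf("parameter %d: duplicate %s parameter", i, k)
			}
			seen[k] = true
		}
		kinds = append(kinds, k)
	}
	return kinds, nil
}

func main() {
	processElement := func(s StateProvider, tp TimerProvider, key string, value int) {}
	fmt.Println(classify(processElement)) // prints [State Timer Other Other] <nil>
}
```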
They would need to be correctly translated into the pipeline protos:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315)
and finally translated back into the newly created handlers in the exec package:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402)
The SideInputCache would need to be changed into a full
[UserState cache](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101),
as the state_caching protocol URN doesn't distinguish between side inputs and
user state, and we should not break existing behavior.
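One way to read this requirement, sketched under assumptions: a single cache keyed by (cache token, state key) that holds side input and user state entries alike, and drops entries whose tokens were not supplied again for the current bundle. The `stateCache` type, its methods, and the string key format are hypothetical and are not the harness's actual cache; plain strings stand in for the opaque cache tokens a runner would supply.

```go
package main

import "fmt"

// cacheKey pairs a cache token with an encoded state key.
type cacheKey struct {
	token, key string
}

// stateCache is a hypothetical unified cache: side input and user state
// entries share one store and differ only in their state keys.
type stateCache struct {
	entries map[cacheKey][]byte
	valid   map[string]bool // tokens supplied for the current bundle
}

func newStateCache() *stateCache {
	return &stateCache{entries: map[cacheKey][]byte{}, valid: map[string]bool{}}
}

// StartBundle records the bundle's tokens and evicts entries cached under
// tokens that were not supplied again.
func (c *stateCache) StartBundle(tokens []string) {
	c.valid = map[string]bool{}
	for _, t := range tokens {
		c.valid[t] = true
	}
	for k := range c.entries {
		if !c.valid[k.token] {
			delete(c.entries, k)
		}
	}
}

// Get returns a cached value only when its token is valid for this bundle.
func (c *stateCache) Get(token, key string) ([]byte, bool) {
	if !c.valid[token] {
		return nil, false
	}
	v, ok := c.entries[cacheKey{token, key}]
	return v, ok
}

// Put stores a value read from, or written to, the State API.
func (c *stateCache) Put(token, key string, v []byte) {
	if c.valid[token] {
		c.entries[cacheKey{token, key}] = v
	}
}

func main() {
	c := newStateCache()
	c.StartBundle([]string{"side-1", "user-1"})
	c.Put("side-1", "sideinput/lookup", []byte("table"))
	c.Put("user-1", "userstate/count/key-a", []byte{3})

	// The next bundle keeps the side input token but gets a new user state token.
	c.StartBundle([]string{"side-1", "user-2"})
	_, sideHit := c.Get("side-1", "sideinput/lookup")
	_, userHit := c.Get("user-1", "userstate/count/key-a")
	fmt.Println(sideHit, userHit) // prints true false
}
```

Keeping one store means existing side input behavior is preserved while user state entries ride on the same eviction rule.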
It's likely other changes are necessary to handle specifics for state and
timers.
If implemented before generics are available, the code generator frontend and
backend would need to be updated to detect and generate code for efficient map
access functions without reflection overhead, where necessary:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go)
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go)
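To show why generated code matters before generics, the sketch below contrasts calling a function through `reflect.Value.Call` with the kind of type-specific shim a generator could emit. `lookup` and `shimMapStringIntString` are hypothetical names, and this is not the actual output format of shimx or starcgenx.

```go
package main

import (
	"fmt"
	"reflect"
)

// lookup is an example user function that accesses map-shaped state.
func lookup(m map[string]int, k string) int { return m[k] }

// callReflect invokes fn via reflection: every argument and result is boxed
// in a reflect.Value, which is the overhead generated code avoids.
func callReflect(fn any, args ...any) []any {
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := reflect.ValueOf(fn).Call(in)
	res := make([]any, len(out))
	for i, o := range out {
		res[i] = o.Interface()
	}
	return res
}

// shimMapStringIntString is the kind of type-specific helper a code
// generator could emit for one signature: a single type assertion, then
// direct calls with no reflection.
func shimMapStringIntString(fn any) func(map[string]int, string) int {
	return fn.(func(map[string]int, string) int)
}

func main() {
	m := map[string]int{"a": 1, "b": 2}
	viaReflect := callReflect(lookup, m, "b")[0].(int)
	viaShim := shimMapStringIntString(lookup)(m, "b")
	fmt.Println(viaReflect, viaShim) // prints 2 2
}
```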
Unit tests must be added throughout, and integration tests should be added to
verify the functionality against portable Beam runners:
[https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives](https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives)
And of course, the user-facing GoDoc should be updated to document the support.
Imported from Jira
[BEAM-10660](https://issues.apache.org/jira/browse/BEAM-10660). Original Jira
may contain additional context.
Reported by: lostluck.