[ https://issues.apache.org/jira/browse/BEAM-10660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501089#comment-17501089 ]
Adham Ehab commented on BEAM-10660:
-----------------------------------
This issue sounds very exciting.
Can I start working on this?
> [Go SDK] Implement State and Timer support
> ------------------------------------------
>
> Key: BEAM-10660
> URL: https://issues.apache.org/jira/browse/BEAM-10660
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Robert Burke
> Priority: P3
>
> There's presently no mechanism to specify timers in the Go SDK, or to use them
> at all. The work involves designing the user-facing code and mechanisms, and
> plumbing timers through properly. For example, timers must not conflict with
> other user-facing constructs such as emitter functions and iterator functions.
> However, there's an abundance of work to handle before starting to deal with
> state and timers.
> While timers should work in batch, they're commonly more appropriate for
> streaming, which the SDK doesn't support very well at the moment. DoFns need
> to be able to self-checkpoint in order to behave as a streaming source (early
> checkpointing allows a bundle to self-terminate, so the remaining work can be
> rescheduled later, or, in a minor way, split across multiple workers). We should
> also implement Triggers and advanced/custom WindowFns first, as those are simpler
> ways to get some of the advanced functionality that timers allow for. We also
> need to be able to set and propagate the watermark correctly through the SDK
> (and validate that we do).
> See the programming guide for a fuller description of State and Timers
> [https://beam.apache.org/documentation/programming-guide/#state-and-timers]
> * Design an idiomatic Go approach to Timers and State processing for DoFns
> ** Go doesn’t support annotation-like constructs, with the exception of
> struct field tags.
> ** Design likely requires new framework-side marker types.
> ** Design likely requires using field tags.
> ** Needs to allow customization for state types (easier post-generics in
> Go, but a design that doesn’t require generics would be viable sooner).
> * State concerns:
> ** Should support deferred batch reads of multiple states
> ** Needs to be extensible to handle ValueState, CombiningState, and BagState
> * Timer concerns:
> ** Needs to handle Event and Processing Time timers.
> ** Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
> ** Needs to introduce an “OnTimer” method, and associated validation.
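The design bullets above (framework-side marker types plus struct field tags, with generics for state element types) could combine roughly as below. Everything here is hypothetical: the `ValueState`/`BagState` marker types, the `stateMarker` interface, and the `beam:"state,id=..."` tag format are illustrative assumptions, not the Beam Go SDK's actual API. It requires Go 1.18+ for generics.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// stateMarker is a hypothetical framework-side interface implemented by every
// state marker type, so reflection can recognize state fields on a DoFn.
type stateMarker interface {
	stateKind() string
}

// ValueState is a hypothetical marker for a single value per key and window.
type ValueState[T any] struct{}

func (ValueState[T]) stateKind() string { return "value" }

// BagState is a hypothetical marker for an unordered collection per key and window.
type BagState[T any] struct{}

func (BagState[T]) stateKind() string { return "bag" }

// countDoFn declares its state through exported fields and struct tags.
type countDoFn struct {
	Count  ValueState[int]  `beam:"state,id=count"`
	Seen   BagState[string] `beam:"state,id=seen"`
	Prefix string           // ordinary configuration field, not state
}

var markerType = reflect.TypeOf((*stateMarker)(nil)).Elem()

// stateSpecs returns "id:kind" for each field whose type implements stateMarker,
// and rejects marker fields that lack a well-formed state tag.
func stateSpecs(fn any) ([]string, error) {
	t := reflect.TypeOf(fn)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	var specs []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.Type.Implements(markerType) {
			continue
		}
		tag, ok := f.Tag.Lookup("beam")
		parts := strings.Split(tag, ",")
		if !ok || parts[0] != "state" || len(parts) < 2 || !strings.HasPrefix(parts[1], "id=") {
			return nil, fmt.Errorf("field %s: state field needs a `beam:\"state,id=...\"` tag", f.Name)
		}
		kind := reflect.Zero(f.Type).Interface().(stateMarker).stateKind()
		specs = append(specs, strings.TrimPrefix(parts[1], "id=")+":"+kind)
	}
	return specs, nil
}

func main() {
	specs, err := stateSpecs(&countDoFn{})
	fmt.Println(specs, err) // [count:value seen:bag] <nil>
}
```

Because recognition keys off an interface rather than a fixed list of types, new state kinds (for example a combining state) could be added by defining another marker type.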
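The "deferred batch reads of multiple states" concern could look roughly like this: reads only register a key and return a future, and a single flush resolves all pending keys in one round trip. `batchReader`, `future`, and the `fetchFn` signature are hypothetical stand-ins; a real implementation would go through the harness's State API client rather than an in-memory map.

```go
package main

import (
	"fmt"
	"sort"
)

// fetchFn is a hypothetical stand-in for one State API round trip that
// resolves many state keys at once.
type fetchFn func(keys []string) map[string][]byte

// future holds a deferred read result.
type future struct {
	val  []byte
	done bool
}

// Get returns the fetched value; it panics if Flush has not run yet.
func (f *future) Get() []byte {
	if !f.done {
		panic("future read before Flush")
	}
	return f.val
}

// batchReader defers reads: Read records the key, Flush fetches all pending keys together.
type batchReader struct {
	fetch   fetchFn
	pending map[string][]*future
	calls   int // number of fetch round trips, for illustration
}

func newBatchReader(fetch fetchFn) *batchReader {
	return &batchReader{fetch: fetch, pending: map[string][]*future{}}
}

func (b *batchReader) Read(key string) *future {
	f := &future{}
	b.pending[key] = append(b.pending[key], f)
	return f
}

func (b *batchReader) Flush() {
	if len(b.pending) == 0 {
		return // nothing to fetch, so no round trip
	}
	keys := make([]string, 0, len(b.pending))
	for k := range b.pending {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic request order
	b.calls++
	vals := b.fetch(keys)
	for _, k := range keys {
		for _, f := range b.pending[k] {
			f.val, f.done = vals[k], true
		}
	}
	b.pending = map[string][]*future{}
}

func main() {
	store := map[string][]byte{"count": []byte("3"), "seen": []byte("a,b")}
	r := newBatchReader(func(keys []string) map[string][]byte {
		out := map[string][]byte{}
		for _, k := range keys {
			out[k] = store[k]
		}
		return out
	})
	count, seen := r.Read("count"), r.Read("seen")
	r.Flush()
	fmt.Println(string(count.Get()), string(seen.Get()), r.calls) // 3 a,b 1
}
```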
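The timer concerns listed above (event vs. processing time, dynamic timer tags, and an OnTimer callback) can be illustrated with a small in-memory model. This is a sketch under assumptions: `timer`, `timerSet`, and `AdvanceWatermark` are hypothetical names, a callback stands in for the DoFn's OnTimer method, and only event-time firing is modeled.

```go
package main

import (
	"fmt"
	"sort"
)

// TimeDomain distinguishes event-time timers (fired by the watermark) from
// processing-time timers (fired by the wall clock, not modeled here).
type TimeDomain int

const (
	EventTime TimeDomain = iota
	ProcessingTime
)

// timer is hypothetical: a family declared on the DoFn plus a dynamic tag chosen
// at runtime, so one family can hold many independent timers.
type timer struct {
	Family string
	Tag    string
	Domain TimeDomain
	Fire   int64 // firing time in milliseconds
}

type timerKey struct{ family, tag string }

// timerSet keeps at most one pending timer per (family, tag);
// setting the same pair again replaces the earlier timer.
type timerSet map[timerKey]timer

func (s timerSet) Set(t timer) { s[timerKey{t.Family, t.Tag}] = t }

// AdvanceWatermark removes every event-time timer due at or before wm and
// invokes onTimer for each, ordered by firing time, then by tag.
func (s timerSet) AdvanceWatermark(wm int64, onTimer func(timer)) {
	var due []timer
	for k, t := range s {
		if t.Domain == EventTime && t.Fire <= wm {
			due = append(due, t)
			delete(s, k) // deleting during range is safe in Go
		}
	}
	sort.Slice(due, func(i, j int) bool {
		if due[i].Fire != due[j].Fire {
			return due[i].Fire < due[j].Fire
		}
		return due[i].Tag < due[j].Tag
	})
	for _, t := range due {
		onTimer(t)
	}
}

func main() {
	s := timerSet{}
	s.Set(timer{"expiry", "user-a", EventTime, 200})
	s.Set(timer{"expiry", "user-b", EventTime, 100})
	s.Set(timer{"expiry", "user-a", EventTime, 150}) // replaces the 200 timer
	s.Set(timer{"flush", "", ProcessingTime, 50})    // not fired by the watermark
	s.AdvanceWatermark(160, func(t timer) {
		fmt.Println("OnTimer", t.Family, t.Tag, t.Fire)
	})
	fmt.Println(len(s), "timer(s) still pending")
	// Prints:
	// OnTimer expiry user-b 100
	// OnTimer expiry user-a 150
	// 1 timer(s) still pending
}
```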
> Changes are needed in locations similar to those touched for Map Side Inputs
> [https://issues.apache.org/jira/browse/BEAM-3293]
> In the execution layer, the new forms would need to be added, as was done for
> exec/sideinput.go:
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go]
>
> The inputs layer, for the actual abstraction using reflection:
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go]
> These would specifically handle State (which uses the State API in a more
> sophisticated way than Side Inputs do) and Timers. The State API manager is
> implemented in the harness:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go]
>
> The funcx package would need to be updated to detect the new parameter forms
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go]
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go]
> as well as the DoFn graph validation code:
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566]
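Detecting the new parameter forms without colliding with emitters and iterators, as described above, might work along these lines. This is in the spirit of funcx but is not its actual API: `StateProvider`, `TimerProvider`, `classify`, and the "at most one of each" rule are hypothetical, and emitters/iterators are simplified to single-argument shapes (`func(T)` and `func(*T) bool`).

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical framework-side parameter types a ProcessElement could accept.
type StateProvider struct{}
type TimerProvider struct{}

type paramKind string

var (
	stateType = reflect.TypeOf(StateProvider{})
	timerType = reflect.TypeOf(TimerProvider{})
)

// classify labels each parameter of fn. Exact framework types are matched
// before the generic function shapes, so state and timer parameters can never
// be mistaken for emitters (func(T)) or iterators (func(*T) bool).
func classify(fn any) ([]paramKind, error) {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return nil, fmt.Errorf("%v is not a function", t)
	}
	var kinds []paramKind
	seen := map[paramKind]bool{}
	for i := 0; i < t.NumIn(); i++ {
		in := t.In(i)
		var k paramKind
		switch {
		case in == stateType:
			k = "state"
		case in == timerType:
			k = "timer"
		case in.Kind() == reflect.Func && in.NumIn() == 1 && in.NumOut() == 0:
			k = "emit"
		case in.Kind() == reflect.Func && in.NumIn() == 1 && in.In(0).Kind() == reflect.Pointer &&
			in.NumOut() == 1 && in.Out(0).Kind() == reflect.Bool:
			k = "iter"
		default:
			k = "data"
		}
		// Hypothetical validation rule: at most one state and one timer parameter.
		if (k == "state" || k == "timer") && seen[k] {
			return nil, fmt.Errorf("parameter %d: duplicate %s parameter", i, k)
		}
		seen[k] = true
		kinds = append(kinds, k)
	}
	return kinds, nil
}

func main() {
	fn := func(s StateProvider, tp TimerProvider, key string, values func(*int) bool, emit func(string)) {}
	kinds, err := classify(fn)
	fmt.Println(kinds, err) // [state timer data iter emit] <nil>
}
```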
> They would need to be correctly translated into the pipeline protos:
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315]
> and finally back to the newly created handlers in the exec package.
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402]
> The SideInputCache would need to become a full UserState cache
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101],
> since the state_caching protocol URN doesn't distinguish between side
> inputs and user state, and we should not break existing behavior.
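A combined cache of the kind described above could be keyed as below. This is a minimal sketch: `cacheKey`, `stateCache`, and `StartBundle` are hypothetical, and the string `Token` is an assumed simplification of the cache tokens a runner supplies per bundle. Side inputs and user state share one map because, as the issue notes, the state_caching protocol doesn't distinguish them.

```go
package main

import "fmt"

// cacheKey identifies a cached State API value. Kind ("side" or "user") is kept
// only for bookkeeping; Token is an assumed string form of a runner cache token.
type cacheKey struct {
	Kind, ID, Token string
}

// stateCache is a hypothetical combined side-input / user-state cache.
// Stale entries are never evicted here; a real cache would bound its size.
type stateCache struct {
	entries map[cacheKey][]byte
	valid   map[string]bool // tokens valid for the current bundle
}

func newStateCache() *stateCache {
	return &stateCache{entries: map[cacheKey][]byte{}, valid: map[string]bool{}}
}

// StartBundle records which tokens are valid for the bundle about to run.
func (c *stateCache) StartBundle(tokens ...string) {
	c.valid = map[string]bool{}
	for _, t := range tokens {
		c.valid[t] = true
	}
}

// Get serves an entry only if its token is valid for the current bundle.
func (c *stateCache) Get(k cacheKey) ([]byte, bool) {
	if !c.valid[k.Token] {
		return nil, false
	}
	v, ok := c.entries[k]
	return v, ok
}

// Put caches a value only under a token valid for the current bundle.
func (c *stateCache) Put(k cacheKey, v []byte) {
	if c.valid[k.Token] {
		c.entries[k] = v
	}
}

func main() {
	c := newStateCache()
	c.StartBundle("side-t1", "user-t1")
	c.Put(cacheKey{"side", "lookup", "side-t1"}, []byte("table"))
	c.Put(cacheKey{"user", "count", "user-t1"}, []byte("3"))
	_, ok := c.Get(cacheKey{"user", "count", "user-t1"})
	fmt.Println(ok) // true
	c.StartBundle("side-t1", "user-t2") // user-state token rotated
	_, ok = c.Get(cacheKey{"user", "count", "user-t1"})
	fmt.Println(ok) // false
	v, _ := c.Get(cacheKey{"side", "lookup", "side-t1"})
	fmt.Println(string(v)) // table
}
```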
> It's likely other changes are necessary to handle specifics for state and
> timers.
> If implemented pre-generics, the code generator frontend and backend would
> need to be updated to detect and generate code for efficient, reflection-free
> map access functions where necessary:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go]
>
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go]
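The reflection-free access functions mentioned above follow a general pattern: generated code registers a typed wrapper per concrete function type, and the runtime falls back to reflection only when no wrapper exists. The sketch below shows that pattern with a hypothetical `shims` registry and `callLookup` helper; it is not the code that shimx or starcgenx actually emit.

```go
package main

import (
	"fmt"
	"reflect"
)

// shims maps a concrete function type to a typed wrapper that calls it directly.
// Hypothetical: a code generator would populate this for each type it sees.
var shims = map[reflect.Type]func(fn any, key string) (int, bool){}

func init() {
	// What generated code might register for func(string) (int, bool).
	shims[reflect.TypeOf((func(string) (int, bool))(nil))] = func(fn any, key string) (int, bool) {
		return fn.(func(string) (int, bool))(key)
	}
}

// callLookup uses a registered shim when available and otherwise falls back to
// reflect.Value.Call, which works for any matching shape but is slower.
func callLookup(fn any, key string) (v int, ok bool, usedShim bool) {
	if shim, found := shims[reflect.TypeOf(fn)]; found {
		v, ok = shim(fn, key)
		return v, ok, true
	}
	out := reflect.ValueOf(fn).Call([]reflect.Value{reflect.ValueOf(key)})
	return int(out[0].Int()), out[1].Bool(), false
}

func main() {
	m := map[string]int{"a": 1}
	plain := func(k string) (int, bool) { v, ok := m[k]; return v, ok }
	type namedLookup func(string) (int, bool) // distinct type, so no shim is registered
	fmt.Println(callLookup(plain, "a"))              // 1 true true
	fmt.Println(callLookup(namedLookup(plain), "b")) // 0 false false
}
```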
> Unit tests must be added throughout, and integration tests should be added to
> verify the functionality against portable Beam runners:
>
> [https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives]
> And of course, the user GoDoc should be updated for the support.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)