[ 
https://issues.apache.org/jira/browse/BEAM-10660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501089#comment-17501089
 ] 

Adham Ehab commented on BEAM-10660:
-----------------------------------

This issue sounds very exciting. 
Can I start working on this? 

> [Go SDK] Implement State and Timer support
> ------------------------------------------
>
>                 Key: BEAM-10660
>                 URL: https://issues.apache.org/jira/browse/BEAM-10660
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Robert Burke
>            Priority: P3
>
> There's presently no mechanism to specify timers in the Go SDK, or to use 
> them at all. The work would be designing the user-facing code and 
> mechanisms, and plumbing timers through properly. For example, they can't 
> conflict with other user-facing constructs like Emitter functions and 
> iterator functions.
> However, there's an abundance of work to handle before starting to deal with 
> state and timers.
> While timers should work in batch, they're commonly more appropriate for 
> streaming which the SDK doesn't support very well at the moment. DoFns need 
> to be able to Self Checkpoint in order to behave as a streaming source (early 
> checkpointing allows a bundle to self terminate, so it can be rescheduled 
> later or as a minor way to split to multiple workers). We should also 
> implement Triggers and Advanced/Custom window fns first as those are simpler 
> ways to get some of the advanced functions that timers allow for. We also 
> need to be able to set and propagate the watermark correctly through the SDK 
> (and validate that we do).
> See the programming guide for a fuller description of State and Timers
>  [https://beam.apache.org/documentation/programming-guide/#state-and-timers] 
>  * Design an idiomatic Go approach to Timers and State processing for DoFns
>  ** Go doesn’t support annotation-like constructs, with the exception of 
> struct field tags.
>  ** Design likely requires new framework-side marker types.
>  ** Design likely requires using field tags.
>  ** Needs to allow customization for state types. (easier post generics in 
> Go, but a design that doesn’t require that would be viable sooner)
>  * State concerns:
>  ** Should support deferred batch reads of multiple states
>  ** Needs to be expandable to handle ValueState, CombiningState, and BagState
>  * Timer concerns: 
>  ** Needs to handle Event and Processing Time timers.
>  ** Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
>  ** Needs to introduce an “OnTimer” method, and associated validation.
> The locations that need changing are similar to those for Map Side Inputs: 
> https://issues.apache.org/jira/browse/BEAM-3293 
> On the execution layer, the new forms would need to be added, as was done 
> for exec/sideinput.go:
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go]
>  
>  The inputs layer, for the actual abstraction using reflection:
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go]
> The same is needed specifically for handling State (which leverages the 
> state API in a more sophisticated way than Side Inputs do) and Timers. The 
> State API manager implementation lives in the harness:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go]
>  
> The funcx package would need to be updated to detect the new parameter forms 
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go]
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go]
> as well as the DoFn graph validation code
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566]
> They would need to be correctly translated into the pipeline protos:
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315]
>  and finally back to the newly created handlers in the exec package. 
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402]
> The SideInputCache would need to be changed to be a full UserState cache 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101]
>  as the state_caching protocol URN doesn't make a distinction between side 
> inputs and user state, and we should not break behavior. 
> It's likely other changes are necessary to handle specifics for state and 
> timers.
> If implemented pre-generics, the code generator frontend and backend would 
> need to be updated to detect and generate code for efficient, 
> no-reflection-overhead map access functions if necessary:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go]
>  
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go]
> Unit tests must be added throughout, and integration tests should be added 
> to verify the functionality against portable Beam runners.
>  
> [https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives]
> And of course, the user GoDoc should be updated for the support.
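
To make the design bullets in the quoted description a bit more concrete, here is a minimal sketch of how struct field tags plus framework-side marker types could be discovered by reflection, including a check that a DoFn declaring timers also has an OnTimer method. Everything in it is an assumption for illustration: the ValueState and Timer marker types, the `state` and `timer` tag names, the "name,domain" tag format, and the OnTimer signature are hypothetical and are not the Beam Go SDK API.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// ValueState and Timer are hypothetical framework-side marker types.
// They are illustrative only and are not part of the Beam Go SDK.
type ValueState struct{}
type Timer struct{}

// timeDomains lists the two timer kinds named in the issue.
var timeDomains = map[string]bool{"event": true, "processing": true}

// spec is what a framework could learn about a DoFn struct by reflection.
type spec struct {
	States []string          // names from `state:"name"` tags
	Timers map[string]string // timer name -> time domain from `timer:"name,domain"` tags
}

// inspect scans a DoFn-like struct for marker-typed fields and reads their
// field tags. It rejects a struct that declares timers without an OnTimer method.
func inspect(fn interface{}) (spec, error) {
	s := spec{Timers: map[string]string{}}
	t := reflect.TypeOf(fn)
	if t == nil {
		return s, fmt.Errorf("nil DoFn")
	}
	st := t
	if st.Kind() == reflect.Ptr {
		st = st.Elem()
	}
	if st.Kind() != reflect.Struct {
		return s, fmt.Errorf("%v is not a struct", t)
	}
	for i := 0; i < st.NumField(); i++ {
		f := st.Field(i)
		switch f.Type {
		case reflect.TypeOf(ValueState{}):
			name, ok := f.Tag.Lookup("state")
			if !ok || name == "" {
				return s, fmt.Errorf("field %s: missing state tag", f.Name)
			}
			s.States = append(s.States, name)
		case reflect.TypeOf(Timer{}):
			parts := strings.Split(f.Tag.Get("timer"), ",")
			if len(parts) != 2 || parts[0] == "" || !timeDomains[parts[1]] {
				return s, fmt.Errorf("field %s: want timer tag \"name,event|processing\"", f.Name)
			}
			s.Timers[parts[0]] = parts[1]
		}
	}
	// Validation step: timers are only meaningful with an OnTimer callback.
	if _, ok := t.MethodByName("OnTimer"); len(s.Timers) > 0 && !ok {
		return s, fmt.Errorf("%v declares timers but has no OnTimer method", t)
	}
	return s, nil
}

// counterFn is a hypothetical DoFn with one state cell and one event-time timer.
type counterFn struct {
	Count ValueState `state:"count"`
	Flush Timer      `timer:"flush,event"`
}

// OnTimer's signature is a placeholder; the issue leaves it to be designed.
func (f *counterFn) OnTimer(name string) {}

// badFn declares a timer but no OnTimer method, so inspect should reject it.
type badFn struct {
	Flush Timer `timer:"flush,processing"`
}

func main() {
	s, err := inspect(&counterFn{})
	fmt.Println(s.States, s.Timers, err)
	_, err = inspect(&badFn{})
	fmt.Println(err)
}
```

A reflection pass like this would presumably belong with the funcx detection and graph validation code mentioned above. The sketch does not address dynamic timer tags or customizable state types, which the description also calls for.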



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
