The getState function should be static, sorry. "synchronized static
@NotNull MyState getState()"

On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com> wrote:

> > On every dataflow start, I want to read from CloudSQL and build the cache
>
> If you do this outside of dataflow, you can use a static to do this on
> every worker start. Is that what you're looking for? For example:
>
> final class StateLoader {
>   private StateLoader() {}
>
>   @GuardedBy("this")
>   private static @Nullable MyState state;
>
>   synchronized @NotNull MyState getState() {
>     if (state == null) {
>       state = LoadStateFromSQL();
>     }
>     return state;
>   }
> }
>
> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
> hsuta...@paloaltonetworks.com> wrote:
>
>> Hi,
>>
>> I have one question. This is *kind of a blocker for our upcoming release*.
>> It would be great if you could reply at your earliest convenience.
>>
>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>> processing (StateId, ValueState). I have also implemented OnTimer for my
>> stateful transformation. On every dataflow start, I want to read from
>> CloudSQL and build the cache. For that, I need to provide the pre-built
>> cache as side-input to my current transform. But, it looks like there is
>> some issue when I add side input to my stateful transform. I think I am
>> hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855).
>> Is there any workaround? Any help would be appreciated.
>>
>> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I
>> am using GlobalWindow.
>>
>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, 
>> DataTunnelStatus>, DataTunnelStateRelational> {
>>     @TimerId("tunnelStatusExpiryTimer")
>>     private final TimerSpec tunnelStatusExpiryTimer = 
>> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>     @StateId("tunnelStatus")
>>     private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>             StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>
>> @ProcessElement
>> public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>                     MultiOutputReceiver out,
>>                     @StateId("tunnelStatus") ValueState<DataTunnelStatus> 
>> tunnelStatusCache,
>>                     @TimerId("tunnelStatusExpiryTimer") Timer 
>> tunnelStatusExpiryTimer,
>>                     ProcessContext c)
>>
>>
>>
>> Thanks,
>> Hemali Sutaria
>>
>>

Reply via email to