Sorry, the getState method should be static: "synchronized static @NotNull MyState getState()"
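For reference, here is a minimal self-contained sketch of the loader with that correction applied. It is illustrative only: the contents of MyState, the loadCount counter, and the body of loadStateFromSql are hypothetical placeholders, and the @NotNull/@Nullable/@GuardedBy annotations are left out so the snippet compiles without extra dependencies. A real loader would query CloudSQL where the placeholder is.

```java
import java.util.Collections;
import java.util.Map;

final class StateLoader {
  /** Hypothetical stand-in for the cache built from CloudSQL. */
  static final class MyState {
    final Map<String, String> entries;

    MyState(Map<String, String> entries) {
      this.entries = entries;
    }
  }

  // Guarded by the StateLoader.class monitor: a static synchronized
  // method locks on the Class object, not on an instance.
  private static MyState state;

  // Illustration only: counts how many times the load actually ran.
  static int loadCount = 0;

  private StateLoader() {}

  /** Loads the state on first use in this JVM, then returns the cached copy. */
  static synchronized MyState getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  // Placeholder (assumption): a real implementation would read from CloudSQL here.
  private static MyState loadStateFromSql() {
    loadCount++;
    return new MyState(Collections.singletonMap("example-key", "example-value"));
  }
}
```

Since the field is static, each worker JVM loads the state once, on the first call to StateLoader.getState(), and every later call in that JVM gets the same instance back.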
On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com> wrote:

> > On every dataflow start, I want to read from CloudSQL and build the cache
>
> If you do this outside of dataflow, you can use a static to do this on
> every worker start. Is that what you're looking for? For example:
>
> final class StateLoader {
>   private StateLoader() {}
>
>   @GuardedBy("this")
>   private static @Nullable MyState state;
>
>   synchronized @NotNull MyState getState() {
>     if (state == null) {
>       state = LoadStateFromSQL();
>     }
>     return state;
>   }
> }
>
> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
> hsuta...@paloaltonetworks.com> wrote:
>
>> Hi,
>>
>> I have one question. This is *kind of a blocker for our upcoming release*.
>> It would be great if you could reply at your earliest convenience.
>>
>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>> processing (StateId, ValueState). I have also implemented OnTimer for my
>> stateful transformation. On every dataflow start, I want to read from
>> CloudSQL and build the cache. For that, I need to provide the pre-built
>> cache as side-input to my current transform. But, it looks like there is
>> some issue when I add side input to my stateful transform. I think I am
>> hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855).
>> Is there any workaround? Any help would be appreciated.
>>
>> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I
>> am using GlobalWindow.
>>
>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey,
>>     DataTunnelStatus>, DataTunnelStateRelational> {
>>   @TimerId("tunnelStatusExpiryTimer")
>>   private final TimerSpec tunnelStatusExpiryTimer =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @StateId("tunnelStatus")
>>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>
>>   @ProcessElement
>>   public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>       MultiOutputReceiver out,
>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>       ProcessContext c)
>>
>> Thanks,
>> Hemali Sutaria