Hi,

I have one question. This is *kind of a blocker for our upcoming release*.
It would be great if you could reply at your earliest convenience.

My Dataflow pipeline is stateful. I am using the Beam SDK's stateful
processing (StateId, ValueState), and I have also implemented OnTimer for
my stateful transform. Every time the Dataflow job starts, I want to read
from CloudSQL and build a cache, so I need to provide the pre-built cache
as a side input to my stateful transform. However, adding a side input to
the stateful transform does not seem to work. I think I am hitting
BEAM-6855 (https://issues.apache.org/jira/browse/BEAM-6855).
Is there any workaround? Any help would be appreciated.
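
One fallback I am considering, in case side inputs really cannot be
combined with state here, is to load the cache inside the DoFn itself
rather than passing it in as a side input. The following is only a rough
sketch of that idea in plain Java, with no Beam dependency. The class name
TunnelStatusCacheHolder, the String key/value types, and the loader are
hypothetical and chosen for illustration; they are not part of my pipeline
or of the Beam API. In the real pipeline the loader would presumably run
the CloudSQL query over JDBC.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical holder that builds a lookup cache at most once per JVM
 * (i.e. once per worker process), no matter how many DoFn instances ask for it.
 */
final class TunnelStatusCacheHolder {
    // Shared across all callers in this JVM; volatile for safe publication.
    private static volatile Map<String, String> cache;

    private TunnelStatusCacheHolder() {}

    /**
     * Returns the shared cache, invoking the loader only on first use.
     * Assumption: the loader returns a non-null map (e.g. rows read from CloudSQL).
     */
    static Map<String, String> getOrLoad(Supplier<Map<String, String>> loader) {
        Map<String, String> local = cache;
        if (local == null) {
            synchronized (TunnelStatusCacheHolder.class) {
                local = cache;
                if (local == null) {
                    // Defensive copy so later changes by the loader's owner are not visible.
                    local = Collections.unmodifiableMap(new HashMap<>(loader.get()));
                    cache = local;
                }
            }
        }
        return local;
    }
}
```

The idea would be to call getOrLoad from a method annotated with @Setup
(or lazily from the process method), so that each worker reads CloudSQL
once. I am not sure whether this is the recommended approach, or whether
it behaves well when workers restart, which is part of why I am asking.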

Below is the definition of my transform. I am using Beam SDK 2.23.0 with
the global window.

private class GetLatestState
        extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {

    @TimerId("tunnelStatusExpiryTimer")
    private final TimerSpec tunnelStatusExpiryTimer =
            TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @StateId("tunnelStatus")
    private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
            StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));

    @ProcessElement
    public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
                        MultiOutputReceiver out,
                        @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
                        @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
                        ProcessContext c)



Thanks,
Hemali Sutaria
