I've managed to fix that by introducing an (optional) method to DoFnRunner
called getSystemStateTags() (the default implementation returns
Collections.emptyList()), and then using that list to early-bind the
states in Flink's DoFnOperator [1].
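A minimal, self-contained sketch of the idea follows. It uses hypothetical, simplified stand-in types (Runner, PlainRunner, BufferingRunner, earlyBind, and the tag name "sortBuffer" are all made up for illustration). These are not the real Beam DoFnRunner or Flink DoFnOperator classes, and this is not the code in the linked commit. It shows how a default interface method keeps existing runners unchanged while letting a runner with system state expose its tags for early binding.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class EarlyBindingSketch {

  // Hypothetical, simplified stand-in for DoFnRunner (not the real Beam interface).
  interface Runner {
    void processElement(String element);

    // Optional hook: the default means "no system state", so existing runners need no change.
    default Collection<String> getSystemStateTags() {
      return Collections.emptyList();
    }
  }

  // A runner that only uses user state; it keeps the default implementation.
  static class PlainRunner implements Runner {
    @Override
    public void processElement(String element) {}
  }

  // A runner that buffers elements in a system state before output
  // (e.g. for time-sorted input); the tag name is hypothetical.
  static class BufferingRunner implements Runner {
    static final String BUFFER_TAG = "sortBuffer";

    @Override
    public void processElement(String element) {
      // would append the element to the state identified by BUFFER_TAG
    }

    @Override
    public Collection<String> getSystemStateTags() {
      return Collections.singletonList(BUFFER_TAG);
    }
  }

  // Hypothetical stand-in for the operator's setup: collects the state names
  // that would be registered with the backend before the first element arrives.
  static Set<String> earlyBind(Collection<String> userStateIds, Runner runner) {
    Set<String> registered = new LinkedHashSet<>();
    registered.addAll(userStateIds);                // existing workaround: @StateId user state
    registered.addAll(runner.getSystemStateTags()); // proposed extension: system state too
    return registered;
  }

  public static void main(String[] args) {
    Collection<String> userStates = Arrays.asList("count");
    System.out.println(earlyBind(userStates, new PlainRunner()));     // [count]
    System.out.println(earlyBind(userStates, new BufferingRunner())); // [count, sortBuffer]
  }
}
```

The default method is what makes the hook optional: only runners that actually create system state need to override it.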
@Max, WDYT?
Jan
[1] https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
On 8/12/19 4:00 PM, Jan Lukavský wrote:
Hi,
I have come across an issue that is very likely caused by [1]. The
issue is that Beam's state is (generally) created lazily, after an
element is received (as Max described in the Flink JIRA). Max also
created a workaround [2], but it seems to work for user state only
(i.e. state declared in user code via @StateId; please correct me if
I'm wrong). In my work, however, I created a system state (which
buffers elements before they are output, due to the
@RequiresTimeSortedInput annotation, but that's probably not
important), and this state is not covered by the workaround. This
might be the case for all system states in general, because they are
not visible until an element arrives and the state is actually created.
Is my analysis correct? If so, does anyone have suggestions on how
to fix it?
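To make the lazy-creation problem concrete, here is a toy simulation. It is a simplified model under stated assumptions, not Flink's real keyed state backend API and not a reproduction of FLINK-12653. The Backend and BufferingRunner classes and the "sortBuffer" name are hypothetical. The system state appears in the backend only after the first element is processed, so anything that inspects the registered states earlier cannot see it.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyStateSketch {

  // Hypothetical, simplified backend: it only knows about states that have been
  // created so far (NOT Flink's real KeyedStateBackend API).
  static class Backend {
    final Map<String, StringBuilder> states = new HashMap<>();

    StringBuilder getOrCreate(String name) {
      return states.computeIfAbsent(name, n -> new StringBuilder());
    }
  }

  // Buffers elements in a system state that is created lazily, on first use.
  static class BufferingRunner {
    final Backend backend;

    BufferingRunner(Backend backend) {
      this.backend = backend;
    }

    void processElement(String element) {
      backend.getOrCreate("sortBuffer").append(element);
    }
  }

  public static void main(String[] args) {
    Backend backend = new Backend();
    BufferingRunner runner = new BufferingRunner(backend);
    // Before any element arrives, the system state is not registered anywhere.
    System.out.println(backend.states.keySet()); // []
    runner.processElement("a");
    // Only now does the state exist in the (simulated) backend.
    System.out.println(backend.states.keySet()); // [sortBuffer]
  }
}
```

User state can be pre-registered because its @StateId declarations are known up front. In this model, nothing tells the operator about "sortBuffer" in advance.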
Jan
[1] https://jira.apache.org/jira/browse/FLINK-12653
[2] https://issues.apache.org/jira/browse/BEAM-7144