Sounds good. Might be worth commenting on the JIRA to get this prioritized in 
case it has not been fixed.

-Max

On 13.08.19 12:18, Jan Lukavský wrote:
> Hi Max,
>
> comments inline.
>
> On 8/13/19 12:01 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > Just checking, do you see the same rescaling problem as described in
> > https://jira.apache.org/jira/browse/FLINK-12653 ?
> Yes.
> >
> > If so, you are most likely correct that this is due to the system state
> > that you added in your code. When I did the fix, I ran some tests to
> > check if any system state is not bound. I did not find any instances, but
> > you are right that we could see this issue for internal state, e.g. in
> > ReduceFnContextFactory.
> I think there were no instances of internal state used in the Flink
> Runner before my patch introduced internal state for sorting inputs.
> But that seems a little fragile, because it could easily change.
> >
> > Given that this is a Flink-specific bug, I'm not sure it warrants adding
> > a `getSystemStateTags()` method to the DoFnRunner. Also, this is error-
> > prone since we have to remember to add all state there. The better
> > solution would be to eagerly register state during StateSpec creation,
> > but this would require significant code refactoring.
> I'm also not happy about adding an additional generic method just because
> of one runner, but registering the state during StateSpec creation would
> be hard, as you said.
> >
> > Wouldn't it suffice to just perform an early binding in your code?
> > Additionally, we want to make sure to also revise any existing Beam code
> > paths.
> I think I might do that (although it would mean that the state is
> registered early for all runners, not just Flink).
> >
> > Hopefully the issue will be fixed in Flink 1.9. It would be interesting
> > to try with the Flink 1.9 RC2:
> > https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
>
> From the comments in the Flink JIRA, I was not sure that this will be
> fixed soon. If it will, I'm fine with registering just the single state I
> introduced. If this turns out to be a long-term issue, I think it would
> require some other solution.
>
> So, I will register the state(s) I have created and test that on Flink
> 1.9 when I have a little spare time. Then I will decide what to do next, ok?
>
> Jan
>
> >
> > Cheers,
> > Max
> >
> > On 12.08.19 19:58, Jan Lukavský wrote:
> >> I've managed to fix that by introducing an (optional) method to
> >> DoFnRunner called getSystemStateTags() (the default implementation
> >> returns Collections.emptyList()), and then using that list to early-bind
> >> states in Flink's DoFnOperator [1].
> >>
> >> @Max, WDYT?
> >>
> >> Jan
> >>
> >> [1]
> >> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
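A minimal Java sketch of the approach described above, assuming simplified stand-ins: an optional default method that lists system state tags, and an operator-side step that binds those states before the first element arrives. Every type and method here (StateTag, StateBackend, Runner, SortingRunner, earlyBind) is hypothetical; this is neither Beam's or Flink's actual API nor the code from the linked commit.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are Beam or Flink APIs.
public class EarlyBindingSketch {

  // Stand-in for a state tag; only its id matters here.
  static final class StateTag {
    final String id;

    StateTag(String id) {
      this.id = id;
    }
  }

  // Stand-in for a state backend that only knows states that have been bound.
  static final class StateBackend {
    private final Map<String, List<String>> states = new LinkedHashMap<>();

    // Binding registers the state on first use (lazy creation).
    List<String> bind(StateTag tag) {
      return states.computeIfAbsent(tag.id, k -> new ArrayList<>());
    }

    Collection<String> registeredStateIds() {
      return states.keySet();
    }
  }

  // Simplified stand-in for DoFnRunner with the optional hook.
  interface Runner {
    void processElement(String element, StateBackend backend);

    // Default: no system state, so existing runners need no changes.
    default Collection<StateTag> getSystemStateTags() {
      return Collections.emptyList();
    }
  }

  // A runner that buffers elements in system state (e.g. for sorting inputs).
  static final class SortingRunner implements Runner {
    static final StateTag BUFFER = new StateTag("sort-buffer");

    @Override
    public void processElement(String element, StateBackend backend) {
      // Without early binding, the buffer state is created only here.
      backend.bind(BUFFER).add(element);
    }

    @Override
    public Collection<StateTag> getSystemStateTags() {
      return Collections.singletonList(BUFFER);
    }
  }

  // Operator side: bind every system state before any element is processed.
  static void earlyBind(Runner runner, StateBackend backend) {
    for (StateTag tag : runner.getSystemStateTags()) {
      backend.bind(tag);
    }
  }

  public static void main(String[] args) {
    Runner runner = new SortingRunner();

    StateBackend lazy = new StateBackend();
    System.out.println("lazy:  " + lazy.registeredStateIds());  // []

    StateBackend eager = new StateBackend();
    earlyBind(runner, eager);
    System.out.println("eager: " + eager.registeredStateIds()); // [sort-buffer]
  }
}
```

The default method keeps the hook opt-in for runners without system state; the drawback raised in the thread still applies, since every system state has to be listed there by hand.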
> >>
> >> On 8/12/19 4:00 PM, Jan Lukavský wrote:
> >>> Hi,
> >>>
> >>> I have come across an issue that is very likely caused by [1]. The
> >>> issue is that Beam's state is (generally) created lazily, after an
> >>> element is received (as Max described in the Flink JIRA). Max also
> >>> created a workaround [2], but that seems to work for user state only
> >>> (i.e. state that has been created in user code by declaring @StateId;
> >>> please correct me if I'm wrong). In my work, however, I created a
> >>> system state (which holds elements before they are output, due to the
> >>> @RequiresTimeSortedInput annotation, but that's probably not
> >>> important), and this state is not covered by the workaround. This
> >>> might generally be the case for all system states, because they are
> >>> not visible until an element arrives and the state is actually created.
> >>>
> >>> Is my analysis correct? If so, does anyone have any suggestions on how
> >>> to fix that?
> >>>
> >>> Jan
> >>>
> >>> [1] https://jira.apache.org/jira/browse/FLINK-12653
> >>>
> >>> [2] https://issues.apache.org/jira/browse/BEAM-7144
> >>>
