Sounds good. Might be worth commenting on the JIRA to get this prioritized in case it has not been fixed.
-Max

On 13.08.19 12:18, Jan Lukavský wrote:
> Hi Max,
>
> comments inline.
>
> On 8/13/19 12:01 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > Just checking, do you see the same rescaling problem as described in
> > https://jira.apache.org/jira/browse/FLINK-12653 ?
> Yes.
> >
> > If so, you are most likely correct that this is due to the system state
> > that you added in your code. When I did the fix, I ran some tests to
> > check if any system state is not bound. I did not find instances but you
> > are right that we could see this issue for internal state, e.g. in
> > ReduceFnContextFactory.
> I think that there were no instances of internal state used in the Flink
> Runner prior to my patch, which introduced internal state for sorting inputs.
> But that seems a little fragile, because it might easily change.
> >
> > Given that this is a Flink-specific bug I'm not sure it warrants adding
> > a `getSystemStateTags()` method to the DoFnRunner. Also, this is error-prone
> > since we have to remember to add all state there. The better
> > solution would be to eagerly register state during StateSpec creation,
> > but this would require significant code refactoring.
> I'm also not happy with adding an additional generic method just because of
> one runner, but registering the state during creation of the StateSpec would be
> hard, as you said.
> >
> > Wouldn't it suffice to just perform an early binding in your code?
> > Additionally, we want to make sure to also revise any existing Beam code
> > paths.
> I think I might do it (although it would mean that the state would be
> registered early for all runners, not just Flink).
> >
> > The issue hopefully will be fixed with Flink 1.9. Would be interesting
> > to try with the Flink 1.9 RC2:
> > https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
>
> I was not sure from the comments in the Flink JIRA that this will be fixed
> soon. If so, I'm fine with registering just the single state I
> introduced. If this turns out to be a long-term issue, I think it would
> require some other solution.
>
> So, I will register the state(s) I have created and test that on Flink
> 1.9 when I have a little spare time. Will decide what to do next, ok?
>
> Jan
>
> >
> > Cheers,
> > Max
> >
> > On 12.08.19 19:58, Jan Lukavský wrote:
> >> I've managed to fix that by introducing an (optional) method to DoFnRunner
> >> called getSystemStateTags() (default implementation returns
> >> Collections.emptyList()), and then using that list to early bind states in
> >> Flink's DoFnOperator ([1]).
> >>
> >> @Max, WDYT?
> >>
> >> Jan
> >>
> >> [1]
> >> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
> >>
> >> On 8/12/19 4:00 PM, Jan Lukavský wrote:
> >>> Hi,
> >>>
> >>> I have come across an issue that is very likely caused by [1]. The
> >>> issue is that Beam's state is (generally) created lazily, after an
> >>> element is received (as Max described in the Flink JIRA). Max also
> >>> created a workaround [2], but that seems to work for user state only
> >>> (i.e. state that has been created in user code by declaring @StateId;
> >>> please correct me if I'm wrong). In my work, however, I created a
> >>> system state (that holds elements before they are output, due to the
> >>> @RequiresTimeSortedInput annotation, but that's probably not
> >>> important), and this state is not covered by the workaround. This
> >>> might be the case for all system states, generally, because they are
> >>> not visible until an element arrives and the state is actually created.
> >>>
> >>> Is my analysis correct? If so, would anyone have any suggestions on how
> >>> to fix that?
> >>>
> >>> Jan
> >>>
> >>> [1] https://jira.apache.org/jira/browse/FLINK-12653
> >>>
> >>> [2] https://issues.apache.org/jira/browse/BEAM-7144
> >>>
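A minimal, self-contained Java sketch of the early-binding idea discussed in this thread, under heavily simplified assumptions: `StateBackend`, `SystemStateAwareRunner`, `SortingRunner`, `open`, and the state id `sortBuffer` are hypothetical stand-ins, not Beam's `DoFnRunner` or Flink's `DoFnOperator` API. Only the method name `getSystemStateTags()` and its empty-list default come from the thread. It shows why state that is registered only when the first element arrives is invisible before then, and how declaring system state up front lets the operator bind it when it opens.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class EarlyBindingSketch {

  /** Hypothetical stand-in for a runner's keyed-state backend (not Flink's real API). */
  static final class StateBackend {
    // Insertion-ordered, so the printed output is deterministic.
    private final Set<String> registered = new LinkedHashSet<>();

    // In this simplified model, only registered state ids are visible to snapshot/restore.
    void register(String stateId) {
      registered.add(stateId);
    }

    Set<String> registeredStates() {
      return Collections.unmodifiableSet(registered);
    }
  }

  /** Hypothetical, simplified form of the optional method proposed for DoFnRunner. */
  interface SystemStateAwareRunner {
    // Default: the runner declares no internal (system) state.
    default Collection<String> getSystemStateTags() {
      return Collections.emptyList();
    }
  }

  /** Hypothetical runner that buffers elements in internal state, e.g. for time-sorted input. */
  static final class SortingRunner implements SystemStateAwareRunner {
    @Override
    public Collection<String> getSystemStateTags() {
      return Collections.singletonList("sortBuffer");
    }
  }

  /** Early binding: register user and system state when the operator opens, before any element. */
  static StateBackend open(Collection<String> userStateIds, SystemStateAwareRunner runner) {
    StateBackend backend = new StateBackend();
    userStateIds.forEach(backend::register);
    runner.getSystemStateTags().forEach(backend::register);
    return backend;
  }

  public static void main(String[] args) {
    StateBackend plain = open(Collections.singletonList("count"), new SystemStateAwareRunner() {});
    StateBackend sorting = open(Collections.singletonList("count"), new SortingRunner());
    System.out.println(plain.registeredStates());   // [count]
    System.out.println(sorting.registeredStates()); // [count, sortBuffer]
  }
}
```

The sketch also illustrates Max's objection: every piece of internal state must be remembered in `getSystemStateTags()`, which is error-prone. Registering state eagerly at StateSpec creation would avoid that list entirely, at the cost of the refactoring mentioned above.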
