Hi Max,

Comments inline.

On 8/13/19 12:01 PM, Maximilian Michels wrote:
Hi Jan,

Just checking, do you see the same rescaling problem as described in
https://jira.apache.org/jira/browse/FLINK-12653 ?
Yes.

If so, you are most likely correct that this is due to the system state
that you added in your code. When I did the fix, I ran some tests to
check if any system state is not bound. I did not find instances but you
are right that we could see this issue for internal state, e.g. in
ReduceFnContextFactory.
I think there were no instances of internal state used in the Flink runner before my patch introduced internal state for sorting inputs. But that seems a little fragile, because it might easily change.

Given that this is a Flink-specific bug I'm not sure it warrants adding
a `getSystemStateTags()` method to the DoFnRunner. Also, this is error-prone
since we have to remember to add all state there. The better
solution would be to eagerly register state during StateSpec creation,
but this would require significant code refactoring.
I'm also not happy about adding an additional generic method just because of one runner, but registering the state during creation of the StateSpec would be hard, as you said.

Wouldn't it suffice to just perform an early binding in your code?
Additionally, we want to make sure to also revise any existing Beam code
paths.
I think I might do that (although it would mean the state is registered early for all runners, not just Flink).

Hopefully the issue will be fixed in Flink 1.9. It would be interesting
to try the Flink 1.9 RC2:
https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/

From the comments in the Flink JIRA I was not sure that this will be fixed soon. If it is, I'm fine with registering just the single state I introduced. If this turns out to be a long-term issue, I think it would require some other solution.

So, I will register the state(s) I have created and test that on Flink 1.9 when I have a little spare time. Then I'll decide what to do next, OK?

Jan


Cheers,
Max

On 12.08.19 19:58, Jan Lukavský wrote:
I've managed to fix that by introducing an (optional) method on DoFnRunner
called getSystemStateTags() (the default implementation returns
Collections.emptyList()), and then using that list to early-bind states in
Flink's DoFnOperator ([1]).
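To illustrate the shape of the idea, here is a simplified sketch, not the code from the commit in [1]. SimpleDoFnRunner, SimpleStateBackend, SimpleDoFnOperator, TimeSortingRunner and the "sortBuffer" tag are hypothetical names, and state tags are modeled as plain strings. The runner exposes its system state tags through a default method, and the operator binds them together with user state before any element arrives:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified stand-ins for DoFnRunner and Flink's
// keyed state backend; these are NOT the real Beam/Flink APIs.
interface SimpleDoFnRunner {
  void processElement(String element);

  // Optional hook: runners that use internal (system) state list its tags here.
  // The default keeps all existing runners unchanged.
  default Collection<String> getSystemStateTags() {
    return Collections.emptyList();
  }
}

final class SimpleStateBackend {
  private final Set<String> registered = new LinkedHashSet<>();

  // Binding a state makes it known to the backend (and thus to snapshots).
  void bind(String stateTag) {
    registered.add(stateTag);
  }

  Set<String> registeredStates() {
    return Collections.unmodifiableSet(registered);
  }
}

final class SimpleDoFnOperator {
  private final SimpleStateBackend backend = new SimpleStateBackend();

  SimpleDoFnOperator(SimpleDoFnRunner runner, Collection<String> userStateIds) {
    // Early-bind user state (the part the existing workaround covers) ...
    userStateIds.forEach(backend::bind);
    // ... and also the runner's system state, before any element is processed.
    runner.getSystemStateTags().forEach(backend::bind);
  }

  SimpleStateBackend backend() {
    return backend;
  }
}

// Hypothetical runner that buffers elements in system state before sorting them.
final class TimeSortingRunner implements SimpleDoFnRunner {
  @Override
  public void processElement(String element) {
    // Buffering logic omitted in this sketch.
  }

  @Override
  public Collection<String> getSystemStateTags() {
    return List.of("sortBuffer");
  }
}
```

A runner without system state simply inherits the empty default, so only runners that actually create internal state need to override the method.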

@Max, WDYT?

Jan

[1]
https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802

On 8/12/19 4:00 PM, Jan Lukavský wrote:
Hi,

I have come across an issue that is very likely caused by [1]. The
issue is that Beam's state is (generally) created lazily, after an
element is received (as Max described in the Flink JIRA). Max also
created a workaround [2], but it seems to work for user state only
(i.e. state that has been created in user code by declaring @StateId -
please correct me if I'm wrong). In my work, however, I created a
system state (that holds elements before they are output, due to the
@RequiresTimeSortedInput annotation, but that's probably not
important), and this state is not covered by the workaround. This
might be the case for all system states in general, because they are
not visible until an element arrives and the state is actually created.
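A toy model of the lazy creation described above may make the problem concrete. LazyStateBackend, TimeSortingOperator and the "sortBuffer" id are hypothetical names, not the Beam or Flink API. The buffering state is only created when the first element is processed, so before that the backend reports no state at all:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical, not the Beam/Flink API) of lazily created state:
// a state only exists in the backend after it has been accessed once.
final class LazyStateBackend {
  private final Map<String, List<String>> states = new HashMap<>();

  // Accessing a state creates (and thereby registers) it on demand.
  List<String> bag(String stateId) {
    return states.computeIfAbsent(stateId, id -> new ArrayList<>());
  }

  // Only states that were already created are visible here.
  Set<String> registeredStateIds() {
    return Set.copyOf(states.keySet());
  }
}

final class TimeSortingOperator {
  // Hypothetical id of the system state buffering elements before sorting.
  static final String BUFFER_STATE = "sortBuffer";
  private final LazyStateBackend backend = new LazyStateBackend();

  void processElement(String element) {
    // The system state is created here, inside the runner, not by user code,
    // so it never appears in a list of declared @StateId fields.
    backend.bag(BUFFER_STATE).add(element);
  }

  LazyStateBackend backend() {
    return backend;
  }
}
```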

Is my analysis correct? If so, would anyone have any suggestions how
to fix that?

Jan

[1] https://jira.apache.org/jira/browse/FLINK-12653

[2] https://issues.apache.org/jira/browse/BEAM-7144
