Hi Jan,

Just checking: do you see the same rescaling problem as described in
https://jira.apache.org/jira/browse/FLINK-12653 ?

If so, you are most likely correct that this is due to the system state
that you added in your code. When I did the fix, I ran some tests to
check whether any system state was left unbound. I did not find any
instances, but you are right that we could see this issue for internal
state, e.g. in ReduceFnContextFactory.

Given that this is a Flink-specific bug, I'm not sure it warrants adding
a `getSystemStateTags()` method to the DoFnRunner. Also, this is
error-prone, since we have to remember to add all state there. The better
solution would be to eagerly register state during StateSpec creation,
but this would require significant code refactoring.
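To make the trade-off concrete, here is a toy sketch in plain Java. It does not use Flink or Beam; `ToyBackend`, `RunnerWithSystemState`, `ToyOperator` and `EagerSpecFactory` are hypothetical names, and "binding" is simplified to recording a state id in a set. It contrasts the opt-in list (a default method returning `Collections.emptyList()`) with registering state at the moment its spec is created.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a keyed state backend: it only "knows" the
// state ids that were bound to it.
class ToyBackend {
  final Set<String> registered = new LinkedHashSet<>();

  void bind(String stateId) {
    registered.add(stateId);
  }
}

// Opt-in variant (simplified): each runner lists its system state tags;
// the default lists nothing.
interface RunnerWithSystemState {
  default Collection<String> getSystemStateTags() {
    return Collections.emptyList();
  }
}

// Hypothetical operator hook: bind every listed tag before elements arrive.
class ToyOperator {
  static void earlyBind(ToyBackend backend, RunnerWithSystemState runner) {
    for (String tag : runner.getSystemStateTags()) {
      backend.bind(tag);
    }
  }
}

// Eager variant (simplified, not Beam's StateSpecs API): creating a spec
// binds it immediately, so nothing has to be listed by hand.
class EagerSpecFactory {
  private final ToyBackend backend;

  EagerSpecFactory(ToyBackend backend) {
    this.backend = backend;
  }

  String create(String stateId) {
    backend.bind(stateId); // registration happens at creation time
    return stateId;
  }
}
```

With the opt-in list, any state a runner forgets to report stays unbound; with the factory, every state is bound as soon as it is declared, which is roughly what eager registration during StateSpec creation would give us.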

Wouldn't it suffice to just perform an early binding in your code?
Additionally, we should make sure to revise any existing Beam code
paths as well.
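As a rough illustration of early binding in your own code, here is another plain-Java toy model, assuming (as a simplification) that state becomes known to the backend the first time it is requested and that only known state ends up in a snapshot. `LazyBackend`, `SortingRunner` and the `sortBuffer` id are hypothetical; in Flink the equivalent step would be requesting the state once during operator setup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical backend: a state is registered the first time it is
// requested, mirroring lazy creation.
class LazyBackend {
  final Set<String> registered = new LinkedHashSet<>();
  private final Map<String, List<Long>> buffers = new HashMap<>();

  List<Long> bagState(String stateId) {
    registered.add(stateId);
    return buffers.computeIfAbsent(stateId, id -> new ArrayList<>());
  }

  // Toy snapshot: only registered states are part of it.
  Set<String> snapshotStateNames() {
    return new LinkedHashSet<>(registered);
  }
}

// Hypothetical runner that buffers elements in system state before
// emitting them in timestamp order.
class SortingRunner {
  static final String SORT_BUFFER = "sortBuffer"; // hypothetical state id
  private final LazyBackend backend;

  SortingRunner(LazyBackend backend, boolean earlyBind) {
    this.backend = backend;
    if (earlyBind) {
      // Early binding: touch the state once at setup so it is registered
      // even if no element arrives before the next snapshot.
      backend.bagState(SORT_BUFFER);
    }
  }

  void processElement(long timestamp) {
    // Without early binding, this is the first point where the state exists.
    backend.bagState(SORT_BUFFER).add(timestamp);
  }
}
```

The point of the sketch is only the ordering: with `earlyBind` set, the buffer state is registered at setup rather than on the first element.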

Hopefully the issue will be fixed in Flink 1.9. It would be interesting
to try the Flink 1.9 RC2:
https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/

Cheers,
Max

On 12.08.19 19:58, Jan Lukavský wrote:
> I've managed to fix that by introducing an (optional) method to DoFnRunner
> called getSystemStateTags() (the default implementation returns
> Collections.emptyList()), and then using that list to early-bind states in
> Flink's DoFnOperator ([1]).
> 
> @Max, WDYT?
> 
> Jan
> 
> [1] 
> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
> 
> On 8/12/19 4:00 PM, Jan Lukavský wrote:
>> Hi,
>>
>> I have come across an issue that is very likely caused by [1]. The
>> issue is that Beam's state is (generally) created lazily, after an
>> element is received (as Max described in the Flink JIRA). Max also
>> created a workaround [2], but that seems to work for user state only
>> (i.e. state that has been created in user code by declaring @StateId;
>> please correct me if I'm wrong). In my work, however, I created a
>> system state (that holds elements before they are output, due to the
>> @RequiresTimeSortedInput annotation, but that's probably not
>> important), and this state is not covered by the workaround. This
>> might be the case for all system states in general, because they are
>> not visible until an element arrives and the state is actually created.
>>
>> Is my analysis correct? If so, does anyone have suggestions on how
>> to fix it?
>>
>> Jan
>>
>> [1] https://jira.apache.org/jira/browse/FLINK-12653
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-7144
>>
