[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek closed FLINK-14428.
------------------------------------
Resolution: Not A Problem
Please re-open if you think there is something wrong with the
lock/synchronization.
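The resolution depends on the lock. The {{emitRecord}} snippet quoted in the issue below already performs the key switch and {{processElement}} inside {{synchronized (lock)}}. Suppose the timer path takes that same lock around {{setCurrentKey}} and {{onTimer}}. The comment above implies this, but the sketch only assumes it and does not show Flink's code. In that case each "set key, then use state" sequence is atomic, and the two threads cannot interleave inside it. The minimal model below uses a hypothetical {{LockedKeyAccess}} class and is not Flink code:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Flink code: both paths take the same lock
// around "set the current key, then use keyed state".
class LockedKeyAccess {
    private final Object lock = new Object();
    private final Map<String, Integer> state = new HashMap<>();
    private String currentKey;

    // The key switch and the state access form one critical section.
    void incrementUnder(String key) {
        synchronized (lock) {
            currentKey = key;
            state.merge(currentKey, 1, Integer::sum);
        }
    }

    int valueOf(String key) {
        synchronized (lock) {
            return state.getOrDefault(key, 0);
        }
    }

    static LockedKeyAccess run(int iterations) throws InterruptedException {
        LockedKeyAccess access = new LockedKeyAccess();
        // "Record" thread only uses key "a"; "timer" thread only uses key "b".
        Thread records = new Thread(() -> {
            for (int i = 0; i < iterations; i++) access.incrementUnder("a");
        });
        Thread timers = new Thread(() -> {
            for (int i = 0; i < iterations; i++) access.incrementUnder("b");
        });
        records.start();
        timers.start();
        records.join();
        timers.join();
        return access;
    }

    public static void main(String[] args) throws InterruptedException {
        LockedKeyAccess access = run(100_000);
        System.out.println(access.valueOf("a") + " " + access.valueOf("b")); // 100000 100000
    }
}
{code}
Removing the synchronized blocks turns this into a data race on both {{currentKey}} and the {{HashMap}}, and the per-key counts are then no longer guaranteed.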
> Non-consistency key access in KeyedProcessFunction when use keyed state api
> in both processElement and onTimer method
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-14428
> URL: https://issues.apache.org/jira/browse/FLINK-14428
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Reporter: vinoyang
> Priority: Major
>
> Scenario:
> In a {{KeyedProcessFunction}}, using the keyed state API in both the
> {{processElement}} and {{onTimer}} methods may cause inconsistent key access.
> Analysis:
> For timers, {{InternalTimerServiceImpl}} sets the key context to the key stored
> in the timer before firing it. That is the key that was current when
> {{registerXXXTimeTimer}} was called:
> {code:java}
> public void onProcessingTime(long time) throws Exception {
>     // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
>     // inside the callback.
>     nextTimer = null;
>     InternalTimer<K, N> timer;
>     while ((timer = processingTimeTimersQueue.peek()) != null
>             && timer.getTimestamp() <= time) {
>         processingTimeTimersQueue.poll();
>         keyContext.setCurrentKey(timer.getKey()); // here
>         triggerTarget.onProcessingTime(timer);
>     }
>     if (timer != null && nextTimer == null) {
>         nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
>     }
> }
> {code}
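> Below is a minimal, self-contained sketch of the mechanism described above. It
> uses a hypothetical {{ToyTimerService}}, not Flink's class. Each timer captures
> the key that is current at registration, and the key context is switched back to
> that key just before the callback runs. Due timers are drained in timestamp
> order, as in the {{while}} loop above.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.PriorityQueue;
> import java.util.function.BiConsumer;
>
> // Hypothetical toy model, not Flink's InternalTimerServiceImpl.
> class ToyTimerService<K> {
>
>     // A timer remembers the key that was current when it was registered.
>     private static final class Timer<T> {
>         final T key;
>         final long timestamp;
>
>         Timer(T key, long timestamp) {
>             this.key = key;
>             this.timestamp = timestamp;
>         }
>     }
>
>     private final PriorityQueue<Timer<K>> queue =
>             new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
>     private K currentKey;
>
>     void setCurrentKey(K key) {
>         currentKey = key;
>     }
>
>     // Captures the current key at registration time.
>     void registerTimer(long timestamp) {
>         queue.add(new Timer<>(currentKey, timestamp));
>     }
>
>     // Fires due timers in timestamp order, restoring each timer's key first.
>     void advanceTo(long time, BiConsumer<K, Long> onTimer) {
>         Timer<K> timer;
>         while ((timer = queue.peek()) != null && timer.timestamp <= time) {
>             queue.poll();
>             setCurrentKey(timer.key);
>             onTimer.accept(currentKey, timer.timestamp);
>         }
>     }
>
>     static List<String> demo() {
>         ToyTimerService<String> service = new ToyTimerService<>();
>         service.setCurrentKey("a");
>         service.registerTimer(10);
>         service.setCurrentKey("b");
>         service.registerTimer(5);
>         service.setCurrentKey("c");
>         service.registerTimer(30); // not yet due at time 20
>         List<String> fired = new ArrayList<>();
>         service.advanceTo(20, (key, ts) -> fired.add(key + "@" + ts));
>         return fired;
>     }
>
>     public static void main(String[] args) {
>         System.out.println(demo()); // [b@5, a@10]
>     }
> }
> {code}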
> For the {{processElement}} method, {{OneInputStreamTask}} calls it after
> setting the key context:
> {code:java}
> @Override
> public void emitRecord(StreamRecord<IN> record) throws Exception {
>     synchronized (lock) {
>         numRecordsIn.inc();
>         operator.setKeyContextElement1(record); // here
>         operator.processElement(record);
>     }
> }
> {code}
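> The record path can be sketched the same way. The hypothetical
> {{ToyKeyedOperator}} below is not Flink's API. It derives the key from each
> record with a key selector, makes that key current, and only then calls
> {{processElement}}. The per-key counter it updates is therefore implicitly
> scoped to the record's key. The sketch is single-threaded, so it leaves out the lock.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.function.Function;
>
> // Hypothetical toy operator; method names echo, but are not, Flink's API.
> class ToyKeyedOperator<K, IN> {
>     private final Function<IN, K> keySelector;
>     // Toy keyed state: one counter per key, always addressed via currentKey.
>     private final Map<K, Integer> countState = new HashMap<>();
>     private K currentKey;
>
>     ToyKeyedOperator(Function<IN, K> keySelector) {
>         this.keySelector = keySelector;
>     }
>
>     // Plays the role of setKeyContextElement1: the key comes from the record.
>     void setKeyContextElement(IN record) {
>         currentKey = keySelector.apply(record);
>     }
>
>     // Plays the role of processElement: state access uses the current key.
>     void processElement(IN record) {
>         countState.merge(currentKey, 1, Integer::sum);
>     }
>
>     // Same order as emitRecord (lock omitted: single-threaded sketch).
>     void emitRecord(IN record) {
>         setKeyContextElement(record);
>         processElement(record);
>     }
>
>     Integer countFor(K key) {
>         return countState.get(key);
>     }
>
>     public static void main(String[] args) {
>         ToyKeyedOperator<Character, String> op = new ToyKeyedOperator<>(s -> s.charAt(0));
>         for (String word : new String[] {"apple", "banana", "avocado"}) {
>             op.emitRecord(word);
>         }
>         System.out.println(op.countFor('a') + " " + op.countFor('b')); // 2 1
>     }
> }
> {code}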
> The {{setCurrentKey}} call in the first snippet and the {{setKeyContextElement1}}
> call in the second both end up in {{AbstractStreamOperator#setCurrentKey}}.
> However, they run in different threads, while there is only one keyed state
> backend instance, and {{AbstractStreamOperator#setCurrentKey}} changes that
> backend's current key.
> So if we access the keyed state API in both {{processElement}} and {{onTimer}},
> we may read or write the wrong state value, because one of these methods may
> change the current key while the other is still using it.
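> To make the concern concrete, here is a single-threaded replay of one bad
> interleaving. It assumes the two paths could run without a common lock.
> {{SharedKeyInterleaving}} and its {{ToyBackend}} are hypothetical stand-ins for
> the single keyed state backend. In the replay, the task thread reads state for
> key "a", the timer thread then switches the current key to "b", and the task
> thread's write lands on "b".
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical single-threaded replay of one interleaving; not Flink code.
> class SharedKeyInterleaving {
>
>     // Stand-in for the single keyed state backend: one shared current key.
>     static final class ToyBackend {
>         String currentKey;
>         final Map<String, Integer> state = new HashMap<>();
>
>         void setCurrentKey(String key) {
>             currentKey = key;
>         }
>
>         int get() {
>             return state.getOrDefault(currentKey, 0);
>         }
>
>         void put(int value) {
>             state.put(currentKey, value);
>         }
>     }
>
>     static Map<String, Integer> replay() {
>         ToyBackend backend = new ToyBackend();
>         backend.state.put("a", 10);
>         backend.state.put("b", 20);
>
>         // Task thread: a record for key "a" arrives and reads its state (10).
>         backend.setCurrentKey("a");
>         int seen = backend.get();
>         // Timer thread, unsynchronized: a timer for key "b" fires in between.
>         backend.setCurrentKey("b");
>         // Task thread: intends to write 11 for "a", but the current key is "b".
>         backend.put(seen + 1);
>         return backend.state;
>     }
>
>     public static void main(String[] args) {
>         Map<String, Integer> state = replay();
>         System.out.println(state.get("a") + " " + state.get("b")); // 10 11
>     }
> }
> {code}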
--
This message was sent by Atlassian Jira
(v8.3.4#803005)