[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16954205#comment-16954205 ]

vinoyang commented on FLINK-14428:
----------------------------------

I read the source code again, and you are right: it's not a problem.
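A toy model of why the two paths do not interfere, under one stated assumption. The {{emitRecord}} snippet quoted below sets the key context and calls {{processElement}} while holding {{lock}}. This sketch assumes that timer callbacks are likewise invoked while holding that same lock. Under that assumption, "set key, then touch state" runs as one unit on either thread, so neither path can switch the key while the other is using it. {{ToyKeyedBackend}}, {{incrementUnderLock}} and {{run}} are hypothetical names, not Flink API.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class LockedKeyContextDemo {

    /** Hypothetical stand-in for a keyed state backend with one shared "current key". */
    static final class ToyKeyedBackend {
        private final Map<String, Integer> counts = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }
        int value() { return counts.getOrDefault(currentKey, 0); }
        void update(int v) { counts.put(currentKey, v); }
        int valueFor(String key) { return counts.getOrDefault(key, 0); }
    }

    /** Sets the key and touches state as one unit, the way emitRecord does under `lock`. */
    static void incrementUnderLock(Object lock, ToyKeyedBackend backend, String key) {
        synchronized (lock) {
            backend.setCurrentKey(key);
            backend.update(backend.value() + 1);
        }
    }

    static Map<String, Integer> run(int iterations) throws InterruptedException {
        Object lock = new Object();                 // plays the role of the task's lock
        ToyKeyedBackend backend = new ToyKeyedBackend();

        // "Task thread": processElement for records with key "a".
        Thread records = new Thread(() -> {
            for (int i = 0; i < iterations; i++) incrementUnderLock(lock, backend, "a");
        });
        // "Timer thread": onTimer callbacks for key "b" (assumed to take the same lock).
        Thread timers = new Thread(() -> {
            for (int i = 0; i < iterations; i++) incrementUnderLock(lock, backend, "b");
        });
        records.start();
        timers.start();
        records.join();
        timers.join();

        Map<String, Integer> result = new HashMap<>();
        synchronized (lock) {
            result.put("a", backend.valueFor("a"));
            result.put("b", backend.valueFor("b"));
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100_000));
    }
}
{code}

Calling {{run(n)}} should leave both counters at exactly {{n}}. Removing the {{synchronized}} block in {{incrementUnderLock}} would make that result unreliable, which is the situation the original report was worried about.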

> Non-consistency key access in KeyedProcessFunction when use keyed state api 
> in both processElement and onTimer method
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14428
>                 URL: https://issues.apache.org/jira/browse/FLINK-14428
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: vinoyang
>            Priority: Major
>
> Scenario:
> In {{KeyedProcessFunction}}, using the keyed state API in both the
> {{processElement}} and {{onTimer}} methods may cause inconsistent key access.
> Analysis:
> For timers, {{InternalTimerServiceImpl}} sets the key context to the key stored
> in the timer (captured when registerXXXTimeTimer was called) before firing it:
> {code:java}
> public void onProcessingTime(long time) throws Exception {
>     // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
>     // inside the callback.
>     nextTimer = null;
>     InternalTimer<K, N> timer;
>     while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
>         processingTimeTimersQueue.poll();
>         keyContext.setCurrentKey(timer.getKey());        // here
>         triggerTarget.onProcessingTime(timer);
>     }
>     if (timer != null && nextTimer == null) {
>         nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
>     }
> }
> {code}
> For the {{processElement}} method, {{OneInputStreamTask}} calls it after setting
> the key context:
> {code:java}
> @Override
> public void emitRecord(StreamRecord<IN> record) throws Exception {
>     synchronized (lock) {
>         numRecordsIn.inc();
>         operator.setKeyContextElement1(record);        // here
>         operator.processElement(record);
>     }
> }
> {code}
> Both the setCurrentKey call in the first snippet and the setKeyContextElement1
> call in the second end up in the same {{AbstractStreamOperator#setCurrentKey}}
> method. However, the two calls run in different threads, while there is only
> one keyed state backend instance, and {{AbstractStreamOperator#setCurrentKey}}
> changes the current key of that backend.
> So if we access the keyed state API in both {{processElement}} and {{onTimer}},
> we may read or write the wrong state value, because one method may change the
> current key while the other is still using it.
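The interleaving the report is concerned about can be replayed deterministically, on a single thread, against a toy backend that (like the real one) has a single shared current key. {{InterleavingDemo}} and {{ToyKeyedBackend}} are hypothetical; only the order of calls matters here.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class InterleavingDemo {

    /** Hypothetical stand-in for a keyed state backend: one shared "current key". */
    static final class ToyKeyedBackend {
        private final Map<String, Integer> counts = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }
        int value() { return counts.getOrDefault(currentKey, 0); }
        void update(int v) { counts.put(currentKey, v); }
        int valueFor(String key) { return counts.getOrDefault(key, 0); }
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();

        // processElement for a record with key "a": the key context is set first...
        backend.setCurrentKey("a");
        // ...but before it touches state, a timer for key "b" fires (simulated here
        // on the same thread) and switches the shared current key.
        backend.setCurrentKey("b");
        // processElement now increments what it believes is the counter for "a".
        backend.update(backend.value() + 1);

        // Prints "a=0, b=1": the update meant for "a" landed on "b".
        System.out.println("a=" + backend.valueFor("a") + ", b=" + backend.valueFor("b"));
    }
}
{code}

This is exactly the interleaving that holding one lock around both "set key" and "access state" rules out.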



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
