vinoyang created FLINK-14428:
--------------------------------

             Summary: Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method
                 Key: FLINK-14428
                 URL: https://issues.apache.org/jira/browse/FLINK-14428
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
            Reporter: vinoyang


Scenario:

In {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}}
and {{onTimer}} methods may cause inconsistent key access.

Analysis:

For timers, {{InternalTimerServiceImpl}} stores the current key with each timer when
registerXXXTimeTimer is called, and before firing a timer it sets the key context to
that timer's key:


{code:java}
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey()); // here
        triggerTarget.onProcessingTime(timer);
    }

    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
{code}
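To make the timer path concrete, here is a toy model in plain Java. It is a sketch under stated assumptions, not Flink code: {{ToyTimer}}, {{ToyKeyContext}} and {{ToyTimerService}} are hypothetical names. Each timer captures the key that was current at registration, and the firing loop switches the key context to that key before invoking the callback, mirroring the loop above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical toy timer (not Flink's API): remembers the key that was
// current when it was registered.
final class ToyTimer<K> {
    final K key;
    final long timestamp;

    ToyTimer(K key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for the operator's single key context.
final class ToyKeyContext<K> {
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    K getCurrentKey() { return currentKey; }
}

final class ToyTimerService<K> {
    private final ToyKeyContext<K> keyContext;
    // Timers ordered by timestamp, earliest first.
    private final PriorityQueue<ToyTimer<K>> queue =
            new PriorityQueue<>(Comparator.comparingLong((ToyTimer<K> t) -> t.timestamp));
    // "key@timestamp" entries recorded as timers fire, for inspection.
    final List<String> fired = new ArrayList<>();

    ToyTimerService(ToyKeyContext<K> keyContext) { this.keyContext = keyContext; }

    // Registration captures whatever key is current at call time.
    void registerTimer(long timestamp) {
        queue.add(new ToyTimer<>(keyContext.getCurrentKey(), timestamp));
    }

    // Pop every due timer, switch the key context to its key, then fire it.
    void advanceTo(long time) {
        ToyTimer<K> timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            keyContext.setCurrentKey(timer.key); // same role as the "here" line above
            onTimer(timer);
        }
    }

    // Stand-in for user onTimer logic: it only sees the current key.
    private void onTimer(ToyTimer<K> timer) {
        fired.add(keyContext.getCurrentKey() + "@" + timer.timestamp);
    }
}
```

In this model, after timers fire the key context is left at the key of the last timer that fired, not at whatever key was current before.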

For the {{processElement}} method, {{OneInputStreamTask}} calls it after setting the
key context from the incoming record:


{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record); // here
        operator.processElement(record);
    }
}
{code}
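The record path can be modelled the same way. In this hypothetical sketch ({{ToyRecordOperator}} and its key selector are illustrative names, not Flink's API), the key is extracted from each record and installed under the lock before {{processElement}} runs; the per-key counter stands in for a keyed {{ValueState}}.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy operator (not Flink's API) modelling the record path:
// set the key context from the record, then run the user logic.
final class ToyRecordOperator<IN, K> {
    private final Object lock = new Object();
    private final Function<IN, K> keySelector;
    private K currentKey;
    // One counter per key, standing in for keyed ValueState<Long>.
    final Map<K, Long> counts = new HashMap<>();

    ToyRecordOperator(Function<IN, K> keySelector) { this.keySelector = keySelector; }

    void emitRecord(IN record) {
        synchronized (lock) {
            setKeyContextElement1(record);
            processElement(record);
        }
    }

    // Extracts the record's key and makes it the current key.
    private void setKeyContextElement1(IN record) {
        currentKey = keySelector.apply(record);
    }

    // User logic only uses the current key, never the record's key directly.
    private void processElement(IN record) {
        counts.merge(currentKey, 1L, Long::sum);
    }

    K getCurrentKey() { return currentKey; }
}
```

For example, with a key selector taking the first character, the records "apple", "avocado" and "banana" leave a count of 2 under 'a', 1 under 'b', and 'b' as the current key.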

The {{setCurrentKey}} call in the first snippet and the {{setKeyContextElement1}} call
in the second snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}}
method. However, there is only one keyed state backend instance, and
{{AbstractStreamOperator#setCurrentKey}} changes the current key of that keyed state
backend.
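A minimal sketch of why both paths interact: assuming a backend that partitions values by key but keeps a single current-key field (the {{ToyKeyedBackend}} class below is hypothetical, not Flink's {{KeyedStateBackend}}), the same {{value()}} call returns different results depending only on which caller last invoked {{setCurrentKey}}.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy keyed backend (not Flink's API): values are partitioned
// by key, but every access goes through one shared "current key" field.
final class ToyKeyedBackend<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    // Called by both the record path and the timer path in this model.
    void setCurrentKey(K key) { currentKey = key; }

    // Reads the value stored for the current key (null if none).
    V value() { return values.get(currentKey); }

    // Writes the value for the current key.
    void update(V value) { values.put(currentKey, value); }
}
```

For instance, after {{setCurrentKey("a")}}, {{update(1)}}, {{setCurrentKey("b")}}, {{update(2)}}, a call to {{value()}} returns 2; switching back to "a" makes the very same call return 1.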

So if we access the keyed state API in both {{processElement}} and {{onTimer}}, we
may get an incorrect state value, because one of these methods may change the current
key and cause an inconsistency.
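The hazard described above can be illustrated with a hypothetical model ({{KeySwitchHazard}} is not Flink code): a read-modify-write on keyed state for key "a" is split by a key switch to "b", so the write lands on the wrong key. Whether such an interleaving can actually occur in Flink depends on how record processing and timer callbacks are synchronized, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (not Flink's API) of a read-modify-write on keyed
// state that is interrupted by a change of the shared current key.
final class KeySwitchHazard {
    private final Map<String, Integer> counts = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }

    int read() { return counts.getOrDefault(currentKey, 0); }

    void write(int value) { counts.put(currentKey, value); }

    int countFor(String key) { return counts.getOrDefault(key, 0); }

    // Simulates processElement for key "a" interleaved with a timer
    // callback that switches the current key to "b" before the write.
    static KeySwitchHazard interleaved() {
        KeySwitchHazard backend = new KeySwitchHazard();
        backend.setCurrentKey("a");   // record path sets key "a"
        int seen = backend.read();    // reads the count for "a" (0)
        backend.setCurrentKey("b");   // timer path switches the key to "b"
        backend.write(seen + 1);      // write meant for "a" lands on "b"
        return backend;
    }
}
```

After {{interleaved()}}, the count for "a" is still 0 while "b" has 1, i.e. the update was applied to the wrong key.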




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
