[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16954205#comment-16954205 ]
vinoyang commented on FLINK-14428:
----------------------------------

I read the source code again, you are right. It's not a problem.

> Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14428
>                 URL: https://issues.apache.org/jira/browse/FLINK-14428
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: vinoyang
>            Priority: Major
>
> Scenario:
> In {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}} and {{onTimer}} methods may cause inconsistent key access.
>
> Analysis:
> For timers, {{InternalTimerServiceImpl}} sets the key context to the key carried by the timer that was registered via registerXXXTimeTimer, just before the callback fires:
> {code:java}
> public void onProcessingTime(long time) throws Exception {
>     // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
>     // inside the callback.
>     nextTimer = null;
>     InternalTimer<K, N> timer;
>     while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
>         processingTimeTimersQueue.poll();
>         keyContext.setCurrentKey(timer.getKey()); //here
>         triggerTarget.onProcessingTime(timer);
>     }
>     if (timer != null && nextTimer == null) {
>         nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
>     }
> }
> {code}
> The {{processElement}} method is called in {{OneInputStreamTask}} after the key context has been set:
> {code:java}
> @Override
> public void emitRecord(StreamRecord<IN> record) throws Exception {
>     synchronized (lock) {
>         numRecordsIn.inc();
>         operator.setKeyContextElement1(record); //here
>         operator.processElement(record);
>     }
> }
> {code}
> The setCurrentKey method in the first code snippet and the setKeyContextElement1 method in the second code snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}} method. However, they run in different threads, and there is only one keyed state backend instance. {{AbstractStreamOperator#setCurrentKey}} changes the current key of the keyed state backend.
> So if we access the keyed state API in both {{processElement}} and {{onTimer}}, we may read a wrong state value, because one of these methods may change the key while the other is running, causing an inconsistency.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)