vinoyang created FLINK-14428:
--------------------------------
Summary: Non-consistency key access in KeyedProcessFunction when
use keyed state in both processElement and onTimer method
Key: FLINK-14428
URL: https://issues.apache.org/jira/browse/FLINK-14428
Project: Flink
Issue Type: Bug
Components: API / DataStream
Reporter: vinoyang
Scenario:
In a {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}}
and {{onTimer}} methods may cause inconsistent key access.
Analysis:
For timers, {{InternalTimerServiceImpl}} sets the key context to the key stored in
the timer (the key that was current when registerXXXTimeTimer was called) just before firing it:
{code:java}
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey()); // here
        triggerTarget.onProcessingTime(timer);
    }

    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
{code}
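To make the timer path concrete, here is a minimal plain-Java model of this firing loop that runs without Flink. {{SketchKeyContext}}, {{SketchTimer}}, {{SketchTimerService}} and {{SketchTimerDemo}} are hypothetical stand-ins, not Flink classes, and the sketch assumes that a timer records the key that was current when it was registered. It illustrates that firing timers leaves the shared key context set to the last fired timer's key rather than to the key of the last processed record.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the operator's key context (not a Flink class).
class SketchKeyContext {
    private Object currentKey;

    void setCurrentKey(Object key) { currentKey = key; }

    Object getCurrentKey() { return currentKey; }
}

// Hypothetical timer: a timestamp plus the key that was current at registration.
final class SketchTimer {
    final long timestamp;
    final Object key;

    SketchTimer(long timestamp, Object key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

// Simplified model of the firing loop in InternalTimerServiceImpl#onProcessingTime.
class SketchTimerService {
    private final SketchKeyContext keyContext;
    private final Consumer<SketchTimer> triggerTarget;
    private final PriorityQueue<SketchTimer> queue =
            new PriorityQueue<>(Comparator.comparingLong((SketchTimer t) -> t.timestamp));

    SketchTimerService(SketchKeyContext keyContext, Consumer<SketchTimer> triggerTarget) {
        this.keyContext = keyContext;
        this.triggerTarget = triggerTarget;
    }

    // Assumption: the timer captures whatever key is current at registration time.
    void registerProcessingTimeTimer(long timestamp) {
        queue.add(new SketchTimer(timestamp, keyContext.getCurrentKey()));
    }

    // Fires all due timers in timestamp order, switching the shared key context
    // to each timer's key first. The previous key is never restored.
    void onProcessingTime(long time) {
        SketchTimer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            keyContext.setCurrentKey(timer.key);
            triggerTarget.accept(timer);
        }
    }
}

class SketchTimerDemo {
    public static void main(String[] args) {
        SketchKeyContext ctx = new SketchKeyContext();
        List<Object> keysSeenInOnTimer = new ArrayList<>();
        SketchTimerService timers =
                new SketchTimerService(ctx, t -> keysSeenInOnTimer.add(ctx.getCurrentKey()));

        ctx.setCurrentKey("a");
        timers.registerProcessingTimeTimer(10);
        ctx.setCurrentKey("b");
        timers.registerProcessingTimeTimer(5);
        ctx.setCurrentKey("c"); // key of the most recently processed record

        timers.onProcessingTime(20);
        System.out.println(keysSeenInOnTimer);   // [b, a]
        System.out.println(ctx.getCurrentKey()); // a, no longer c
    }
}
{code}

Running {{SketchTimerDemo}} should print {{[b, a]}} and then {{a}}: each callback sees its own timer's key, and afterwards the key context is no longer {{c}}.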
For the {{processElement}} method, {{OneInputStreamTask}} calls it right after setting
the key context:
{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record); // here
        operator.processElement(record);
    }
}
{code}
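The record path can be modeled the same way. In this sketch, {{SketchKeyedBackend}}, {{SketchCountingOperator}}, {{SketchEmitDemo}} and the first-character key selector are hypothetical, not Flink APIs. The single {{currentKey}} field stands in for the one current key of the keyed state backend, and {{emitRecord}} mirrors the excerpt by setting that key from the record before calling {{processElement}}.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical keyed state backend: one mutable current key, against which
// every value-state read and write is resolved.
class SketchKeyedBackend {
    private Object currentKey;
    private final Map<Object, Integer> valueState = new HashMap<>();

    void setCurrentKey(Object key) { currentKey = key; }

    Object getCurrentKey() { return currentKey; }

    Integer value() { return valueState.get(currentKey); }

    void update(Integer value) { valueState.put(currentKey, value); }
}

// Hypothetical operator mirroring the emitRecord() excerpt: extract the key from
// the record, make it the backend's current key, then run the user logic.
class SketchCountingOperator {
    private final Object lock = new Object();
    private final SketchKeyedBackend backend;
    private final Function<String, Object> keySelector;

    SketchCountingOperator(SketchKeyedBackend backend, Function<String, Object> keySelector) {
        this.backend = backend;
        this.keySelector = keySelector;
    }

    void emitRecord(String record) {
        synchronized (lock) {
            // Plays the role of operator.setKeyContextElement1(record).
            backend.setCurrentKey(keySelector.apply(record));
            processElement(record);
        }
    }

    // User logic: count records per key in keyed value state.
    private void processElement(String record) {
        Integer count = backend.value();
        backend.update(count == null ? 1 : count + 1);
    }
}

class SketchEmitDemo {
    public static void main(String[] args) {
        SketchKeyedBackend backend = new SketchKeyedBackend();
        // Hypothetical key selector: the first character of the record.
        SketchCountingOperator op = new SketchCountingOperator(backend, r -> r.substring(0, 1));
        op.emitRecord("a1");
        op.emitRecord("b1");
        op.emitRecord("a2");
        System.out.println(backend.getCurrentKey()); // a, left over from the last record
        backend.setCurrentKey("a");
        System.out.println(backend.value());         // 2
    }
}
{code}

After the three records, the backend's current key is still {{a}}, the key of the last record: nothing restores it when {{processElement}} returns.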
The {{setCurrentKey}} call in the first snippet and the {{setKeyContextElement1}} call
in the second snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}}
method, which changes the current key of the keyed state backend. However, there is
only one keyed state backend instance, so both code paths read and write keyed state
through the same current key.
So if we access the keyed state API in both {{processElement}} and {{onTimer}}, we
may read an incorrect state value, because either method may change the current key
and cause an inconsistency.
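The failure mode described above can be replayed deterministically against a single shared backend. {{SharedKeyBackend}} and {{KeySwitchReplay}} are hypothetical classes, and the replay simply assumes the problematic ordering (the timer path switching the key between the record path's {{setCurrentKey}} and its state read). It does not show under which Flink threading conditions that ordering can actually occur.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one keyed state backend shared by the record path and the
// timer path, with a single mutable current key.
class SharedKeyBackend {
    private Object currentKey;
    private final Map<Object, Integer> valueState = new HashMap<>();

    void setCurrentKey(Object key) { currentKey = key; }

    Integer value() { return valueState.get(currentKey); }

    void update(Integer value) { valueState.put(currentKey, value); }
}

class KeySwitchReplay {
    // Replays one record-path read for key "a". If timerInterleaves is true, a timer
    // for key "b" switches the shared current key after the record path has set its
    // key but before it reads state. Returns the value the record path observes.
    static Integer recordPathRead(boolean timerInterleaves) {
        SharedKeyBackend backend = new SharedKeyBackend();
        backend.setCurrentKey("a");
        backend.update(1);
        backend.setCurrentKey("b");
        backend.update(100);

        backend.setCurrentKey("a");     // record path: like setKeyContextElement1
        if (timerInterleaves) {
            backend.setCurrentKey("b"); // timer path: like keyContext.setCurrentKey(timer.getKey())
        }
        return backend.value();         // record path: the user's state read
    }

    public static void main(String[] args) {
        System.out.println(recordPathRead(false)); // 1: state of key "a"
        System.out.println(recordPathRead(true));  // 100: state of key "b"
    }
}
{code}

With the interleaving, the record path reads {{100}}, the value stored under key {{b}}, instead of its own {{1}}.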
--
This message was sent by Atlassian Jira
(v8.3.4#803005)