[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weike Dong updated FLINK-18452:
-------------------------------
Fix Version/s: 1.12.0
> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent
> state access after restoration
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.12.0
>
> Attachments:
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using the Top-N functionality provided by the Blink
> planner, the job state cannot be accessed after restoration because the state
> serializers are reported as "incompatible" (in fact, they are compatible).
> The error log is shown below:
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy],
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100],
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey,
> serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid],
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1)
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more
> {panel}
>
> After careful debugging, we found that the root cause lies in the
> compatibility check of the type serializers.
>
> In short, during checkpointing Flink snapshots the _SortedMapSerializer_ by
> creating a _SortedMapSerializerSnapshot_ object, which encapsulates the
> original comparator (in our job, its generated class is
> _StreamExecSortComparator$579_).
>
> On restore, this snapshot object is read back as normal. However, when the
> RetractableTopNFunction instance is constructed, Flink passes in another,
> newly generated comparator as an argument (here,
> _StreamExecSortComparator$626_). This comparator is later used in the
> _ValueStateDescriptor_, which acts as the key into the state store.
>
> Here is the problem: when the restored job accesses state (_getState_)
> through this _ValueStateDescriptor_, the state backend first checks whether
> the comparator in the state descriptor is compatible with the one in the
> snapshot. This check eventually delegates to the _equals_ method of the
> _RetractableTopNFunction.ComparatorWrapper_ class.
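>
> The check path described above can be modeled with the following simplified, self-contained Java sketch. It is not Flink's actual code: all names (_CompatibilityCheckSketch_, _SortedMapSnapshotModel_, _registerRestoredState_) are hypothetical, strings stand in for comparators, and _IllegalStateException_ stands in for the _StateMigrationException_ seen in the log. The only assumption carried over from the description is that compatibility is decided by calling _equals_ on the restored comparator.

```java
import java.util.Objects;

public class CompatibilityCheckSketch {

    // Hypothetical stand-in for the possible schema compatibility outcomes.
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // Hypothetical stand-in for a serializer snapshot that captured the comparator at checkpoint time.
    static final class SortedMapSnapshotModel<C> {
        private final C restoredComparator;

        SortedMapSnapshotModel(C restoredComparator) {
            this.restoredComparator = Objects.requireNonNull(restoredComparator);
        }

        // The whole decision hinges on equals(): false is treated as "incompatible".
        Compatibility resolveCompatibility(C newComparator) {
            return restoredComparator.equals(newComparator)
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
        }
    }

    // Models the backend's reaction; IllegalStateException stands in for StateMigrationException.
    static <C> void registerRestoredState(SortedMapSnapshotModel<C> snapshot, C newComparator) {
        if (snapshot.resolveCompatibility(newComparator) == Compatibility.INCOMPATIBLE) {
            throw new IllegalStateException("The new state serializer cannot be incompatible.");
        }
    }

    public static void main(String[] args) {
        // Strings stand in for comparators whose equals() compares generated class names.
        SortedMapSnapshotModel<String> snapshot =
            new SortedMapSnapshotModel<>("StreamExecSortComparator$579");
        registerRestoredState(snapshot, "StreamExecSortComparator$579"); // passes
        try {
            registerRestoredState(snapshot, "StreamExecSortComparator$626");
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

> Because the decision reduces to a single _equals_ call, any comparator wrapper whose _equals_ is stricter than its actual behavior makes a compatible serializer look incompatible.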
>
> The relevant part of the equals method is:
> {code:java}
> // Two wrappers are equal only if the generated class name, the generated
> // source code and the referenced objects are all identical.
> return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName())
>         && generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode())
>         && Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
> {code}
> Debugging showed that the comparator in the snapshot has the class name
> _StreamExecSortComparator$579_, whereas the comparator provided by the new job
> has the class name _StreamExecSortComparator$626_. As a result, this method
> always returns false, even though the two comparators are compatible (they
> behave identically). Moreover, because the code for each comparator is
> generated independently, the corresponding variable names within the two
> comparators are also highly likely to differ (_isNullA$581_ vs
> _isNullA$682_), so the comparison of the generated code fails as well.
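>
> To illustrate why the check above fails, here is a minimal sketch in which a hypothetical global counter (_NAME_COUNTER_, loosely modeled on numeric suffixes such as _$579_ and _$626_) names every generated identifier. The model class and helpers are assumptions for illustration only: _wrapperEquals_ mirrors the three conditions of the quoted snippet, and _sameModuloGeneratedSuffixes_ is only a diagnostic showing that the two generated comparators differ solely in their counter suffixes, not a proposed fix.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class GeneratedNameMismatchSketch {

    // Hypothetical global counter: every generated identifier gets a fresh numeric suffix.
    private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);

    static String newName(String prefix) {
        return prefix + "$" + NAME_COUNTER.getAndIncrement();
    }

    // Hypothetical stand-in for a generated comparator: class name, source code, references.
    static final class GeneratedComparatorModel {
        final String className;
        final String code;
        final Object[] references;

        GeneratedComparatorModel(String className, String code, Object[] references) {
            this.className = Objects.requireNonNull(className);
            this.code = Objects.requireNonNull(code);
            this.references = references.clone();
        }
    }

    // Generates the same ordering logic every time, but with fresh identifier names.
    static GeneratedComparatorModel generate() {
        String className = newName("StreamExecSortComparator");
        String isNullA = newName("isNullA");
        String code = "public class " + className
            + " { /* compare quantity DESC, null flag " + isNullA + " */ }";
        return new GeneratedComparatorModel(className, code, new Object[0]);
    }

    // Mirrors the three conditions checked in the quoted ComparatorWrapper#equals snippet.
    static boolean wrapperEquals(GeneratedComparatorModel a, GeneratedComparatorModel b) {
        return a.className.equals(b.className)
            && a.code.equals(b.code)
            && Arrays.equals(a.references, b.references);
    }

    // Diagnostic only: strips "$<digits>" suffixes to show the logic itself is identical.
    static boolean sameModuloGeneratedSuffixes(GeneratedComparatorModel a, GeneratedComparatorModel b) {
        return a.code.replaceAll("\\$\\d+", "").equals(b.code.replaceAll("\\$\\d+", ""));
    }

    public static void main(String[] args) {
        GeneratedComparatorModel inSnapshot = generate(); // job that wrote the checkpoint
        GeneratedComparatorModel inNewJob = generate();   // restored job, code regenerated
        System.out.println(inSnapshot.className);                              // StreamExecSortComparator$0
        System.out.println(inNewJob.className);                                // StreamExecSortComparator$2
        System.out.println(wrapperEquals(inSnapshot, inNewJob));               // false
        System.out.println(sameModuloGeneratedSuffixes(inSnapshot, inNewJob)); // true
    }
}
```

> Only the suffixes differ, yet both the class-name and the code conditions fail (only the reference check passes), so the wrapper reports the two comparators as unequal.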
>
> We therefore believe that the implementation of the equals method is
> seriously flawed and should be fixed in a later release.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)