[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233224#comment-17233224
 ] 

Jark Wu commented on FLINK-18452:
---------------------------------

I will take this issue. 

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18452
>                 URL: https://issues.apache.org/jira/browse/FLINK-18452
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: Weike Dong
>            Assignee: Jark Wu
>            Priority: Major
>             Fix For: 1.12.0
>
>         Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>         at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>         at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>         at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>         at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>         at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>         at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>         ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object (here we call it 
> _StreamExecSortComparator$579_).
>  
> At restoration, the object is read and restored as normal. However, during 
> the construction of RetractableTopNFunction instance, another Comparator is 
> provided by Flink as an argument (we call it _StreamExecSortComparator$626_), 
> and it is later used in the _ValueStateDescriptor_ which acts like a key to 
> the state store.
>  
> Here comes the problem: when the newly-restored Flink program tries to access 
> state (_getState_) through the previously mentioned _ValueStateDescriptor_, 
> the State Backend firstly detects whether the provided comparator in state 
> descriptor is compatible with the one in snapshot, eventually the logic goes 
> to the _equals_ method at _RetractableTopNFunction.ComparatorWrapper_ class.
>  
> In the equals method, here is a code snippet:
> {code:java}
> return 
> generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName())
>  &&
>       
> generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
>       Arrays.equals(generatedRecordComparator.getReferences(), 
> oGeneratedComparator.getReferences());
> {code}
> After debugging, we found that the class name of comparator within snapshot 
> is _StreamExecSortComparator$579_, and the class name of comparator provided 
> in the new job is _StreamExecSortComparator$626_, hence this method always 
> returns false, even though actually they are indeed compatible (acts the 
> same). Also, because the code in each generator is generated independently, 
> the corresponding varaibles within the two comparators are highly likely to 
> be different (_isNullA$581_ vs _isNullA$682_).
>  
> Hence we believe that the implementation of equals method has serious flaws, 
> and should be addressed in later releases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to