Weike Dong commented on FLINK-18452:

One important thing to note here is backward compatibility. For example, for 
_GeneratedRecordComparator_ instances created in current or earlier Flink 
versions, AFAIK they do not have the proper meta info needed to compare with 
the new one with the meta info.

In order to maintain compatibility, a hacky approach is to extract relevant 
fields from the generated code text, however, this is error-prone and could 
possibly only be used as a fallback approach if no other methods are available.

Or maybe we could add a new class (like _GeneratedRecordComparatorV2_) with the 
required meta info, and "migrate" the current implementation to the new one if 
needed. In this way, we could enumerate all of the comparator code generation 
implementations in existing Flink versions, and provide a robust migration plan.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> -------------------------------------------------------------------------------------------------------
>                 Key: FLINK-18452
>                 URL: https://issues.apache.org/jira/browse/FLINK-18452
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: Weike Dong
>            Priority: Major
>         Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>         at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>         at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>         at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>         at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>         at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>         at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>         at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>         ... 13 more{panel}
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object (here we call it 
> _StreamExecSortComparator$579_).
> At restoration, the object is read and restored as normal. However, during 
> the construction of RetractableTopNFunction instance, another Comparator is 
> provided by Flink as an argument (we call it _StreamExecSortComparator$626_), 
> and it is later used in the _ValueStateDescriptor_ which acts like a key to 
> the state store.
> Here comes the problem: when the newly-restored Flink program tries to access 
> state (_getState_) through the previously mentioned _ValueStateDescriptor_, 
> the State Backend firstly detects whether the provided comparator in state 
> descriptor is compatible with the one in snapshot, eventually the logic goes 
> to the _equals_ method at _RetractableTopNFunction.ComparatorWrapper_ class.
> In the equals method, here is a code snippet:
> {code:java}
> return 
> generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName())
>  &&
> generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
>       Arrays.equals(generatedRecordComparator.getReferences(), 
> oGeneratedComparator.getReferences());
> {code}
> After debugging, we found that the class name of comparator within snapshot 
> is _StreamExecSortComparator$579_, and the class name of comparator provided 
> in the new job is _StreamExecSortComparator$626_, hence this method always 
> returns false, even though actually they are indeed compatible (acts the 
> same). Also, because the code in each generator is generated independently, 
> the corresponding varaibles within the two comparators are highly likely to 
> be different (_isNullA$581_ vs _isNullA$682_).
> Hence we believe that the implementation of equals method has serious flaws, 
> and should be addressed in later releases.

This message was sent by Atlassian Jira

Reply via email to