[ 
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371934#comment-17371934
 ] 

guxiang commented on FLINK-18464:
---------------------------------

Hi [~roman_khachatryan],

Thanks for your reply.

I understand your performance concerns, but I don't think the check will have 
a significant impact on performance for now.

On the one hand, I added caching, so the check costs very little. On the other 
hand, I think AbstractKeyedStateBackend#getPartitionedState is, in most cases, 
invoked only from the open method. (Could you share some examples where 
getPartitionedState is called frequently? Thank you very much :D)

In addition, do you have any good test cases for benchmarking the performance 
of AbstractKeyedStateBackend#getPartitionedState?

The following is my reply to this remark:
{quote}It also doesn't look like the right level of solving the problem (the 
problem as I see it is that Window state is accessible from the Trigger). For 
example, another state backend will not have this check.
{quote}
Judging purely from the code logic of 
AbstractKeyedStateBackend#getPartitionedState, this error is not limited to 
the case where window state is accessible from the Trigger; it can occur 
wherever Flink calls getPartitionedState. That said, you may be right.

In addition, it is not only window state being accessible from the Trigger 
that can lead to this error; the Trigger's state being accessible from the 
window can lead to it as well. So I don't think it is very reasonable to check 
only the window state.
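
To make the idea concrete, here is a minimal, self-contained sketch of the kind 
of cached namespace check discussed above. It is only an illustration and not 
the code from the pull request: the class NamespaceCheckSketch, the method 
checkNamespace and the marker types WindowNs and VoidNs are hypothetical 
stand-ins (for the backend, for the check inside getPartitionedState, and for 
TimeWindow and VoidNamespace), and no Flink classes are used. The lookup is a 
single hash map access per call, which is why I expect the cost to be small.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a keyed state backend; not Flink's API. */
public class NamespaceCheckSketch {

    // Hypothetical marker types standing in for TimeWindow and VoidNamespace.
    static final class WindowNs {}
    static final class VoidNs {}

    // Cache: state name -> namespace type used when the state was first requested.
    private final Map<String, Class<?>> namespaceTypes = new HashMap<>();

    /**
     * Remembers the namespace type of a state on first access and rejects any
     * later access to the same state name with a different namespace type.
     */
    public void checkNamespace(String stateName, Class<?> namespaceType) {
        Class<?> first = namespaceTypes.putIfAbsent(stateName, namespaceType);
        if (first != null && first != namespaceType) {
            throw new IllegalStateException(
                    "State '" + stateName + "' was registered with namespace "
                            + first.getSimpleName() + " but is now requested with "
                            + namespaceType.getSimpleName());
        }
    }

    public static void main(String[] args) {
        NamespaceCheckSketch backend = new NamespaceCheckSketch();
        backend.checkNamespace("count", WindowNs.class); // first registration
        backend.checkNamespace("count", WindowNs.class); // same namespace: accepted
        try {
            backend.checkNamespace("count", VoidNs.class); // mismatch: rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected early: " + e.getMessage());
        }
    }
}
```

In this sketch the mismatch is reported when the state is requested, regardless 
of whether the first access came from the window function or from the Trigger, 
instead of surfacing later as a ClassCastException during the checkpoint.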

> ClassCastException during namespace serialization for checkpoint (Heap and 
> RocksDB)
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-18464
>                 URL: https://issues.apache.org/jira/browse/FLINK-18464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.3, 1.13.1
>            Reporter: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-06-21-20-06-51-323.png, 
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png, 
> image-2021-06-21-20-33-39-295.png, image-2021-06-23-14-34-37-703.png, 
> image-2021-06-24-16-41-54-425.png, image-2021-06-24-17-51-53-734.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>  
> From 
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and 
> evictor. The state is stored to memory.
> {quote}
>  
>   
> {code:java}
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink"){code}
>  
>  
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator 
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, 
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>      at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>      
>  ... 3 more 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>      at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>      at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     
>      ... 5 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
