[ 
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368734#comment-17368734
 ] 

guxiang commented on FLINK-18464:
---------------------------------

Hi  [Yun 
Tang|https://github.com/apache/flink/pull/16273/files#diff-75c634234efe759f72c64e98d07cb8fb05317d582858a5f7f15eaf33c0c3a857]

I think we can  also  cache the  namespaceSerializer,  when  we  find  the  
stateDescriptor name  equals to  the cache  lastname  and  namespaceSerializer  
 equals to  the cache  lastNamespaceSerializer. 

we use the cached  lastState.  

just like so .  Do you think it is  OK?

https://github.com/apache/flink/pull/16273/files#diff-75c634234efe759f72c64e98d07cb8fb05317d582858a5f7f15eaf33c0c3a857

!image-2021-06-24-17-51-53-734.png|width=720,height=317!

> ClassCastException during namespace serialization for checkpoint (Heap and 
> RocksDB)
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-18464
>                 URL: https://issues.apache.org/jira/browse/FLINK-18464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.3, 1.13.1
>            Reporter: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-06-21-20-06-51-323.png, 
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png, 
> image-2021-06-21-20-33-39-295.png, image-2021-06-23-14-34-37-703.png, 
> image-2021-06-24-16-41-54-425.png, image-2021-06-24-17-51-53-734.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>  
> From 
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and 
> evictor. The state is stored to memory.
> {quote}
>  
>   
> {code:java}
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink"){code}
>  
>  
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator 
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, 
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>      at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>      
>  ... 3 more 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>      at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>      at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     
>      ... 5 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to