[
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368746#comment-17368746
]
Yun Tang commented on FLINK-18464:
----------------------------------
[~guxiangfly] I think this might be a possible way. And we must ensure that the
last state and the last namespace point to the same one, in case that last
state lost the connectionship with last namespace serializer.
What do others think of this idea? [~roman_khachatryan] [~sjwiesman]. If
agreed, I could assign this ticket to [~guxiangfly] and help review related PR.
> ClassCastException during namespace serialization for checkpoint (Heap and
> RocksDB)
> -----------------------------------------------------------------------------------
>
> Key: FLINK-18464
> URL: https://issues.apache.org/jira/browse/FLINK-18464
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.9.3, 1.13.1
> Reporter: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-06-21-20-06-51-323.png,
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png,
> image-2021-06-21-20-33-39-295.png, image-2021-06-23-14-34-37-703.png,
> image-2021-06-24-16-41-54-425.png, image-2021-06-24-17-51-53-734.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>
> From
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and
> evictor. The state is stored to memory.
> {quote}
>
>
> {code:java}
> input.setParallelism(processParallelism)
> .assignTimestampsAndWatermarks(new UETimeAssigner)
> .keyBy(_.key)
> .window(TumblingEventTimeWindows.of(Time.minutes(20)))
> .trigger(new MyTrigger)
> .evictor(new MyEvictor)
> .process(new MyFunction).setParallelism(aggregateParallelism)
> .addSink(kafkaSink).setParallelism(sinkParallelism)
> .name("kafka-record-sink"){code}
>
>
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
> org.apache.flink.runtime.state.VoidNamespace
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>
> ... 3 more
> Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
> org.apache.flink.runtime.state.VoidNamespace
> at
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
> at
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>
> ... 5 more
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)