[
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371624#comment-17371624
]
Roman Khachatryan commented on FLINK-18464:
-------------------------------------------
Thanks for the PR [~guxiangfly]
I have the same concerns about the performance: namespace serializer can be
provided by the user and the impact of calling its equals method on each state
access is unpredictable.
It also doesn't look like the right level of solving the problem (the problem
as I see it is that Window state is accessible from the Trigger). For example,
another state backend will not have this check.
> ClassCastException during namespace serialization for checkpoint (Heap and
> RocksDB)
> -----------------------------------------------------------------------------------
>
> Key: FLINK-18464
> URL: https://issues.apache.org/jira/browse/FLINK-18464
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.9.3, 1.13.1
> Reporter: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-06-21-20-06-51-323.png,
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png,
> image-2021-06-21-20-33-39-295.png, image-2021-06-23-14-34-37-703.png,
> image-2021-06-24-16-41-54-425.png, image-2021-06-24-17-51-53-734.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>
> From
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and
> evictor. The state is stored to memory.
> {quote}
>
>
> {code:java}
> input.setParallelism(processParallelism)
> .assignTimestampsAndWatermarks(new UETimeAssigner)
> .keyBy(_.key)
> .window(TumblingEventTimeWindows.of(Time.minutes(20)))
> .trigger(new MyTrigger)
> .evictor(new MyEvictor)
> .process(new MyFunction).setParallelism(aggregateParallelism)
> .addSink(kafkaSink).setParallelism(sinkParallelism)
> .name("kafka-record-sink"){code}
>
>
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
> org.apache.flink.runtime.state.VoidNamespace
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>
> ... 3 more
> Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
> org.apache.flink.runtime.state.VoidNamespace
> at
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
> at
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>
> ... 5 more
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)