[ 
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365421#comment-17365421
 ] 

guxiang commented on FLINK-18464:
---------------------------------

Hi,  I ran into the same problem, and I can reproduce it precisely.

I looked at all versions of Flink in 1.13.1 and all versions had this problem.

 

The reason it appears is that State is used in Trigger and is also used in 
TimeWindow, then  it will be a bug if you try to make a checkpoint.
{code:java}
org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:109)
    at 
com.sankuai.grocery.crm.data.mallorg.trigger.MessageProcessOnTimeStateTrigger.onEventTime(MessageProcessOnTimeStateTrigger.java:116)
    at 
com.sankuai.grocery.crm.data.mallorg.trigger.MessageProcessOnTimeStateTrigger.onEventTime(MessageProcessOnTimeStateTrigger.java:23)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:944)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:481)
    at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
    at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: 
org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
org.apache.flink.runtime.state.VoidNamespace
    at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:30)
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:78)
    at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeNamespace(RocksDBSerializedCompositeKeyBuilder.java:175)
    at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:112)
    at 
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
    ... 20 more
{code}

> ClassCastException during namespace serialization for checkpoint (Heap)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-18464
>                 URL: https://issues.apache.org/jira/browse/FLINK-18464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.9.3
>            Reporter: Roman Khachatryan
>            Priority: Major
>
> From 
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and 
> evictor. The state is stored to memory.
> {quote}
>  
>   
> {code:java}
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink"){code}
>  
>  
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator 
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, 
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>      at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>      
>  ... 3 more 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>      at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>      at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     
>      ... 5 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to