[ 
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366578#comment-17366578
 ] 

guxiang commented on FLINK-18464:
---------------------------------

I found the root cause.

I use the state `event-time-slot` in my WindowFunction:

 
{code:java}
eventTimeSlotValueState = getRuntimeContext().getState(
        new ValueStateDescriptor<Long>("event-time-slot", Types.LONG));
{code}
 

!image-2021-06-21-20-33-39-295.png|width=926,height=138!

I also use the state `event-time-slot` in my Trigger:
{code:java}
ValueState<Long> eventTimeSlotValueState = ctx.getPartitionedState(
        new ValueStateDescriptor<Long>("event-time-slot", Long.class));
{code}
 

Both of the calls above end up in the method below:
{code:java}
// AbstractKeyedStateBackend.java

@SuppressWarnings("unchecked")
@Override
public <N, S extends State> S getPartitionedState(
        final N namespace,
        final TypeSerializer<N> namespaceSerializer,
        final StateDescriptor<S, ?> stateDescriptor)
        throws Exception {

    checkNotNull(namespace, "Namespace");

    if (lastName != null && lastName.equals(stateDescriptor.getName())) {
        lastState.setCurrentNamespace(namespace);
        return (S) lastState;
    }

    InternalKvState<K, ?, ?> previous =
            keyValueStatesByName.get(stateDescriptor.getName());
    if (previous != null) {
        lastState = previous;
        lastState.setCurrentNamespace(namespace);
        lastName = stateDescriptor.getName();
        return (S) previous;
    }

    final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
    final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

    lastName = stateDescriptor.getName();
    lastState = kvState;
    kvState.setCurrentNamespace(namespace);

    return state;
}
{code}
The method above looks the state up by name only. When a state with that name 
already exists, it changes the state's namespace but not its 
namespaceSerializer, because the state's namespaceSerializer is final.

Because only the namespace changes and the namespaceSerializer does not, the 
two no longer match. This is what causes the issue.
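
To illustrate the mechanism, here is a minimal, self-contained sketch. It is a toy model, not Flink code: `ToyBackend`, `ToyState`, `VoidNs` and `WindowNs` are hypothetical stand-ins, and a `Class` object plays the role of the fixed namespaceSerializer. It assumes, consistent with the stack trace in the issue, that the WindowFunction registers the state with a void namespace and the Trigger later requests the same name with a window namespace.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of a backend that caches state handles by name only. */
class NamespaceCollisionSketch {

    /** Hypothetical state handle: the namespace type is fixed when the state is created. */
    static final class ToyState {
        final Class<?> namespaceType; // stands in for the final namespaceSerializer
        Object currentNamespace;

        ToyState(Class<?> namespaceType) {
            this.namespaceType = namespaceType;
        }

        /** Mimics a checkpoint: the "serializer" casts the namespace to its own type. */
        void snapshot() {
            namespaceType.cast(currentNamespace); // ClassCastException on a mismatch
        }
    }

    /** Hypothetical backend: on a name hit, the requested namespace type is ignored. */
    static final class ToyBackend {
        private final Map<String, ToyState> statesByName = new HashMap<>();

        ToyState getPartitionedState(Object namespace, Class<?> namespaceType, String name) {
            ToyState state = statesByName.computeIfAbsent(name, n -> new ToyState(namespaceType));
            state.currentNamespace = namespace; // only the namespace is updated
            return state;
        }

        void snapshotAll() {
            statesByName.values().forEach(ToyState::snapshot);
        }
    }

    static final class VoidNs {}   // stand-in for the function's namespace
    static final class WindowNs {} // stand-in for the trigger's window namespace

    /** Returns true if the simulated checkpoint fails with a ClassCastException. */
    static boolean snapshotFails(String functionStateName, String triggerStateName) {
        ToyBackend backend = new ToyBackend();
        backend.getPartitionedState(new VoidNs(), VoidNs.class, functionStateName);
        backend.getPartitionedState(new WindowNs(), WindowNs.class, triggerStateName);
        try {
            backend.snapshotAll();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Same name in both places: the cached state keeps the first namespace type.
        System.out.println(snapshotFails("event-time-slot", "event-time-slot")); // true
        // Different names: each state keeps a matching namespace type.
        System.out.println(snapshotFails("event-time-slot", "trigger-event-time-slot")); // false
    }
}
```

In this toy model, giving the two states different names avoids the mismatch, but it also means the trigger and the function no longer see the same value.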

 

 

I have thought of a possible solution:

1. Implement a method (or a kind of state) that can only read the state and 
cannot change its value, i.e. implement a getReadOnlyPartitionedState myself.

   This would provide a way to view the state's value in the trigger or 
other ...

 

> ClassCastException during namespace serialization for checkpoint (Heap and 
> RocksDB)
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-18464
>                 URL: https://issues.apache.org/jira/browse/FLINK-18464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.3, 1.13.1
>            Reporter: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-06-21-20-06-51-323.png, 
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png, 
> image-2021-06-21-20-33-39-295.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>  
> From 
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and 
> evictor. The state is stored to memory.
> {quote}
>  
>   
> {code:java}
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink"){code}
>  
>  
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator 
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, 
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>      at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>      
>  ... 3 more 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>      at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>      at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     
>      ... 5 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
