[ 
https://issues.apache.org/jira/browse/FLINK-18464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383115#comment-17383115
 ] 

guxiang commented on FLINK-18464:
---------------------------------

[Yun Tang|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yunta]

Thanks for your reply. Sorry for the late response; I have been a little busy at work these days.

I first tested the performance with a simple for loop.

[https://github.com/apache/flink/pull/16273/files#diff-75c634234efe759f72c64e98d07cb8fb05317d582858a5f7f15eaf33c0c3a857]

 
{code:java}
// Time repeated getPartitionedState lookups for the same state descriptor.
long start = System.currentTimeMillis();
for (int i = 0; i < 2000000000L; i++) {
    ValueState<Boolean> firstSeen2 = ctx.getPartitionedState(
            new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
    );
}
long end = System.currentTimeMillis();
System.out.println("the onElement test use time:"+(end-start));

{code}
The loop took about the same time before and after the change, so I could not 
see any difference this way.
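
A plain timing loop like the one above can be distorted by JIT warm-up, so a slightly more careful variant might look like the sketch below. It is only an illustration: `LookupTimer`, `timeNanos`, and the `Supplier` stand-in are hypothetical names, and the stand-in would have to be replaced by the real `ctx.getPartitionedState(...)` call. A dedicated harness such as JMH would likely give more trustworthy numbers than either loop.

```java
import java.util.function.Supplier;

class LookupTimer {
    /** Calls the lookup `iterations` times and returns the elapsed nanoseconds. */
    static long timeNanos(Supplier<?> lookup, long iterations) {
        Object sink = null;
        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            sink = lookup.get(); // use the result so the loop body has an observable effect
        }
        long elapsed = System.nanoTime() - start;
        if (sink == null && iterations > 0) {
            throw new IllegalStateException("lookup returned null");
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for ctx.getPartitionedState(...); replace with the real call.
        Supplier<Object> lookup = () -> Boolean.TRUE;

        timeNanos(lookup, 1_000_000);                 // warm-up run, result discarded
        long elapsed = timeNanos(lookup, 10_000_000); // measured run
        System.out.println("elapsed ms: " + elapsed / 1_000_000);
    }
}
```

Running the warm-up and the measured pass through the same method keeps the two code paths identical, which is the point of the warm-up.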

I then profiled the code with the IntelliJ IDEA CPU profiler and found the following.

 
h3. Comparing the TypeSerializer

getPartitionedState takes about 6166 samples, and the TypeSerializer equals 
call takes about 794 samples.

 
{code:java}
// Cache hit requires the same state name and an equal namespace serializer.
if (lastName != null && lastName.equals(stateDescriptor.getName())
        && lastNamespaceSerializer != null
        && lastNamespaceSerializer.equals(namespaceSerializer)) {
    lastState.setCurrentNamespace(namespace);
    return (S) lastState;
}
{code}
 

 

!image-2021-07-19-15-20-36-431.png|width=1018,height=233!

!image-2021-07-19-15-21-04-214.png|width=769,height=125!
h3. Comparing the TypeSerializer class

getPartitionedState takes about 5550 samples. The equals method does not show 
up in the samples.

 
{code:java}
// Cache hit requires the same state name and the same namespace serializer class.
if (lastName != null && lastName.equals(stateDescriptor.getName())
        && lastNamespaceSerializerClass != null
        && lastNamespaceSerializerClass.equals(namespaceSerializer.getClass())) {
    lastState.setCurrentNamespace(namespace);
    return (S) lastState;
}{code}
 

!image-2021-07-19-14-44-59-511.png|width=1101,height=193!
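
To make the trade-off between the two checks concrete, here is a minimal sketch. `FakeSerializer` is a hypothetical stand-in, not Flink's `TypeSerializer`; the only assumption is that a serializer's `equals` inspects its configuration, while the class comparison is a plain reference check.

```java
import java.util.Objects;

class SerializerCheckDemo {
    /** Hypothetical stand-in for a namespace serializer; not Flink's TypeSerializer. */
    static final class FakeSerializer {
        private final String elementType; // configuration that equals() has to inspect

        FakeSerializer(String elementType) {
            this.elementType = elementType;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FakeSerializer
                    && elementType.equals(((FakeSerializer) o).elementType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(elementType);
        }
    }

    /** Variant 1: full equals() on the serializer instance. */
    static boolean sameByEquals(Object last, Object current) {
        return last != null && last.equals(current);
    }

    /** Variant 2: compare only the runtime classes, which is a reference check. */
    static boolean sameByClass(Object last, Object current) {
        return last != null && current != null && last.getClass() == current.getClass();
    }

    public static void main(String[] args) {
        FakeSerializer a = new FakeSerializer("TimeWindow");
        FakeSerializer b = new FakeSerializer("GlobalWindow");
        System.out.println(sameByEquals(a, b)); // false: the configurations differ
        System.out.println(sameByClass(a, b));  // true: both are FakeSerializer
    }
}
```

The class check is cheaper, but it treats two differently configured serializers of the same class as identical, so it is a weaker guard than `equals`.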

 

 
h3. Not comparing the namespaceSerializer

getPartitionedState takes about 4132 samples. The equals method does not show 
up in the samples.

 
{code:java}
// Cache hit requires only the same state name.
if (lastName != null && lastName.equals(stateDescriptor.getName())) {
    lastState.setCurrentNamespace(namespace);
    return (S) lastState;
}{code}
 

!image-2021-07-19-14-47-34-111.png|width=1034,height=229!

 

 

If you think this performance loss is unacceptable, my final proposal is this: 
if the state name matches lastName from the previous call, we use the cached 
result. If the name does not match, we check whether the TypeSerializer class 
is the same, like so:

[https://github.com/apache/flink/pull/16273/files]

!image-2021-07-19-15-45-53-014.png|width=613,height=321!

 

This takes about 4144 samples.

!image-2021-07-19-15-48-23-853.png|width=1063,height=220!
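
For readers without access to the screenshot, the proposal can be sketched roughly as below. This is not the code from the pull request: `LastStateCache`, `State`, `WindowSer`, and `VoidSer` are hypothetical stand-ins, and throwing on a class mismatch is an assumption, since the comment does not say what happens in that case.

```java
import java.util.HashMap;
import java.util.Map;

class LastStateCache {
    /** Hypothetical stand-ins for two namespace serializer classes. */
    static final class WindowSer {}
    static final class VoidSer {}

    /** Hypothetical stand-in for a registered state. */
    static final class State {
        final String name;
        final Class<?> namespaceSerializerClass;

        State(String name, Class<?> namespaceSerializerClass) {
            this.name = name;
            this.namespaceSerializerClass = namespaceSerializerClass;
        }
    }

    private final Map<String, State> statesByName = new HashMap<>();
    private State lastState;
    int slowPathLookups; // counts how often the fast path was missed

    State getPartitionedState(String name, Object namespaceSerializer) {
        // Fast path: only the state name is compared with the cached one.
        if (lastState != null && lastState.name.equals(name)) {
            return lastState;
        }
        // Slow path: look the state up by name and check the serializer class.
        slowPathLookups++;
        Class<?> requested = namespaceSerializer.getClass();
        State state = statesByName.computeIfAbsent(name, n -> new State(n, requested));
        if (state.namespaceSerializerClass != requested) {
            // Assumption: a mismatch is reported as an error right away.
            throw new IllegalStateException(
                    "State '" + name + "' was registered with "
                            + state.namespaceSerializerClass.getSimpleName()
                            + " but requested with " + requested.getSimpleName());
        }
        lastState = state;
        return state;
    }

    public static void main(String[] args) {
        LastStateCache cache = new LastStateCache();
        cache.getPartitionedState("first-seen", new WindowSer()); // slow path
        cache.getPartitionedState("first-seen", new WindowSer()); // fast path
        cache.getPartitionedState("other", new VoidSer());        // slow path
        System.out.println("slow-path lookups: " + cache.slowPathLookups); // 2
    }
}
```

Note that in this sketch two consecutive calls with the same name but a different serializer class stay on the fast path and are not checked; whether that is acceptable is part of the question below.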

 

Do you think this is OK? [~yunta] [~roman_khachatryan]

 

> ClassCastException during namespace serialization for checkpoint (Heap and 
> RocksDB)
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-18464
>                 URL: https://issues.apache.org/jira/browse/FLINK-18464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.3, 1.13.1
>            Reporter: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-06-21-20-06-51-323.png, 
> image-2021-06-21-20-07-30-281.png, image-2021-06-21-20-07-43-246.png, 
> image-2021-06-21-20-33-39-295.png, image-2021-06-23-14-34-37-703.png, 
> image-2021-06-24-16-41-54-425.png, image-2021-06-24-17-51-53-734.png, 
> image-2021-07-08-14-50-12-559.png, image-2021-07-08-18-33-17-417.png, 
> image-2021-07-08-18-34-51-910.png, image-2021-07-19-14-40-17-398.png, 
> image-2021-07-19-14-44-59-511.png, image-2021-07-19-14-46-21-682.png, 
> image-2021-07-19-14-47-34-111.png, image-2021-07-19-15-20-36-431.png, 
> image-2021-07-19-15-21-04-214.png, image-2021-07-19-15-45-53-014.png, 
> image-2021-07-19-15-48-23-853.png
>
>
> (see FLINK-23036 for error details with RocksDB)
>  
> From 
> [thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-failed-because-of-TimeWindow-cannot-be-cast-to-VoidNamespace-td36310.html]
> {quote}I'm using flink 1.9 on Mesos and I try to use my own trigger and 
> evictor. The state is stored to memory.
> {quote}
>  
>   
> {code:java}
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink"){code}
>  
>  
> {code:java}
> java.lang.Exception: Could not materialize checkpoint 1 for operator 
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, 
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>      at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>      
>  ... 3 more 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace
>      at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>      at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>      at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>      at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>      at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     
>      ... 5 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
