[ https://issues.apache.org/jira/browse/FLINK-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099784#comment-17099784 ]

Aljoscha Krettek commented on FLINK-17453:
------------------------------------------

I don't actually see how this can work. I added a test case to
{{KryoGenericTypeSerializerTest}}, and it fails because Kryo does not
initialize the {{PriorityQueue}} with the custom comparator. Without it, the
queue falls back to natural ordering, and since {{Map.Entry}} does not
implement {{Comparable}}, the queue can no longer store the deserialized
elements. I assume that, even if it had not failed with the exception you
reported, it would have failed later for this reason.

{code:java}
    @Test
    public void testPriorityQueue() {
        // Orders entries by value, breaking ties by key.
        Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                return o1.getValue().compareTo(o2.getValue()) == 0
                        ? o1.getKey().compareTo(o2.getKey())
                        : o1.getValue().compareTo(o2.getValue());
            }
        };
        PriorityQueue<Map.Entry<String, Long>> c = new PriorityQueue<>(comparator);
        Map<String, Long> map = new HashMap<>();
        map.put("hello", 1L);
        map.put("foo", 42L);
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            c.add(entry);
        }
        // Round-trips the queue through the Kryo serializer under test
        // (runTests is a helper from the serializer test base class).
        runTests(c);
    }
{code}
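To make the comparator problem visible without Kryo or Flink, here is a
minimal stand-alone sketch. It assumes that a comparator-unaware collection
serializer recreates the queue with the no-arg {{PriorityQueue}} constructor
and then re-adds each element; {{ComparatorLossDemo}} and its methods are
hypothetical names used only for illustration.

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-alone demo (no Kryo involved). It mimics a
// comparator-unaware serializer: no-arg constructor, then add() per element.
public class ComparatorLossDemo {

    // Orders by value, breaking ties by key (same logic as the test above).
    static final Comparator<Map.Entry<String, Long>> BY_VALUE_THEN_KEY = (o1, o2) -> {
        int byValue = o1.getValue().compareTo(o2.getValue());
        return byValue != 0 ? byValue : o1.getKey().compareTo(o2.getKey());
    };

    static List<Map.Entry<String, Long>> sampleEntries() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleEntry<>("hello", 1L));
        entries.add(new AbstractMap.SimpleEntry<>("foo", 42L));
        return entries;
    }

    // Returns true if a queue rebuilt without the comparator rejects the entries.
    static boolean naturalOrderingRejectsEntries() {
        PriorityQueue<Map.Entry<String, Long>> restored = new PriorityQueue<>();
        try {
            for (Map.Entry<String, Long> entry : sampleEntries()) {
                restored.add(entry);
            }
            return false;
        } catch (ClassCastException expected) {
            // Map.Entry implementations such as SimpleEntry are not Comparable.
            return true;
        }
    }

    // Rebuilding with the original comparator restores the intended order.
    static List<String> keysInPriorityOrder() {
        PriorityQueue<Map.Entry<String, Long>> restored = new PriorityQueue<>(BY_VALUE_THEN_KEY);
        restored.addAll(sampleEntries());
        List<String> keys = new ArrayList<>();
        while (!restored.isEmpty()) {
            keys.add(restored.poll().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(naturalOrderingRejectsEntries()); // prints true
        System.out.println(keysInPriorityOrder());            // prints [hello, foo]
    }
}
{code}

The failure comes from the missing comparator, not from the elements
themselves: the same entries are accepted and correctly ordered once the queue
is built with the comparator.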

Judging from the stack trace, the cause of the exception you see must lie
somewhere in the Table API/Planner/Runtime implementation:
{code}
        at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
        at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
        at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
        at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
        at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
        at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
        at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
        at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
{code}

> KyroSerializer throws IndexOutOfBoundsException type java.util.PriorityQueue
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-17453
>                 URL: https://issues.apache.org/jira/browse/FLINK-17453
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Jiayi Liao
>            Priority: Major
>         Attachments: udaf
>
>
> We're using a SQL UDAF with a {{PriorityQueue}} as its accumulator, and the
> following error occurs when recovering from a checkpoint.
> {code:java}
> 2020-04-28 22:28:18,659 INFO  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer  - IndexOutOfBoundsException type java.util.PriorityQueue source data is: [2, 0, 0, 0, 0, 0, 0, 0].
> 2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d).
> 2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task                     - GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:967)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:941)
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.}
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>         ... 7 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
>         at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:361)
>         at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>         at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>         at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>         at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>         at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>         at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>         at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>         at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>         at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
>         at org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction.getWindowAggregationResult(GeneralWindowProcessFunction.java:73)
>         at org.apache.flink.table.runtime.operators.window.WindowOperator.emitWindowResult(WindowOperator.java:434)
>         at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:422)
>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>         ... 7 more
> Caused by: java.io.EOFException: No more bytes left.
>         ... 26 more
> {code}
> The problem happens when restoring from a checkpoint. (I don't know what is
> inside the {{PriorityQueue}} because the job is already running in production.)
> According to the logs, the {{KryoSerializer}} seems unable to deserialize the
> {{PriorityQueue}} instance and throws an {{IndexOutOfBoundsException}}.
> The UDAF accumulator is:
> {code:java}
> public static class Acc {
>     public PriorityQueue<Map.Entry<String, Long>> queue;
> }
>
> @Override
> public Acc createAccumulator() {
>     Acc accumulator = new Acc();
>     Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
>         @Override
>         public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
>             return o1.getValue().compareTo(o2.getValue()) == 0
>                     ? o1.getKey().compareTo(o2.getKey())
>                     : o1.getValue().compareTo(o2.getValue());
>         }
>     };
>     PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(comparator);
>     accumulator.queue = pq;
>     return accumulator;
> }
> {code}
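One possible way around the lost comparator, sketched here under assumptions
that have not been verified against Flink, is to keep only comparator-free
state in the accumulator and rebuild the queue whenever ordered access is
needed. {{MapBackedAcc}} and its methods are hypothetical names; the sketch
assumes that a plain {{HashMap<String, Long>}} field is restored correctly by
whichever serializer Flink chooses for it.

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical accumulator: the only serialized state is a plain HashMap,
// so correctness does not depend on a serializer restoring a comparator.
public class MapBackedAcc {

    // Static, so it is never part of the serialized accumulator state.
    static final Comparator<Map.Entry<String, Long>> BY_VALUE_THEN_KEY = (o1, o2) -> {
        int byValue = o1.getValue().compareTo(o2.getValue());
        return byValue != 0 ? byValue : o1.getKey().compareTo(o2.getKey());
    };

    // key -> value; this is the state that would be checkpointed.
    public Map<String, Long> values = new HashMap<>();

    public void put(String key, long value) {
        values.put(key, value);
    }

    // Builds a fresh queue with the comparator each time it is needed.
    public PriorityQueue<Map.Entry<String, Long>> toQueue() {
        PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(BY_VALUE_THEN_KEY);
        for (Map.Entry<String, Long> e : values.entrySet()) {
            // Copy each entry so the queue does not hold live views of the map.
            pq.add(new AbstractMap.SimpleEntry<>(e));
        }
        return pq;
    }

    public List<String> keysInPriorityOrder() {
        PriorityQueue<Map.Entry<String, Long>> pq = toQueue();
        List<String> keys = new ArrayList<>();
        while (!pq.isEmpty()) {
            keys.add(pq.poll().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        MapBackedAcc acc = new MapBackedAcc();
        acc.put("foo", 42L);
        acc.put("hello", 1L);
        System.out.println(acc.keysInPriorityOrder()); // prints [hello, foo]
    }
}
{code}

Rebuilding the queue costs O(n log n) per access, which trades some CPU for
state that does not rely on the serializer knowing about the comparator.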



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
