[ https://issues.apache.org/jira/browse/FLINK-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099784#comment-17099784 ]
Aljoscha Krettek commented on FLINK-17453:
------------------------------------------
I don't actually see how this can work. I added a test case to
{{KryoGenericTypeSerializerTest}}, and it fails because Kryo does not initialize
the {{PriorityQueue}} with the custom comparator. Without the comparator, the
queue falls back to natural ordering, and since {{Map.Entry}} is not
{{Comparable}}, it can no longer store the deserialized elements. I assume that
even if it didn't fail with the exception you reported, it would fail later
because of that.
{code:java}
@Test
public void testPriorityQueue() {
    // Orders entries by value, then by key (same logic as the UDAF accumulator)
    Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
        @Override
        public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
            return o1.getValue().compareTo(o2.getValue()) == 0
                    ? o1.getKey().compareTo(o2.getKey())
                    : o1.getValue().compareTo(o2.getValue());
        }
    };
    PriorityQueue<Map.Entry<String, Long>> c = new PriorityQueue<>(comparator);
    Map<String, Long> map = new HashMap<>();
    map.put("hello", 1L);
    map.put("foo", 42L);
    for (Map.Entry<String, Long> entry : map.entrySet()) {
        c.add(entry);
    }
    // Round-trips the queue through the serializer; fails because the comparator is not restored
    runTests(c);
}
{code}
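A minimal, Kryo-free sketch of the failure mode described above, assuming (as stated) that deserialization re-creates the queue through its no-argument constructor and then re-adds the elements. {{PriorityQueueRestoreDemo}}, {{restoreWithoutComparator}} and {{RankedEntry}} are hypothetical names, and {{AbstractMap.SimpleEntry}} stands in for the {{HashMap}} entries used in the test. Without the comparator the restored queue uses natural ordering, so re-adding {{Map.Entry}} elements throws a {{ClassCastException}}; an element type that implements {{Comparable}} itself keeps its ordering after the same simulated restore.
{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical demo: simulates a serializer that rebuilds a PriorityQueue
// via its no-argument constructor, which drops any custom comparator.
public class PriorityQueueRestoreDemo {

    // Same ordering as the test above: by value, then by key.
    static final Comparator<Map.Entry<String, Long>> BY_VALUE_THEN_KEY =
            Comparator.comparing((Map.Entry<String, Long> e) -> e.getValue())
                    .thenComparing(e -> e.getKey());

    // Simulated restore (assumption): new queue without comparator, then re-add every element.
    static <T> PriorityQueue<T> restoreWithoutComparator(PriorityQueue<T> original) {
        PriorityQueue<T> restored = new PriorityQueue<>();
        for (T element : original) {
            restored.add(element);
        }
        return restored;
    }

    // Hypothetical element type that carries its own ordering, so natural ordering suffices.
    static final class RankedEntry implements Comparable<RankedEntry> {
        final String key;
        final long value;

        RankedEntry(String key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public int compareTo(RankedEntry other) {
            int byValue = Long.compare(value, other.value);
            return byValue != 0 ? byValue : key.compareTo(other.key);
        }
    }

    // Returns true if restoring the comparator-based queue throws ClassCastException.
    static boolean comparatorQueueFailsToRestore() {
        PriorityQueue<Map.Entry<String, Long>> queue = new PriorityQueue<>(BY_VALUE_THEN_KEY);
        queue.add(new AbstractMap.SimpleEntry<>("hello", 1L));
        queue.add(new AbstractMap.SimpleEntry<>("foo", 42L));
        try {
            restoreWithoutComparator(queue);
            return false;
        } catch (ClassCastException expected) {
            // SimpleEntry does not implement Comparable, so natural ordering cannot be used.
            return true;
        }
    }

    // Restores a queue of Comparable elements and returns the keys in poll order.
    static List<String> comparableQueuePollOrder() {
        PriorityQueue<RankedEntry> queue = new PriorityQueue<>();
        queue.add(new RankedEntry("foo", 42L));
        queue.add(new RankedEntry("hello", 1L));
        PriorityQueue<RankedEntry> restored = restoreWithoutComparator(queue);
        List<String> keys = new ArrayList<>();
        while (!restored.isEmpty()) {
            keys.add(restored.poll().key);
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(comparatorQueueFailsToRestore()); // true
        System.out.println(comparableQueuePollOrder());      // [hello, foo]
    }
}
{code}
This only illustrates the lost-comparator problem; it does not explain the {{EOFException}} raised in the Table runtime.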
Judging from the stack trace, the exception you see must originate somewhere
in the Table API/Planner/Runtime implementation:
{code}
at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
{code}
> KyroSerializer throws IndexOutOfBoundsException type java.util.PriorityQueue
> ----------------------------------------------------------------------------
>
> Key: FLINK-17453
> URL: https://issues.apache.org/jira/browse/FLINK-17453
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System, Table SQL / Runtime
> Affects Versions: 1.9.0
> Reporter: Jiayi Liao
> Priority: Major
> Attachments: udaf
>
>
> We're using a SQL UDAF with a {{PriorityQueue}} in its accumulator, and the
> following error occurs when recovering from a checkpoint.
> {code:java}
> 2020-04-28 22:28:18,659 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer - IndexOutOfBoundsException type java.util.PriorityQueue source data is: [2, 0, 0, 0, 0, 0, 0, 0].
> 2020-04-28 22:28:18,660 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d).
> 2020-04-28 22:28:18,660 INFO org.apache.flink.runtime.taskmanager.Task - GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
> at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:967)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:941)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.}
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
> ... 7 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
> at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:361)
> at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
> at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
> at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
> at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
> at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
> at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
> at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
> at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
> at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
> at org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction.getWindowAggregationResult(GeneralWindowProcessFunction.java:73)
> at org.apache.flink.table.runtime.operators.window.WindowOperator.emitWindowResult(WindowOperator.java:434)
> at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:422)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ... 7 more
> Caused by: java.io.EOFException: No more bytes left.
> ... 26 more
> {code}
> The problem happens when restoring from a checkpoint. (I don't know what's
> inside the {{PriorityQueue}} because the job is already running in production.)
> According to the logs, it seems that the {{KryoSerializer}} cannot deserialize
> the {{PriorityQueue}} instance and throws an {{IndexOutOfBoundsException}}.
> The UDAF accumulator is:
> {code:java}
> public static class Acc {
>     public PriorityQueue<Map.Entry<String, Long>> queue;
> }
>
> @Override
> public Acc createAccumulator() {
>     Acc accumulator = new Acc();
>     Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
>         @Override
>         public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
>             return o1.getValue().compareTo(o2.getValue()) == 0
>                     ? o1.getKey().compareTo(o2.getKey())
>                     : o1.getValue().compareTo(o2.getValue());
>         }
>     };
>     PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(comparator);
>     accumulator.queue = pq;
>     return accumulator;
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)