[
https://issues.apache.org/jira/browse/FLINK-17320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089499#comment-17089499
]
Jingsong Lee commented on FLINK-17320:
--------------------------------------
{code:java}
@Test
public void test() throws IOException {
MyPriorityQueue<String> pq = new MyPriorityQueue<>((o1, o2) -> o1.length() -
o2.length() - 1);
pq.add("1234135");
pq.add("12323424135");
KryoSerializer kryoSerializer = new KryoSerializer(PriorityQueue.class, new
ExecutionConfig());
kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
}
private class MyPriorityQueue<T> extends PriorityQueue<T> {
public MyPriorityQueue(Comparator<T> comparator) {
super(comparator);
}
}
{code}
It works. But if we just use PriorityQueue, will fail.
Looks like Flink PriorityQueueSerializer can not deal with lambda.
> Java8 lambda expression cannot be serialized.
> ---------------------------------------------
>
> Key: FLINK-17320
> URL: https://issues.apache.org/jira/browse/FLINK-17320
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System, Table SQL / Runtime
> Affects Versions: 1.9.0
> Reporter: Jiayi Liao
> Priority: Major
>
> It happens when we want to use {{java.util.PriorityQueue}} in customed UDAF.
> The serialization error occurs with codes below.
> {code:java}
> @Test
> public void test() throws IOException {
> PriorityQueue<String> pq = new PriorityQueue<>((o1, o2) -> o1.length
> - o2.length - 1);
> pq.add("1234135");
> pq.add("12323424135");
> KryoSerializer kryoSerializer = new
> KryoSerializer(PriorityQueue.class, new ExecutionConfig());
> kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
> }
> {code}
> And the NPE will be thrown:
> {code:java}
> Caused by: java.lang.NullPointerException
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at
> org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:67)
> at
> org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:40)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:307)
> at
> org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)