[ https://issues.apache.org/jira/browse/FLINK-17320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089415#comment-17089415 ]
Jiayi Liao commented on FLINK-17320:
------------------------------------
After some investigation, the {{PriorityQueue}} can be serialized with a few changes:
{code:java}
// Cast the lambda to an intersection type so that it is compiled as a serializable lambda.
PriorityQueue<String> pq = new PriorityQueue<>(
    (Comparator<String> & Serializable) (o1, o2) -> o1.length() - 1 - o2.length());
pq.add("1234135");
pq.add("12323424135");
KryoSerializer kryoSerializer = new KryoSerializer(PriorityQueue.class, new ExecutionConfig());
// Register the class that declares the lambda, plus the types Kryo uses to write closures.
kryoSerializer.getKryo().register(getClass());
kryoSerializer.getKryo().register(java.lang.invoke.SerializedLambda.class);
kryoSerializer.getKryo().register(ClosureSerializer.Closure.class, new ClosureSerializer());
kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
{code}
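To see why the intersection cast matters, here is a minimal sketch that uses plain JDK serialization instead of Kryo (an assumption, so that it runs without Flink on the classpath). The class {{SerializableComparatorDemo}} and its methods are made up for illustration. Only the comparator differs between the two queues.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical demo class; it uses JDK serialization only, not Flink's KryoSerializer.
final class SerializableComparatorDemo {

    // Queue whose comparator is a plain lambda (not Serializable).
    static PriorityQueue<String> plainQueue() {
        PriorityQueue<String> pq =
                new PriorityQueue<>((o1, o2) -> o1.length() - o2.length());
        pq.add("1234135");
        pq.add("12323424135");
        return pq;
    }

    // Same queue, but the lambda is cast to Comparator & Serializable.
    static PriorityQueue<String> serializableQueue() {
        PriorityQueue<String> pq = new PriorityQueue<>(
                (Comparator<String> & Serializable) (o1, o2) -> o1.length() - o2.length());
        pq.add("1234135");
        pq.add("12323424135");
        return pq;
    }

    // Returns true if the object graph can be written with ObjectOutputStream.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain lambda: " + isJavaSerializable(plainQueue()));
        System.out.println("cast lambda: " + isJavaSerializable(serializableQueue()));
    }
}
```

{{PriorityQueue}} writes its comparator field together with its elements, so the queue as a whole is only serializable when the comparator is.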
I'm not sure whether this serialization problem can be handled on Flink's side
(because a plain lambda expression is not actually serializable), but if it
cannot be solved, can we at least add a hint to the error message?
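A rough sketch of the kind of check that could produce such a message, under stated assumptions: {{ComparatorHint}} and {{hintFor}} are hypothetical names, not existing Flink or Kryo APIs, and the lambda detection relies on {{Class.isSynthetic()}}, which is a heuristic that holds for lambda classes on HotSpot JVMs but is not guaranteed by the language specification.

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Flink or Kryo.
final class ComparatorHint {

    // Heuristic: lambda classes generated by HotSpot are flagged synthetic.
    static boolean looksLikeNonSerializableLambda(Object o) {
        return o != null
                && o.getClass().isSynthetic()
                && !(o instanceof Serializable);
    }

    // Returns a hint if the queue's comparator is likely to break serialization.
    static Optional<String> hintFor(PriorityQueue<?> pq) {
        Comparator<?> comparator = pq.comparator();
        if (looksLikeNonSerializableLambda(comparator)) {
            return Optional.of(
                    "The PriorityQueue comparator " + comparator.getClass().getName()
                            + " looks like a non-serializable lambda. Consider casting it to"
                            + " (Comparator<T> & Serializable) or using a named comparator class.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        PriorityQueue<String> pq =
                new PriorityQueue<>((o1, o2) -> o1.length() - o2.length());
        // Prints the hint when the heuristic matches, otherwise a fallback text.
        System.out.println(hintFor(pq).orElse("no hint"));
    }
}
```

A queue created without a comparator has {{comparator() == null}}, so it yields no hint.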
> Java8 lambda expression cannot be serialized.
> ---------------------------------------------
>
> Key: FLINK-17320
> URL: https://issues.apache.org/jira/browse/FLINK-17320
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System, Table SQL / Runtime
> Affects Versions: 1.9.0
> Reporter: Jiayi Liao
> Priority: Major
>
> It happens when we use {{java.util.PriorityQueue}} in a custom UDAF.
> The serialization error occurs with the code below.
> {code:java}
> @Test
> public void test() throws IOException {
>     // The comparator is a plain lambda, which is not serializable.
>     PriorityQueue<String> pq = new PriorityQueue<>((o1, o2) -> o1.length() - o2.length() - 1);
>     pq.add("1234135");
>     pq.add("12323424135");
>     KryoSerializer kryoSerializer = new KryoSerializer(PriorityQueue.class, new ExecutionConfig());
>     kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
> }
> {code}
> A {{NullPointerException}} is then thrown:
> {code:java}
> Caused by: java.lang.NullPointerException
> at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:67)
> at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:40)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:307)
> at org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
> {code}