[
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944959#comment-14944959
]
Stefano Bortoli edited comment on FLINK-2800 at 10/6/15 12:38 PM:
------------------------------------------------------------------
I don't think the functions are relevant. The "GetEntitonForClass" implements a
filter as a flatmap. The combineGroup "POI2CDACombineGroupFunction" takes the
results of the cross and applies a join logic to the tuples.
"AddProxy2POIReduceGroupFunction" takes the POJO, and appends the results of
the join to the target element. The error happens in the serialization of the
POJO along the cross operation. The process ends without any problem if I use
byte[] in place of BSONObject and I deserialize the object with a new Kryo()
instance inside the combineGroup function.
was (Author: stefano.bortoli):
I don't think the functions are relevant. The "GetEntitonForClass" implements a
filter. The combineGroup "POI2CDACombineGroupFunction" takes the results of the
cross and applies a join logic to the tuples. "AddProxy2POIReduceGroupFunction"
takes the POJO, and appends the results of the join to the target element. The
error happens in the serialization of the POJO along the cross operation. The
process ends without any problem if I use byte[] in place of BSONObject and I
deserialize the object with a new Kryo() instance inside the combineGroup
function.
> kryo serialization problem
> --------------------------
>
> Key: FLINK-2800
> URL: https://issues.apache.org/jira/browse/FLINK-2800
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 1.0
> Environment: linux ubuntu 12.04 LTS, Java 7
> Reporter: Stefano Bortoli
>
> Performing a cross of two dataset of POJOs I have got the exception below.
> The first time I run the process, there was no problem. When I run it the
> second time, I have got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at
> main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
> 114
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> at
> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> at
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> at
> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)