[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945078#comment-14945078 ]
Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Nope, the error occurs when I use the BSONObject, or my POJO directly, instead of a byte[] as part of the crossed tuples. If I pass the byte[] through and deserialize the object inside the method, everything works.

I am trying to implement what Till suggested, but the cross exceeds the Java heap memory and I keep getting exceptions.

{quote}
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper.write(OutputViewDataOutputStreamWrapper.java:70)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:228)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:214)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:36)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:25)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.api.common.accumulators.SerializedListAccumulator.add(SerializedListAccumulator.java:59)
    at org.apache.flink.api.java.Utils$CollectHelper.flatMap(Utils.java:127)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterFirst(CrossDriver.java:247)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:160)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
{quote}

> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.0
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>
> While performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it the
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>     at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>     at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>     at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
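
For reference, the byte[] workaround mentioned in the comment (carry raw bytes through the cross and deserialize the object only inside the method) could look roughly like the sketch below. It is plain Java with no Flink dependency, so it only illustrates the pattern. The names `ByteArrayCrossSketch`, `Poi`, `toBytes`, `fromBytes`, `crossPair` and `crossAll` are all hypothetical, and Java serialization is used as a stand-in for whatever encoding (for example BSON) the real job uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ByteArrayCrossSketch {

    /** Hypothetical POJO standing in for the real record type. */
    public static class Poi implements Serializable {
        private static final long serialVersionUID = 1L;
        public String name;

        public Poi() {}

        public Poi(String name) {
            this.name = name;
        }
    }

    /** Encode the POJO up front so that only a byte[] travels through the cross. */
    public static byte[] toBytes(Poi poi) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(poi);
        }
        return buffer.toByteArray();
    }

    /** Decode the POJO again; in the workaround this happens inside the cross method. */
    public static Poi fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Poi) in.readObject();
        }
    }

    /** Stand-in for the body of the cross method: decode both sides, then combine. */
    public static String crossPair(byte[] left, byte[] right)
            throws IOException, ClassNotFoundException {
        Poi a = fromBytes(left);
        Poi b = fromBytes(right);
        return a.name + "|" + b.name;
    }

    /** Naive in-memory cross product over two lists of encoded records. */
    public static List<String> crossAll(List<byte[]> first, List<byte[]> second)
            throws IOException, ClassNotFoundException {
        List<String> result = new ArrayList<>();
        for (byte[] left : first) {
            for (byte[] right : second) {
                result.add(crossPair(left, right));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> first = new ArrayList<>();
        first.add(toBytes(new Poi("a")));
        first.add(toBytes(new Poi("b")));

        List<byte[]> second = new ArrayList<>();
        second.add(toBytes(new Poi("x")));

        for (String pair : crossAll(first, second)) {
            System.out.println(pair);
        }
    }
}
```

In the actual job the decode step would sit inside the function passed to the cross, so the framework's serializer only ever sees a byte[] field in the tuples rather than the POJO or BSONObject.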