[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944646#comment-14944646
 ] 

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

I hit the problem with both my own POJO and BSONObject as the value type of the 
Tuple2 when crossing data read from MongoDB. The code itself is quite simple:

{code}
InputFormat<Object, BSONObject> mapreduceInputFormat =
        new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf =
        new HadoopInputFormat<Object, BSONObject>(
                mapreduceInputFormat, Object.class, BSONObject.class, job);

// specify connection parameters
hdIf.getConfiguration().set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);

DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);
DataSet<Tuple2<String, BSONObject>> pois = input.flatMap(
        new GetEntitonForClass(CulturalSitePointOfInterest.class.getSimpleName()));
DataSet<Tuple2<String, BSONObject>> cdas = input.flatMap(
        new GetEntitonForClass(CulturalDigitalArtefact.class.getSimpleName()));

DataSet<Tuple2<String, String>> out = pois.cross(cdas)
        .combineGroup(new POI2CDACombineGroupFunction())
        .distinct()
        .setParallelism(parallelism);

DataSet<Tuple3<String, String, BSONObject>> union = out.join(pois)
        .where(0).equalTo(0)
        .projectFirst(0, 1).projectSecond(1);

DataSet<Tuple2<BSONWritable, MongoUpdateWritable>> writable = union.groupBy(0)
        .reduceGroup(new AddProxy2POIReduceGroupFunction())
        .setParallelism(parallelism);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
// emit result (this works only locally)
writable.output(new MongoHadoop2OutputFormat<BSONWritable, MongoUpdateWritable>(
        new MongoOutputFormat<BSONWritable, MongoUpdateWritable>(), job));
// execute program
env.execute("Mongodb POI to CDA linker");
{code}

> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: master
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>
> While performing a cross of two datasets of POJOs, I got the exception below. 
> The first time I ran the process, there was no problem. When I ran it a 
> second time, I got the exception. My guess is that it could be a race 
> condition related to the reuse of the Kryo serializer object. However, it 
> could also be "a bug where type registrations are not properly forwarded to 
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO  JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>       at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>       at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>       at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>       at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>       at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>       at java.lang.Thread.run(Thread.java:745)
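
For illustration only, here is a minimal sketch of how an "unregistered class ID" error can arise when the writing side and the reading side do not share the same type registrations, which is the second hypothesis in the description above. It is a toy model in plain Java, not Kryo's or Flink's actual implementation: ToyClassRegistry and UnregisteredIdDemo are hypothetical names, and IDs are simply assigned in registration order, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a class-ID registry (hypothetical, not Kryo's resolver). */
final class ToyClassRegistry {
    private final Map<Class<?>, Integer> idByClass = new HashMap<>();
    private final List<Class<?>> classById = new ArrayList<>();

    /** Registers a class and returns its ID; IDs follow registration order. */
    int register(Class<?> type) {
        Integer existing = idByClass.get(type);
        if (existing != null) {
            return existing;
        }
        int id = classById.size();
        classById.add(type);
        idByClass.put(type, id);
        return id;
    }

    /** Writer side: the ID written to the stream instead of the class name. */
    int idFor(Class<?> type) {
        Integer id = idByClass.get(type);
        if (id == null) {
            throw new IllegalStateException("Class not registered: " + type.getName());
        }
        return id;
    }

    /** Reader side: resolves an ID back to a class, failing on unknown IDs. */
    Class<?> classFor(int id) {
        if (id < 0 || id >= classById.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return classById.get(id);
    }
}

final class UnregisteredIdDemo {
    /** Returns the reader's error message, or null if the ID resolves. */
    static String readWith(ToyClassRegistry writer, ToyClassRegistry reader, Class<?> type) {
        int wireId = writer.idFor(type);
        try {
            reader.classFor(wireId);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ToyClassRegistry writer = new ToyClassRegistry();
        writer.register(String.class);        // ID 0
        writer.register(StringBuilder.class); // ID 1

        // The second registration was never forwarded to the reader.
        ToyClassRegistry reader = new ToyClassRegistry();
        reader.register(String.class);        // ID 0 only

        // prints "Encountered unregistered class ID: 1"
        System.out.println(readWith(writer, reader, StringBuilder.class));
    }
}
```

In this model the record is written correctly and only fails on read, which matches the shape of the trace (the exception is thrown from a deserialize call). It says nothing about whether the real cause in FLINK-2800 is missing registrations or a race on a shared serializer.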



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
