@Timo If I understand it correctly, either omitting the "returns(...)" statement or changing it to "returns(Centroid25.class)" would help?

I think that the behavior of "returns(Centroid25.class)" and "returns("eu.euranova.flink.Centroid25")" should be consistent, in that both should handle the type as a POJO.

Stephan

On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote:

> Hey Nam-Luc,
>
> I think your problem lies in the following line:
>
> .returns("eu.euranova.flink.Centroid25")
>
> If you do not specify the fields of the class in the String by using
> "<myfield=String,otherField=int>", the underlying parser will create a
> "GenericTypeInfo" type information which then uses Kryo for serialization.
>
> In general, lambda expressions are a very new feature which currently
> causes a lot of problems because compilers do not retain the type
> information. Maybe it is better to use (anonymous) classes instead.
>
> In case of "map()" functions you don't need to provide type hints through
> the "returns()" method.
>
> For other operators you need to either specify all fields of the class in
> the String (makes no sense in your case) or you change the method to
>
> .returns(Centroid25.class)
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> On 11.02.2015 17:38, Nam-Luc Tran wrote:
>
>> Hello Stephan,
>>
>> Thank you for your help.
>>
>> I ensured all the POJO classes used comply with what you previously said
>> and the same exception occurs.
>> Here is the listing of classes Centroid25 and Point25:
>>
>> public class Centroid25 extends Point25 {
>>
>>     public int id;
>>
>>     public Centroid25() {}
>>
>>     public Centroid25(int id, Double value0, Double value1, Double value2,
>>             Double value3, Double value4, Double value5, Double value6,
>>             Double value7, Double value8, Double value9, Double value10,
>>             Double value11, Double value12, Double value13, Double value14,
>>             Double value15, Double value16, Double value17, Double value18,
>>             Double value19, Double value20, Double value21, Double value22,
>>             Double value23, Double value24) {
>>         super(value0, value1, value2, value3, value4, value5, value6, value7,
>>             value8, value9, value10, value11, value12, value13, value14,
>>             value15, value16, value17, value18, value19, value20, value21,
>>             value22, value23, value24);
>>         this.id = id;
>>     }
>>
>>     public Centroid25(int id, Point25 p) {
>>         super(p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,
>>             p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,
>>             p.f22,p.f23,p.f24);
>>         this.id = id;
>>     }
>>
>>     public Centroid25(int id, Tuple25 p) {
>>         super(p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,
>>             p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,
>>             p.f22,p.f23,p.f24);
>>         this.id = id;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return id + " " + super.toString();
>>     }
>> }
>>
>> public class Point25{
>>
>>     public Double f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
>>         f17,f18,f19,f20,f21,f22,f23,f24 = 0.0;
>>
>>     public Point25() {
>>     }
>>
>>     public Point25(Double value0, Double value1, Double value2,
>>             Double value3, Double value4, Double value5, Double value6,
>>             Double value7, Double value8, Double value9, Double value10,
>>             Double value11, Double value12, Double value13, Double value14,
>>             Double value15, Double value16, Double value17, Double value18,
>>             Double value19, Double value20, Double value21, Double value22,
>>             Double value23, Double value24) {
>>         f0=value0;
>>         f1=value1;
>>         f2=value2;
>>         f3=value3;
>>         f4=value4;
>>         f5=value5;
>>         f6=value6;
>>         f7=value7;
>>         f8=value8;
>>         f9=value9;
>>         f10=value10;
>>         f11=value11;
>>         f12=value12;
>>         f13=value13;
>>         f14=value14;
>>         f15=value15;
>>         f16=value16;
>>         f17=value17;
>>         f18=value18;
>>         f19=value19;
>>         f20=value20;
>>         f21=value21;
>>         f22=value22;
>>         f23=value23;
>>         f24=value24;
>>     }
>>
>>     public List getFieldsAsList() {
>>         return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
>>             f12, f13, f14, f15, f16, f17, f18, f19, f20, f21, f22, f23, f24);
>>     }
>>
>>     public Point25 add(Point25 other) {
>>         f0 += other.f0;
>>         f1 += other.f1;
>>         f2 += other.f2;
>>         f3 += other.f3;
>>         f4 += other.f4;
>>         f5 += other.f5;
>>         f6 += other.f6;
>>         f7 += other.f7;
>>         f8 += other.f8;
>>         f9 += other.f9;
>>         f10 += other.f10;
>>         f11 += other.f11;
>>         f12 += other.f12;
>>         f13 += other.f13;
>>         f14 += other.f14;
>>         f15 += other.f15;
>>         f16 += other.f16;
>>         f17 += other.f17;
>>         f18 += other.f18;
>>         f19 += other.f19;
>>         f20 += other.f20;
>>         f21 += other.f21;
>>         f22 += other.f22;
>>         f23 += other.f23;
>>         f24 += other.f24;
>>         return this;
>>     }
>>
>>     public Point25 div(long val) {
>>         f0 /= val;
>>         f1 /= val;
>>         f2 /= val;
>>         f3 /= val;
>>         f4 /= val;
>>         f5 /= val;
>>         f6 /= val;
>>         f7 /= val;
>>         f8 /= val;
>>         f9 /= val;
>>         f10 /= val;
>>         f11 /= val;
>>         f12 /= val;
>>         f13 /= val;
>>         f14 /= val;
>>         f15 /= val;
>>         f16 /= val;
>>         f17 /= val;
>>         f18 /= val;
>>         f19 /= val;
>>         f20 /= val;
>>         f21 /= val;
>>         f22 /= val;
>>         f23 /= val;
>>         f24 /= val;
>>         return this;
>>     }
>>
>>     public double euclideanDistance(Point25 other) {
>>         List l = this.getFieldsAsList();
>>         List ol = other.getFieldsAsList();
>>         double res = 0;
>>         for(int i=0;i
>>
>>> I came across an error for which I am unable to retrace the exact cause.
>>> Starting from the flink-java-examples module, I have extended the KMeans
>>> example to a case where points have 25 coordinates. It follows the exact
>>> same structure and transformations as the original example, only with
>>> points having 25 coordinates instead of 2.
>>>
>>> When creating the centroids dataset within the code as follows, the job
>>> iterates and executes well:
>>>
>>> Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>>>     -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
>>> Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>>>     -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
>>> DataSet centroids = env.fromCollection(Arrays.asList(cent1, cent2));
>>>
>>> When reading from a csv file containing the following:
>>>
>>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
>>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>>>
>>> with the following code:
>>>
>>> DataSet> centroids = env
>>>     .readCsvFile("file:///home/nltran/res3.csv")
>>>     .fieldDelimiter(",")
>>>     .includeFields("1111111111111111111111111")
>>>     .types(Double.class, Double.class, Double.class, Double.class,
>>>         Double.class, Double.class, Double.class, Double.class,
>>>         Double.class, Double.class, Double.class, Double.class,
>>>         Double.class, Double.class, Double.class, Double.class,
>>>         Double.class, Double.class, Double.class, Double.class,
>>>         Double.class, Double.class, Double.class, Double.class,
>>>         Double.class)
>>>     .map(p -> {
>>>         return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>>>             p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,
>>>             p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,
>>>             p.f22,p.f23,p.f24);
>>>     }).returns("eu.euranova.flink.Centroid25");
>>>
>>> I hit the following exception:
>>>
>>> 02/11/2015 14:58:27 PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
>>> com.esotericsoftware.kryo.KryoException: Buffer underflow
>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>>>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>>>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>>>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> 02/11/2015 14:58:27 Job execution switched to status FAILING.
>>> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27 Job execution switched to status FAILED.
>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: com.esotericsoftware.kryo.KryoException: Buffer underflow
>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>>>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>>>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>>>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>>     at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>     at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> The centroid25 data is exactly the same in both cases. Could you help me
>>> retrace what is wrong?
>>>
>>> Thanks and best regards,
>>>
>>> Tran Nam-Luc
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
>>> Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.
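
A note on the remark in this thread that lambda expressions lose type information while (anonymous) classes keep it: the effect can be reproduced with plain Java reflection, without Flink. The following is only a minimal sketch. The Mapper interface and the TypeHintDemo class are hypothetical stand-ins invented for illustration, not Flink classes, and the lookup in outputType() is a simplification that does not claim to mirror how Flink extracts types.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;

class TypeHintDemo {

    /** Hypothetical stand-in for a map-style user function; not a Flink interface. */
    interface Mapper<I, O> {
        O map(I in);
    }

    /** Anonymous class: its class file records that it implements Mapper<String, Integer>. */
    static Mapper<String, Integer> anonymousMapper() {
        return new Mapper<String, Integer>() {
            @Override
            public Integer map(String in) {
                return in.length();
            }
        };
    }

    /** The same function written as a lambda expression. */
    static Mapper<String, Integer> lambdaMapper() {
        return in -> in.length();
    }

    /** Returns the declared output type of the mapper if reflection can still see it. */
    static Optional<Type> outputType(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == Mapper.class) {
                    // Second type argument of Mapper<I, O> is the output type.
                    return Optional.of(pt.getActualTypeArguments()[1]);
                }
            }
        }
        // Only the raw interface is visible, so the output type is unknown.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + outputType(anonymousMapper()));
        System.out.println("lambda:          " + outputType(lambdaMapper()));
    }
}
```

On a typical JDK the anonymous class reports Integer as its output type, while the lambda reports nothing because its runtime class exposes only the raw interface. That gap is the reason a hint such as returns(Centroid25.class) is discussed above for lambda-based operators.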