I think the issue is that the returns("eu.euranova.flink.Centroid25") variant only passes a string and the system does not know the type parameters. So we have to put GenericTypeInfo there, because we basically only see Objects.
On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <se...@apache.org> wrote: > @Timo If I understand it correctly, both omitting the "returns(...)" > statement, or changing it to "returns(Centroid25.class)" would help? > > I think that the behavior between "returns(Centroid25.class)" and " > returns("eu.euranova.flink.Centroid25")" should be consistent in that they > both handle the type as a POJO. > > Stephan > > > On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote: > > > Hey Nam-Luc, > > > > I think your problem lies in the following line: > > > > .returns("eu.euranova.flink.Centroid25") > > > > If you do not specify the fields of the class in the String by using > > "<myfield=String,otherField=int>", the underlying parser will create an > > "GenericTypeInfo" type information which then uses Kryo for > serialization. > > > > In general, lambda expressions are a very new feature which currently > > makes a lot of problems due to missing type information by compilers. > Maybe > > it is better to use (anonymous) classes instead. > > > > In case of "map()" functions you don't need to provide type hints through > > the "returns()" method. > > > > For other operators you need to either specify all fields of the class in > > the String (makes no sense in you case) or you change the method to > > > > .returns(Centroid25.class) > > > > I hope that helps. > > > > Regards, > > Timo > > > > > > On 11.02.2015 17:38, Nam-Luc Tran wrote: > > > >> Hello Stephan, > >> > >> Thank you for your help. > >> > >> I ensured all the POJO classes used comply to what you previously said > >> and the same exception occurs. 
Here is the listing of classes > >> Centroid25 and Point25: > >> > >> public class Centroid25 extends Point25 { > >> > >> public int id; > >> > >> public Centroid25() {} > >> > >> public Centroid25(int id, Double value0, Double value1, Double value2, > >> Double value3, Double value4, Double value5, > >> Double value6, Double value7, Double value8, Double value9, Double > >> value10, Double value11, Double value12, > >> Double value13, Double value14, Double value15, Double value16, Double > >> value17, Double value18, > >> Double value19, Double value20, Double value21, Double value22, Double > >> value23, Double value24) { > >> super(value0, value1, value2, value3, value4, value5, value6, value7, > >> value8, value9, value10, value11, > >> value12, value13, value14, value15, value16, value17, value18, > >> value19, value20, value21, value22, > >> value23, value24); > >> this.id = id; > >> } > >> > >> public Centroid25(int id, Point25 p) { > >> super(p.f0, > >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p. > >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p. > >> f22,p.f23,p.f24); > >> this.id = id; > >> } > >> > >> public Centroid25(int id, Tuple25 p) { > >> super(p.f0, > >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p. > >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p. 
> >> f22,p.f23,p.f24); > >> this.id = id; > >> } > >> > >> @Override > >> public String toString() { > >> return id + " " + super.toString(); > >> } > >> } > >> > >> public class Point25{ > >> > >> public Double > >> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16, > >> f17,f18,f19,f20,f21,f22,f23,f24 > >> = 0.0; > >> > >> public Point25() { > >> } > >> > >> public Point25(Double value0, Double value1, Double value2, Double > >> value3, Double value4, Double value5, > >> Double value6, Double value7, Double value8, Double value9, Double > >> value10, Double value11, Double value12, > >> Double value13, Double value14, Double value15, Double value16, Double > >> value17, Double value18, > >> Double value19, Double value20, Double value21, Double value22, Double > >> value23, Double value24) { > >> f0=value0; > >> f1=value1; > >> f2=value2; > >> f3=value3; > >> f4=value4; > >> f5=value5; > >> f6=value6; > >> f7=value7; > >> f8=value8; > >> f9=value9; > >> f10=value10; > >> f11=value11; > >> f12=value12; > >> f13=value13; > >> f14=value14; > >> f15=value15; > >> f16=value16; > >> f17=value17; > >> f18=value18; > >> f19=value19; > >> f20=value20; > >> f21=value21; > >> f22=value22; > >> f23=value23; > >> f24=value24; > >> > >> } > >> > >> public List getFieldsAsList() { > >> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, > >> f12, f13, f14, f15, f16, f17, f18, f19, > >> f20, f21, f22, f23, f24); > >> } > >> > >> public Point25 add(Point25 other) { > >> f0 += other.f0; > >> f1 += other.f1; > >> f2 += other.f2; > >> f3 += other.f3; > >> f4 += other.f4; > >> f5 += other.f5; > >> f6 += other.f6; > >> f7 += other.f7; > >> f8 += other.f8; > >> f9 += other.f9; > >> f10 += other.f10; > >> f11 += other.f11; > >> f12 += other.f12; > >> f13 += other.f13; > >> f14 += other.f14; > >> f15 += other.f15; > >> f16 += other.f16; > >> f17 += other.f17; > >> f18 += other.f18; > >> f19 += other.f19; > >> f20 += other.f20; > >> f21 += other.f21; > >> f22 += 
other.f22; > >> f23 += other.f23; > >> f24 += other.f24; > >> return this; > >> } > >> > >> public Point25 div(long val) { > >> f0 /= val; > >> f1 /= val; > >> f2 /= val; > >> f3 /= val; > >> f4 /= val; > >> f5 += val; > >> f6 += val; > >> f7 += val; > >> f8 += val; > >> f9 += val; > >> f10 += val; > >> f11 += val; > >> f12 += val; > >> f13 += val; > >> f14 += val; > >> f15 += val; > >> f16 += val; > >> f17 += val; > >> f18 += val; > >> f19 += val; > >> f20 += val; > >> f21 += val; > >> f22 += val; > >> f23 += val; > >> f24 += val; > >> return this; > >> } > >> > >> public double euclideanDistance(Point25 other) { > >> List l = this.getFieldsAsList(); > >> List ol = other.getFieldsAsList(); > >> double res = 0; > >> for(int i=0;i > >> > >>> I came accross an error for which I am unable to retrace the exact > >>> > >> cause. > >> > >>> Starting from flink-java-examples module, I have extended the KMeans > >>> example > >>> to a case where points have 25 coordinates. It follows the exact > >>> > >> same > >> > >>> structure and transformations as the original example, only with > >>> > >> points > >> > >>> having 25 coordinates instead of 2. > >>> > >>> When creating the centroids dataset within the code as follows the > >>> > >> job > >> > >>> iterates and executes well: > >>> > >>> Centroid25 cent1 = new > >>> > >> Centroid25(ThreadLocalRandom.current().nextInt(0, > >> > >>> 1000), > >>> > >>> > >>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0, > >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0); > >> > >>> Centroid25 cent2 = new > >>> > >> Centroid25(ThreadLocalRandom.current().nextInt(0, > >> > >>> 1000), > >>> > >>> > >>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1. 
> >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0); > >> > >>> DataSet centroids = env.fromCollection(Arrays.asList(cent1, > >>> cent2)); > >>> > >>> When reading from a csv file containing the following: > >>> > >>> > >>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0, > >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0 > >> > >>> > >>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1. > >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0 > >> > >>> with the following code: > >>> DataSet> centroids = env > >>> > >>> .readCsvFile("file:///home/nltran/res3.csv") > >>> > >>> > >> .fieldDelimiter(",") > >> > >>> > >>> > >> .includeFields("1111111111111111111111111") > >> > >>> > >>> > >> .types(Double.class, Double.class, > >> > >>> Double.class, Double.class, > >>> Double.class, Double.class, > >>> > >>> > >> Double.class, > >> > >>> Double.class, Double.class, Double.class, Double.class, > >>> Double.class, > >>> > >>> > >> Double.class, > >> > >>> Double.class, Double.class, Double.class, Double.class, > >>> Double.class, > >>> > >>> > >> Double.class, > >> > >>> Double.class, Double.class, Double.class, Double.class, > >>> Double.class, > >>> > >>> > >> Double.class).map(p -> { > >> > >>> > >>> > >> return new > >> > >>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000), > >>> > >>> > >>> p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p. > >> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p. 
> >> f21,p.f22,p.f23,p.f24); > >> > >>> > >>> > >> }).returns("eu.euranova.flink.Centroid25"); > >> > >>> > >>> I hit the following exception: > >>> > >>> 02/11/2015 14:58:27 PartialSolution (BulkIteration (Bulk > >>> Iteration))(1/1) > >>> switched to FAILED > >>> com.esotericsoftware.kryo.KryoException: Buffer underflow > >>> at > >>> > >>> > >>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require( > >> NoFetchingInput.java:76) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) > >> > >>> at > >>> > >>> > >>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( > >> DefaultClassResolver.java:109) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > >> > >>> at > >>> > >>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >> KryoSerializer.java:205) > >> > >>> at > >>> > >>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >> KryoSerializer.java:210) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.io.disk.InputViewIterator.next( > >> InputViewIterator.java:43) > >> > >>> at > >>> > >>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.operators.RegularPactTask.run( > >> RegularPactTask.java:496) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run( > >> AbstractIterativePactTask.java:138) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run( > >> IterationHeadPactTask.java:324) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.operators.RegularPactTask. > >> invoke(RegularPactTask.java:360) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.execution.RuntimeEnvironment. 
> >> run(RuntimeEnvironment.java:204) > >> > >>> at java.lang.Thread.run(Thread.java:745) > >>> > >>> 02/11/2015 14:58:27 Job execution switched to status > >>> > >> FAILING. > >> > >>> 02/11/2015 14:58:27 CHAIN Map (Map at > >>> > >> main(DoTheKMeans.java:64)) -> > >> > >>> Map (Map > >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING > >>> 02/11/2015 14:58:27 Combine (Reduce at > >>> > >> main(DoTheKMeans.java:68))(1/1) > >> > >>> switched to CANCELING > >>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at > >>> > >> main(DoTheKMeans.java:68)) > >> > >>> -> Map > >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING > >>> 02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) > >>> > >> switched to > >> > >>> CANCELED > >>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk > >>> > >> Iteration))(1/1) switched > >> > >>> to > >>> CANCELING > >>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk > >>> > >> Iteration))(1/1) switched > >> > >>> to > >>> CANCELED > >>> 02/11/2015 14:58:27 CHAIN Map (Map at > >>> > >> main(DoTheKMeans.java:64)) -> > >> > >>> Map (Map > >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED > >>> 02/11/2015 14:58:27 Combine (Reduce at > >>> > >> main(DoTheKMeans.java:68))(1/1) > >> > >>> switched to CANCELED > >>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at > >>> > >> main(DoTheKMeans.java:68)) > >> > >>> -> Map > >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED > >>> 02/11/2015 14:58:27 Job execution switched to status FAILED. 
> >>> Exception in thread "main" > >>> org.apache.flink.runtime.client.JobExecutionException: > >>> com.esotericsoftware.kryo.KryoException: Buffer underflow > >>> at > >>> > >>> > >>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require( > >> NoFetchingInput.java:76) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) > >> > >>> at > >>> > >>> > >>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( > >> DefaultClassResolver.java:109) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > >> > >>> at > >>> > >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > >> > >>> at > >>> > >>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >> KryoSerializer.java:205) > >> > >>> at > >>> > >>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >> KryoSerializer.java:210) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.io.disk.InputViewIterator.next( > >> InputViewIterator.java:43) > >> > >>> at > >>> > >>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.operators.RegularPactTask.run( > >> RegularPactTask.java:496) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run( > >> AbstractIterativePactTask.java:138) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run( > >> IterationHeadPactTask.java:324) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.operators.RegularPactTask. > >> invoke(RegularPactTask.java:360) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.execution.RuntimeEnvironment. 
> >> run(RuntimeEnvironment.java:204) > >> > >>> at java.lang.Thread.run(Thread.java:745) > >>> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.client.JobClientListener$$anonfun$ > >> receiveWithLogMessages$2.applyOrElse(JobClient.scala:88) > >> > >>> at > >>> > >>> > >>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp( > >> AbstractPartialFunction.scala:33) > >> > >>> at > >>> > >>> > >>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > >> AbstractPartialFunction.scala:33) > >> > >>> at > >>> > >>> > >>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > >> AbstractPartialFunction.scala:25) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >> apply(ActorLogMessages.scala:37) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >> apply(ActorLogMessages.scala:30) > >> > >>> at > >>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > >>> at > >>> > >>> > >>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >> applyOrElse(ActorLogMessages.scala:30) > >> > >>> at > >>> > >> akka.actor.Actor$class.aroundReceive(Actor.scala:465) > >> > >>> at > >>> > >>> > >>> org.apache.flink.runtime.client.JobClientListener. > >> aroundReceive(JobClient.scala:74) > >> > >>> at > >>> > >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > >> > >>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) > >>> at > >>> > >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > >> > >>> at akka.dispatch.Mailbox.run(Mailbox.scala:221) > >>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > >>> at > >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > >>> at > >>> > >>> > >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. 
> >> runTask(ForkJoinPool.java:1339) > >> > >>> at > >>> > >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker( > >> ForkJoinPool.java:1979) > >> > >>> at > >>> > >>> > >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > >> ForkJoinWorkerThread.java:107) > >> > >>> The centroid25 data is exactly the same in both cases. Could you > >>> > >> help me > >> > >>> retrace what is wrong? > >>> > >>> Thanks and best regards, > >>> > >>> Tran Nam-Luc > >>> > >>> > >>> > >>> -- > >>> View this message in context: > >>> > >>> http://apache-flink-incubator-mailing-list-archive.1008284. > >> n3.nabble.com/kryoException-Buffer-underflow-tp3760.html > >> > >>> Sent from the Apache Flink (Incubator) Mailing List archive. mailing > >>> > >> list > >> > >>> archive at Nabble.com. > >>> > >>> > >> > >> > > >