I think the issue is that the returns("eu.euranova.flink.Centroid25")
variant only passes a string and the system does not know the
type parameters.
So we have to put GenericTypeInfo there, because we basically see Objects.

On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <se...@apache.org> wrote:

> @Timo If I understand it correctly, either omitting the "returns(...)"
> statement, or changing it to "returns(Centroid25.class)" would help?
>
> I think that the behavior between "returns(Centroid25.class)" and "
> returns("eu.euranova.flink.Centroid25")" should be consistent in that they
> both handle the type as a POJO.
>
> Stephan
>
>
> On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote:
>
> > Hey Nam-Luc,
> >
> > I think your problem lies in the following line:
> >
> > .returns("eu.euranova.flink.Centroid25")
> >
> > If you do not specify the fields of the class in the String by using
> > "<myfield=String,otherField=int>", the underlying parser will create a
> > "GenericTypeInfo" type information which then uses Kryo for
> serialization.
> >
> > In general, lambda expressions are a very new feature which currently
> > causes a lot of problems due to missing type information by compilers.
> Maybe
> > it is better to use (anonymous) classes instead.
> >
> > In case of "map()" functions you don't need to provide type hints through
> > the "returns()" method.
> >
> > For other operators you need to either specify all fields of the class in
> > the String (makes no sense in your case) or you change the method to
> >
> > .returns(Centroid25.class)
> >
> > I hope that helps.
> >
> > Regards,
> > Timo
> >
> >
> > On 11.02.2015 17:38, Nam-Luc Tran wrote:
> >
> >> Hello Stephan,
> >>
> >> Thank you for your help.
> >>
> >> I ensured all the POJO classes used comply with what you previously said
> >> and the same exception occurs. Here is the listing of classes
> >> Centroid25 and Point25:
> >>
> >> public class Centroid25 extends Point25 {
> >>
> >> public int id;
> >>
> >> public Centroid25() {}
> >>
> >> public Centroid25(int id, Double value0, Double value1, Double value2,
> >> Double value3, Double value4, Double value5,
> >> Double value6, Double value7, Double value8, Double value9, Double
> >> value10, Double value11, Double value12,
> >> Double value13, Double value14, Double value15, Double value16, Double
> >> value17, Double value18,
> >> Double value19, Double value20, Double value21, Double value22, Double
> >> value23, Double value24) {
> >> super(value0, value1, value2, value3, value4, value5, value6, value7,
> >> value8, value9, value10, value11,
> >> value12, value13, value14, value15, value16, value17, value18,
> >> value19, value20, value21, value22,
> >> value23, value24);
> >> this.id = id;
> >> }
> >>
> >> public Centroid25(int id, Point25 p) {
> >> super(p.f0,
> >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
> >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
> >> f22,p.f23,p.f24);
> >> this.id = id;
> >> }
> >>
> >> public Centroid25(int id, Tuple25 p) {
> >> super(p.f0,
> >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
> >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
> >> f22,p.f23,p.f24);
> >> this.id = id;
> >> }
> >>
> >> @Override
> >> public String toString() {
> >> return id + " " + super.toString();
> >> }
> >> }
> >>
> >> public class Point25{
> >>
> >> public Double
> >> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
> >> f17,f18,f19,f20,f21,f22,f23,f24
> >> = 0.0;
> >>
> >> public Point25() {
> >> }
> >>
> >> public Point25(Double value0, Double value1, Double value2, Double
> >> value3, Double value4, Double value5,
> >> Double value6, Double value7, Double value8, Double value9, Double
> >> value10, Double value11, Double value12,
> >> Double value13, Double value14, Double value15, Double value16, Double
> >> value17, Double value18,
> >> Double value19, Double value20, Double value21, Double value22, Double
> >> value23, Double value24) {
> >> f0=value0;
> >> f1=value1;
> >> f2=value2;
> >> f3=value3;
> >> f4=value4;
> >> f5=value5;
> >> f6=value6;
> >> f7=value7;
> >> f8=value8;
> >> f9=value9;
> >> f10=value10;
> >> f11=value11;
> >> f12=value12;
> >> f13=value13;
> >> f14=value14;
> >> f15=value15;
> >> f16=value16;
> >> f17=value17;
> >> f18=value18;
> >> f19=value19;
> >> f20=value20;
> >> f21=value21;
> >> f22=value22;
> >> f23=value23;
> >> f24=value24;
> >>
> >> }
> >>
> >> public List getFieldsAsList() {
> >> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
> >> f12, f13, f14, f15, f16, f17, f18, f19,
> >> f20, f21, f22, f23, f24);
> >> }
> >>
> >> public Point25 add(Point25 other) {
> >> f0 += other.f0;
> >> f1 += other.f1;
> >> f2 += other.f2;
> >> f3 += other.f3;
> >> f4 += other.f4;
> >> f5 += other.f5;
> >> f6 += other.f6;
> >> f7 += other.f7;
> >> f8 += other.f8;
> >> f9 += other.f9;
> >> f10 += other.f10;
> >> f11 += other.f11;
> >> f12 += other.f12;
> >> f13 += other.f13;
> >> f14 += other.f14;
> >> f15 += other.f15;
> >> f16 += other.f16;
> >> f17 += other.f17;
> >> f18 += other.f18;
> >> f19 += other.f19;
> >> f20 += other.f20;
> >> f21 += other.f21;
> >> f22 += other.f22;
> >> f23 += other.f23;
> >> f24 += other.f24;
> >> return this;
> >> }
> >>
> >> public Point25 div(long val) {
> >> f0 /= val;
> >> f1 /= val;
> >> f2 /= val;
> >> f3 /= val;
> >> f4 /= val;
> >> f5 += val;
> >> f6 += val;
> >> f7 += val;
> >> f8 += val;
> >> f9 += val;
> >> f10 += val;
> >> f11 += val;
> >> f12 += val;
> >> f13 += val;
> >> f14 += val;
> >> f15 += val;
> >> f16 += val;
> >> f17 += val;
> >> f18 += val;
> >> f19 += val;
> >> f20 += val;
> >> f21 += val;
> >> f22 += val;
> >> f23 += val;
> >> f24 += val;
> >> return this;
> >> }
> >>
> >> public double euclideanDistance(Point25 other) {
> >> List l = this.getFieldsAsList();
> >> List ol = other.getFieldsAsList();
> >> double res = 0;
> >> for(int i=0;i
> >>
> >>> I came across an error for which I am unable to retrace the exact
> >>>
> >> cause.
> >>
> >>> Starting from flink-java-examples module, I have extended the KMeans
> >>> example
> >>> to a case where points have 25 coordinates. It follows the exact
> >>>
> >> same
> >>
> >>> structure and transformations as the original example, only with
> >>>
> >> points
> >>
> >>> having 25 coordinates instead of 2.
> >>>
> >>> When creating the centroids dataset within the code as follows the
> >>>
> >> job
> >>
> >>> iterates and executes well:
> >>>
> >>> Centroid25 cent1 = new
> >>>
> >> Centroid25(ThreadLocalRandom.current().nextInt(0,
> >>
> >>> 1000),
> >>>
> >>>
> >>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
> >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> >>
> >>> Centroid25 cent2 = new
> >>>
> >> Centroid25(ThreadLocalRandom.current().nextInt(0,
> >>
> >>> 1000),
> >>>
> >>>
> >>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
> >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> >>
> >>> DataSet centroids = env.fromCollection(Arrays.asList(cent1,
> >>> cent2));
> >>>
> >>> When reading from a csv file containing the following:
> >>>
> >>>
> >>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
> >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
> >>
> >>>
> >>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
> >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
> >>
> >>> with the following code:
> >>> DataSet> centroids = env
> >>>
> >>> .readCsvFile("file:///home/nltran/res3.csv")
> >>>
> >>>
> >> .fieldDelimiter(",")
> >>
> >>>
> >>>
> >> .includeFields("1111111111111111111111111")
> >>
> >>>
> >>>
> >> .types(Double.class, Double.class,
> >>
> >>> Double.class, Double.class,
> >>> Double.class, Double.class,
> >>>
> >>>
> >> Double.class,
> >>
> >>> Double.class, Double.class, Double.class, Double.class,
> >>> Double.class,
> >>>
> >>>
> >> Double.class,
> >>
> >>> Double.class, Double.class, Double.class, Double.class,
> >>> Double.class,
> >>>
> >>>
> >> Double.class,
> >>
> >>> Double.class, Double.class, Double.class, Double.class,
> >>> Double.class,
> >>>
> >>>
> >> Double.class).map(p -> {
> >>
> >>>
> >>>
> >> return new
> >>
> >>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
> >>>
> >>>
> >>>  p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.
> >> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.
> >> f21,p.f22,p.f23,p.f24);
> >>
> >>>
> >>>
> >> }).returns("eu.euranova.flink.Centroid25");
> >>
> >>>
> >>> I hit the following exception:
> >>>
> >>> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk
> >>> Iteration))(1/1)
> >>> switched to FAILED
> >>> com.esotericsoftware.kryo.KryoException: Buffer underflow
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
> >> NoFetchingInput.java:76)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> >>
> >>>           at
> >>>
> >>>
> >>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> >> DefaultClassResolver.java:109)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>
> >>>           at
> >>>
> >>>
> >>>
> org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> >> KryoSerializer.java:205)
> >>
> >>>           at
> >>>
> >>>
> >>>
> org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> >> KryoSerializer.java:210)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
> >> InputViewIterator.java:43)
> >>
> >>>           at
> >>>
> >>>  org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.operators.RegularPactTask.run(
> >> RegularPactTask.java:496)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
> >> AbstractIterativePactTask.java:138)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
> >> IterationHeadPactTask.java:324)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.operators.RegularPactTask.
> >> invoke(RegularPactTask.java:360)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
> >> run(RuntimeEnvironment.java:204)
> >>
> >>>           at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> 02/11/2015 14:58:27     Job execution switched to status
> >>>
> >> FAILING.
> >>
> >>> 02/11/2015 14:58:27     CHAIN Map (Map at
> >>>
> >> main(DoTheKMeans.java:64)) ->
> >>
> >>> Map (Map
> >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> >>> 02/11/2015 14:58:27     Combine (Reduce at
> >>>
> >> main(DoTheKMeans.java:68))(1/1)
> >>
> >>> switched to CANCELING
> >>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
> >>>
> >> main(DoTheKMeans.java:68))
> >>
> >>> -> Map
> >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> >>> 02/11/2015 14:58:27     DataSink(Print to System.out)(1/1)
> >>>
> >> switched to
> >>
> >>> CANCELED
> >>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
> >>>
> >> Iteration))(1/1) switched
> >>
> >>> to
> >>> CANCELING
> >>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
> >>>
> >> Iteration))(1/1) switched
> >>
> >>> to
> >>> CANCELED
> >>> 02/11/2015 14:58:27     CHAIN Map (Map at
> >>>
> >> main(DoTheKMeans.java:64)) ->
> >>
> >>> Map (Map
> >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> >>> 02/11/2015 14:58:27     Combine (Reduce at
> >>>
> >> main(DoTheKMeans.java:68))(1/1)
> >>
> >>> switched to CANCELED
> >>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
> >>>
> >> main(DoTheKMeans.java:68))
> >>
> >>> -> Map
> >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> >>> 02/11/2015 14:58:27     Job execution switched to status FAILED.
> >>> Exception in thread "main"
> >>> org.apache.flink.runtime.client.JobExecutionException:
> >>> com.esotericsoftware.kryo.KryoException: Buffer underflow
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
> >> NoFetchingInput.java:76)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> >>
> >>>           at
> >>>
> >>>
> >>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> >> DefaultClassResolver.java:109)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>
> >>>           at
> >>>
> >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>
> >>>           at
> >>>
> >>>
> >>>
> org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> >> KryoSerializer.java:205)
> >>
> >>>           at
> >>>
> >>>
> >>>
> org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> >> KryoSerializer.java:210)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
> >> InputViewIterator.java:43)
> >>
> >>>           at
> >>>
> >>>  org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.operators.RegularPactTask.run(
> >> RegularPactTask.java:496)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
> >> AbstractIterativePactTask.java:138)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
> >> IterationHeadPactTask.java:324)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.operators.RegularPactTask.
> >> invoke(RegularPactTask.java:360)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
> >> run(RuntimeEnvironment.java:204)
> >>
> >>>           at java.lang.Thread.run(Thread.java:745)
> >>>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.client.JobClientListener$$anonfun$
> >> receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
> >>
> >>>           at
> >>>
> >>>
> >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> >> AbstractPartialFunction.scala:33)
> >>
> >>>           at
> >>>
> >>>
> >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> >> AbstractPartialFunction.scala:33)
> >>
> >>>           at
> >>>
> >>>
> >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> >> AbstractPartialFunction.scala:25)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> >> apply(ActorLogMessages.scala:37)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> >> apply(ActorLogMessages.scala:30)
> >>
> >>>           at
> >>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> >> applyOrElse(ActorLogMessages.scala:30)
> >>
> >>>           at
> >>>
> >> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >>
> >>>           at
> >>>
> >>>
> >>>  org.apache.flink.runtime.client.JobClientListener.
> >> aroundReceive(JobClient.scala:74)
> >>
> >>>           at
> >>>
> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >>
> >>>           at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >>>           at
> >>>
> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> >>
> >>>           at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> >>>           at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> >>>           at
> >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>           at
> >>>
> >>>
> >>>  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> >> runTask(ForkJoinPool.java:1339)
> >>
> >>>           at
> >>>
> >>>  scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> >> ForkJoinPool.java:1979)
> >>
> >>>           at
> >>>
> >>>
> >>>  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> >> ForkJoinWorkerThread.java:107)
> >>
> >>> The centroid25 data is exactly the same in both cases. Could you
> >>>
> >> help me
> >>
> >>> retrace what is wrong?
> >>>
> >>> Thanks and best regards,
> >>>
> >>> Tran Nam-Luc
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>>
> >>>  http://apache-flink-incubator-mailing-list-archive.1008284.
> >> n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> >>
> >>> Sent from the Apache Flink (Incubator) Mailing List archive. mailing
> >>>
> >> list
> >>
> >>> archive at Nabble.com.
> >>>
> >>>
> >>
> >>
> >
>

Reply via email to