But in this case, there are no type parameters, correct? Centroid25 is not
a generic class...

On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger <rmetz...@apache.org> wrote:

> I think the issue is that the returns("eu.euranova.flink.Centroid25")
> variant only passes a string and the system does not know the
> type parameters.
> So we have to put GenericTypeInfo there, because we basically see Objects.
>
> On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > @Timo If I understand it correctly, either omitting the "returns(...)"
> > statement or changing it to "returns(Centroid25.class)" would help?
> >
> > I think that the behavior of "returns(Centroid25.class)" and "
> > returns("eu.euranova.flink.Centroid25")" should be consistent in that
> > they both handle the type as a POJO.
> >
> > Stephan
> >
> >
> > On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org>
> wrote:
> >
> > > Hey Nam-Luc,
> > >
> > > I think your problem lies in the following line:
> > >
> > > .returns("eu.euranova.flink.Centroid25")
> > >
> > > If you do not specify the fields of the class in the String by using
> > > "<myfield=String,otherField=int>", the underlying parser will create a
> > > "GenericTypeInfo" type information, which then uses Kryo for
> > > serialization.
> > >
> > > In general, lambda expressions are a very new feature which currently
> > > causes a lot of problems because compilers omit type information.
> > > Maybe it is better to use (anonymous) classes instead.
> > >
> > > In case of "map()" functions you don't need to provide type hints
> through
> > > the "returns()" method.
> > >
> > > For other operators you need to either specify all fields of the class
> > > in the String (makes no sense in your case) or you change the method to
> > >
> > > .returns(Centroid25.class)
> > >
> > > I hope that helps.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 11.02.2015 17:38, Nam-Luc Tran wrote:
> > >
> > >> Hello Stephan,
> > >>
> > >> Thank you for your help.
> > >>
> > >> I ensured all the POJO classes used comply with what you previously said
> > >> and the same exception occurs. Here is the listing of classes
> > >> Centroid25 and Point25:
> > >>
> > >> public class Centroid25 extends Point25 {
> > >>
> > >> public int id;
> > >>
> > >> public Centroid25() {}
> > >>
> > >> public Centroid25(int id, Double value0, Double value1, Double value2,
> > >> Double value3, Double value4, Double value5,
> > >> Double value6, Double value7, Double value8, Double value9, Double
> > >> value10, Double value11, Double value12,
> > >> Double value13, Double value14, Double value15, Double value16, Double
> > >> value17, Double value18,
> > >> Double value19, Double value20, Double value21, Double value22, Double
> > >> value23, Double value24) {
> > >> super(value0, value1, value2, value3, value4, value5, value6, value7,
> > >> value8, value9, value10, value11,
> > >> value12, value13, value14, value15, value16, value17, value18,
> > >> value19, value20, value21, value22,
> > >> value23, value24);
> > >> this.id = id;
> > >> }
> > >>
> > >> public Centroid25(int id, Point25 p) {
> > >> super(p.f0,
> > >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
> > >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
> > >> f22,p.f23,p.f24);
> > >> this.id = id;
> > >> }
> > >>
> > >> public Centroid25(int id, Tuple25 p) {
> > >> super(p.f0,
> > >> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
> > >> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
> > >> f22,p.f23,p.f24);
> > >> this.id = id;
> > >> }
> > >>
> > >> @Override
> > >> public String toString() {
> > >> return id + " " + super.toString();
> > >> }
> > >> }
> > >>
> > >> public class Point25{
> > >>
> > >> public Double
> > >> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
> > >> f17,f18,f19,f20,f21,f22,f23,f24
> > >> = 0.0;
> > >>
> > >> public Point25() {
> > >> }
> > >>
> > >> public Point25(Double value0, Double value1, Double value2, Double
> > >> value3, Double value4, Double value5,
> > >> Double value6, Double value7, Double value8, Double value9, Double
> > >> value10, Double value11, Double value12,
> > >> Double value13, Double value14, Double value15, Double value16, Double
> > >> value17, Double value18,
> > >> Double value19, Double value20, Double value21, Double value22, Double
> > >> value23, Double value24) {
> > >> f0=value0;
> > >> f1=value1;
> > >> f2=value2;
> > >> f3=value3;
> > >> f4=value4;
> > >> f5=value5;
> > >> f6=value6;
> > >> f7=value7;
> > >> f8=value8;
> > >> f9=value9;
> > >> f10=value10;
> > >> f11=value11;
> > >> f12=value12;
> > >> f13=value13;
> > >> f14=value14;
> > >> f15=value15;
> > >> f16=value16;
> > >> f17=value17;
> > >> f18=value18;
> > >> f19=value19;
> > >> f20=value20;
> > >> f21=value21;
> > >> f22=value22;
> > >> f23=value23;
> > >> f24=value24;
> > >>
> > >> }
> > >>
> > >> public List getFieldsAsList() {
> > >> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
> > >> f12, f13, f14, f15, f16, f17, f18, f19,
> > >> f20, f21, f22, f23, f24);
> > >> }
> > >>
> > >> public Point25 add(Point25 other) {
> > >> f0 += other.f0;
> > >> f1 += other.f1;
> > >> f2 += other.f2;
> > >> f3 += other.f3;
> > >> f4 += other.f4;
> > >> f5 += other.f5;
> > >> f6 += other.f6;
> > >> f7 += other.f7;
> > >> f8 += other.f8;
> > >> f9 += other.f9;
> > >> f10 += other.f10;
> > >> f11 += other.f11;
> > >> f12 += other.f12;
> > >> f13 += other.f13;
> > >> f14 += other.f14;
> > >> f15 += other.f15;
> > >> f16 += other.f16;
> > >> f17 += other.f17;
> > >> f18 += other.f18;
> > >> f19 += other.f19;
> > >> f20 += other.f20;
> > >> f21 += other.f21;
> > >> f22 += other.f22;
> > >> f23 += other.f23;
> > >> f24 += other.f24;
> > >> return this;
> > >> }
> > >>
> > >> public Point25 div(long val) {
> > >> f0 /= val;
> > >> f1 /= val;
> > >> f2 /= val;
> > >> f3 /= val;
> > >> f4 /= val;
> > >> f5 += val;
> > >> f6 += val;
> > >> f7 += val;
> > >> f8 += val;
> > >> f9 += val;
> > >> f10 += val;
> > >> f11 += val;
> > >> f12 += val;
> > >> f13 += val;
> > >> f14 += val;
> > >> f15 += val;
> > >> f16 += val;
> > >> f17 += val;
> > >> f18 += val;
> > >> f19 += val;
> > >> f20 += val;
> > >> f21 += val;
> > >> f22 += val;
> > >> f23 += val;
> > >> f24 += val;
> > >> return this;
> > >> }
> > >>
> > >> public double euclideanDistance(Point25 other) {
> > >> List l = this.getFieldsAsList();
> > >> List ol = other.getFieldsAsList();
> > >> double res = 0;
> > >> for(int i=0;i
> > >>
> > >>> I came across an error for which I am unable to retrace the exact
> > >>>
> > >> cause.
> > >>
> > >>> Starting from flink-java-examples module, I have extended the KMeans
> > >>> example
> > >>> to a case where points have 25 coordinates. It follows the exact
> > >>>
> > >> same
> > >>
> > >>> structure and transformations as the original example, only with
> > >>>
> > >> points
> > >>
> > >>> having 25 coordinates instead of 2.
> > >>>
> > >>> When creating the centroids dataset within the code as follows the
> > >>>
> > >> job
> > >>
> > >>> iterates and executes well:
> > >>>
> > >>> Centroid25 cent1 = new
> > >>>
> > >> Centroid25(ThreadLocalRandom.current().nextInt(0,
> > >>
> > >>> 1000),
> > >>>
> > >>>
> > >>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
> > >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> > >>
> > >>> Centroid25 cent2 = new
> > >>>
> > >> Centroid25(ThreadLocalRandom.current().nextInt(0,
> > >>
> > >>> 1000),
> > >>>
> > >>>
> > >>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
> > >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> > >>
> > >>> DataSet centroids = env.fromCollection(Arrays.asList(cent1,
> > >>> cent2));
> > >>>
> > >>> When reading from a csv file containing the following:
> > >>>
> > >>>
> > >>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
> > >> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
> > >>
> > >>>
> > >>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
> > >> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
> > >>
> > >>> with the following code:
> > >>> DataSet> centroids = env
> > >>>
> > >>> .readCsvFile("file:///home/nltran/res3.csv")
> > >>>
> > >>>
> > >> .fieldDelimiter(",")
> > >>
> > >>>
> > >>>
> > >> .includeFields("1111111111111111111111111")
> > >>
> > >>>
> > >>>
> > >> .types(Double.class, Double.class,
> > >>
> > >>> Double.class, Double.class,
> > >>> Double.class, Double.class,
> > >>>
> > >>>
> > >> Double.class,
> > >>
> > >>> Double.class, Double.class, Double.class, Double.class,
> > >>> Double.class,
> > >>>
> > >>>
> > >> Double.class,
> > >>
> > >>> Double.class, Double.class, Double.class, Double.class,
> > >>> Double.class,
> > >>>
> > >>>
> > >> Double.class,
> > >>
> > >>> Double.class, Double.class, Double.class, Double.class,
> > >>> Double.class,
> > >>>
> > >>>
> > >> Double.class).map(p -> {
> > >>
> > >>>
> > >>>
> > >> return new
> > >>
> > >>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
> > >>>
> > >>>
> > >>>  p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.
> > >> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.
> > >> f21,p.f22,p.f23,p.f24);
> > >>
> > >>>
> > >>>
> > >> }).returns("eu.euranova.flink.Centroid25");
> > >>
> > >>>
> > >>> I hit the following exception:
> > >>>
> > >>> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk
> > >>> Iteration))(1/1)
> > >>> switched to FAILED
> > >>> com.esotericsoftware.kryo.KryoException: Buffer underflow
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
> > >> NoFetchingInput.java:76)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> > >> DefaultClassResolver.java:109)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> > >> KryoSerializer.java:205)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> > >> KryoSerializer.java:210)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
> > >> InputViewIterator.java:43)
> > >>
> > >>>           at
> > >>>
> > >>>
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.operators.RegularPactTask.run(
> > >> RegularPactTask.java:496)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
> > >> AbstractIterativePactTask.java:138)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
> > >> IterationHeadPactTask.java:324)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.operators.RegularPactTask.
> > >> invoke(RegularPactTask.java:360)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
> > >> run(RuntimeEnvironment.java:204)
> > >>
> > >>>           at java.lang.Thread.run(Thread.java:745)
> > >>>
> > >>> 02/11/2015 14:58:27     Job execution switched to status
> > >>>
> > >> FAILING.
> > >>
> > >>> 02/11/2015 14:58:27     CHAIN Map (Map at
> > >>>
> > >> main(DoTheKMeans.java:64)) ->
> > >>
> > >>> Map (Map
> > >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27     Combine (Reduce at
> > >>>
> > >> main(DoTheKMeans.java:68))(1/1)
> > >>
> > >>> switched to CANCELING
> > >>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
> > >>>
> > >> main(DoTheKMeans.java:68))
> > >>
> > >>> -> Map
> > >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27     DataSink(Print to System.out)(1/1)
> > >>>
> > >> switched to
> > >>
> > >>> CANCELED
> > >>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
> > >>>
> > >> Iteration))(1/1) switched
> > >>
> > >>> to
> > >>> CANCELING
> > >>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
> > >>>
> > >> Iteration))(1/1) switched
> > >>
> > >>> to
> > >>> CANCELED
> > >>> 02/11/2015 14:58:27     CHAIN Map (Map at
> > >>>
> > >> main(DoTheKMeans.java:64)) ->
> > >>
> > >>> Map (Map
> > >>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27     Combine (Reduce at
> > >>>
> > >> main(DoTheKMeans.java:68))(1/1)
> > >>
> > >>> switched to CANCELED
> > >>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
> > >>>
> > >> main(DoTheKMeans.java:68))
> > >>
> > >>> -> Map
> > >>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27     Job execution switched to status FAILED.
> > >>> Exception in thread "main"
> > >>> org.apache.flink.runtime.client.JobExecutionException:
> > >>> com.esotericsoftware.kryo.KryoException: Buffer underflow
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
> > >> NoFetchingInput.java:76)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> > >> DefaultClassResolver.java:109)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> > >>
> > >>>           at
> > >>>
> > >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> > >> KryoSerializer.java:205)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
> > >> KryoSerializer.java:210)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
> > >> InputViewIterator.java:43)
> > >>
> > >>>           at
> > >>>
> > >>>
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.operators.RegularPactTask.run(
> > >> RegularPactTask.java:496)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
> > >> AbstractIterativePactTask.java:138)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
> > >> IterationHeadPactTask.java:324)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.operators.RegularPactTask.
> > >> invoke(RegularPactTask.java:360)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
> > >> run(RuntimeEnvironment.java:204)
> > >>
> > >>>           at java.lang.Thread.run(Thread.java:745)
> > >>>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.client.JobClientListener$$anonfun$
> > >> receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> > >> AbstractPartialFunction.scala:33)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> > >> AbstractPartialFunction.scala:33)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> > >> AbstractPartialFunction.scala:25)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> > >> apply(ActorLogMessages.scala:37)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> > >> apply(ActorLogMessages.scala:30)
> > >>
> > >>>           at
> > >>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
> > >> applyOrElse(ActorLogMessages.scala:30)
> > >>
> > >>>           at
> > >>>
> > >> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  org.apache.flink.runtime.client.JobClientListener.
> > >> aroundReceive(JobClient.scala:74)
> > >>
> > >>>           at
> > >>>
> > >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > >>
> > >>>           at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > >>>           at
> > >>>
> > >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > >>
> > >>>           at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > >>>           at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > >>>           at
> > >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >>>           at
> > >>>
> > >>>
> > >>>  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > >> runTask(ForkJoinPool.java:1339)
> > >>
> > >>>           at
> > >>>
> > >>>  scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > >> ForkJoinPool.java:1979)
> > >>
> > >>>           at
> > >>>
> > >>>
> > >>>  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > >> ForkJoinWorkerThread.java:107)
> > >>
> > >>> The centroid25 data is exactly the same in both cases. Could you
> > >>>
> > >> help me
> > >>
> > >>> retrace what is wrong?
> > >>>
> > >>> Thanks and best regards,
> > >>>
> > >>> Tran Nam-Luc
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> View this message in context:
> > >>>
> > >>>  http://apache-flink-incubator-mailing-list-archive.1008284.
> > >> n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> > >>
> > >>> Sent from the Apache Flink (Incubator) Mailing List archive. mailing
> > >>>
> > >> list
> > >>
> > >>> archive at Nabble.com.
> > >>>
> > >>>
> > >>
> > >>
> > >
> >
>

Reply via email to