@Timo If I understand it correctly, both omitting the "returns(...)"
statement, or changing it to "returns(Centroid25.class)" would help?

I think that the behavior between "returns(Centroid25.class)" and "
returns("eu.euranova.flink.Centroid25")" should be consistent in that they
both handle the type as a POJO.

Stephan


On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote:

> Hey Nam-Luc,
>
> I think your problem lies in the following line:
>
> .returns("eu.euranova.flink.Centroid25")
>
> If you do not specify the fields of the class in the String by using
> "<myfield=String,otherField=int>", the underlying parser will create an
> "GenericTypeInfo" type information which then uses Kryo for serialization.
>
> In general, lambda expressions are a very new feature which currently
> makes a lot of problems due to missing type information by compilers. Maybe
> it is better to use (anonymous) classes instead.
>
> In case of "map()" functions you don't need to provide type hints through
> the "returns()" method.
>
> For other operators you need to either specify all fields of the class in
> the String (makes no sense in you case) or you change the method to
>
> .returns(Centroid25.class)
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> On 11.02.2015 17:38, Nam-Luc Tran wrote:
>
>> Hello Stephan,
>>
>> Thank you for your help.
>>
>> I ensured all the POJO classes used comply to what you previously said
>> and the same exception occurs. Here is the listing of classes
>> Centroid25 and Point25:
>>
>> public class Centroid25 extends Point25 {
>>
>> public int id;
>>
>> public Centroid25() {}
>>
>> public Centroid25(int id, Double value0, Double value1, Double value2,
>> Double value3, Double value4, Double value5,
>> Double value6, Double value7, Double value8, Double value9, Double
>> value10, Double value11, Double value12,
>> Double value13, Double value14, Double value15, Double value16, Double
>> value17, Double value18,
>> Double value19, Double value20, Double value21, Double value22, Double
>> value23, Double value24) {
>> super(value0, value1, value2, value3, value4, value5, value6, value7,
>> value8, value9, value10, value11,
>> value12, value13, value14, value15, value16, value17, value18,
>> value19, value20, value21, value22,
>> value23, value24);
>> this.id = id;
>> }
>>
>> public Centroid25(int id, Point25 p) {
>> super(p.f0,
>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
>> f22,p.f23,p.f24);
>> this.id = id;
>> }
>>
>> public Centroid25(int id, Tuple25 p) {
>> super(p.f0,
>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
>> f22,p.f23,p.f24);
>> this.id = id;
>> }
>>
>> @Override
>> public String toString() {
>> return id + " " + super.toString();
>> }
>> }
>>
>> public class Point25{
>>
>> public Double
>> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
>> f17,f18,f19,f20,f21,f22,f23,f24
>> = 0.0;
>>
>> public Point25() {
>> }
>>
>> public Point25(Double value0, Double value1, Double value2, Double
>> value3, Double value4, Double value5,
>> Double value6, Double value7, Double value8, Double value9, Double
>> value10, Double value11, Double value12,
>> Double value13, Double value14, Double value15, Double value16, Double
>> value17, Double value18,
>> Double value19, Double value20, Double value21, Double value22, Double
>> value23, Double value24) {
>> f0=value0;
>> f1=value1;
>> f2=value2;
>> f3=value3;
>> f4=value4;
>> f5=value5;
>> f6=value6;
>> f7=value7;
>> f8=value8;
>> f9=value9;
>> f10=value10;
>> f11=value11;
>> f12=value12;
>> f13=value13;
>> f14=value14;
>> f15=value15;
>> f16=value16;
>> f17=value17;
>> f18=value18;
>> f19=value19;
>> f20=value20;
>> f21=value21;
>> f22=value22;
>> f23=value23;
>> f24=value24;
>>
>> }
>>
>> public List getFieldsAsList() {
>> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
>> f12, f13, f14, f15, f16, f17, f18, f19,
>> f20, f21, f22, f23, f24);
>> }
>>
>> public Point25 add(Point25 other) {
>> f0 += other.f0;
>> f1 += other.f1;
>> f2 += other.f2;
>> f3 += other.f3;
>> f4 += other.f4;
>> f5 += other.f5;
>> f6 += other.f6;
>> f7 += other.f7;
>> f8 += other.f8;
>> f9 += other.f9;
>> f10 += other.f10;
>> f11 += other.f11;
>> f12 += other.f12;
>> f13 += other.f13;
>> f14 += other.f14;
>> f15 += other.f15;
>> f16 += other.f16;
>> f17 += other.f17;
>> f18 += other.f18;
>> f19 += other.f19;
>> f20 += other.f20;
>> f21 += other.f21;
>> f22 += other.f22;
>> f23 += other.f23;
>> f24 += other.f24;
>> return this;
>> }
>>
>> public Point25 div(long val) {
>> f0 /= val;
>> f1 /= val;
>> f2 /= val;
>> f3 /= val;
>> f4 /= val;
>> f5 += val;
>> f6 += val;
>> f7 += val;
>> f8 += val;
>> f9 += val;
>> f10 += val;
>> f11 += val;
>> f12 += val;
>> f13 += val;
>> f14 += val;
>> f15 += val;
>> f16 += val;
>> f17 += val;
>> f18 += val;
>> f19 += val;
>> f20 += val;
>> f21 += val;
>> f22 += val;
>> f23 += val;
>> f24 += val;
>> return this;
>> }
>>
>> public double euclideanDistance(Point25 other) {
>> List l = this.getFieldsAsList();
>> List ol = other.getFieldsAsList();
>> double res = 0;
>> for(int i=0;i
>>
>>> I came accross an error for which I am unable to retrace the exact
>>>
>> cause.
>>
>>> Starting from flink-java-examples module, I have extended the KMeans
>>> example
>>> to a case where points have 25 coordinates. It follows the exact
>>>
>> same
>>
>>> structure and transformations as the original example, only with
>>>
>> points
>>
>>> having 25 coordinates instead of 2.
>>>
>>> When creating the centroids dataset within the code as follows the
>>>
>> job
>>
>>> iterates and executes well:
>>>
>>> Centroid25 cent1 = new
>>>
>> Centroid25(ThreadLocalRandom.current().nextInt(0,
>>
>>> 1000),
>>>
>>>
>>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
>>
>>> Centroid25 cent2 = new
>>>
>> Centroid25(ThreadLocalRandom.current().nextInt(0,
>>
>>> 1000),
>>>
>>>
>>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
>>
>>> DataSet centroids = env.fromCollection(Arrays.asList(cent1,
>>> cent2));
>>>
>>> When reading from a csv file containing the following:
>>>
>>>
>>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
>>
>>>
>>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>>
>>> with the following code:
>>> DataSet> centroids = env
>>>
>>> .readCsvFile("file:///home/nltran/res3.csv")
>>>
>>>
>> .fieldDelimiter(",")
>>
>>>
>>>
>> .includeFields("1111111111111111111111111")
>>
>>>
>>>
>> .types(Double.class, Double.class,
>>
>>> Double.class, Double.class,
>>> Double.class, Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class).map(p -> {
>>
>>>
>>>
>> return new
>>
>>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>>>
>>>
>>>  p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.
>> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.
>> f21,p.f22,p.f23,p.f24);
>>
>>>
>>>
>> }).returns("eu.euranova.flink.Centroid25");
>>
>>>
>>> I hit the following exception:
>>>
>>> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk
>>> Iteration))(1/1)
>>> switched to FAILED
>>> com.esotericsoftware.kryo.KryoException: Buffer underflow
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
>> NoFetchingInput.java:76)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>>
>>>           at
>>>
>>>
>>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
>> DefaultClassResolver.java:109)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:205)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:210)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
>> InputViewIterator.java:43)
>>
>>>           at
>>>
>>>  org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.run(
>> RegularPactTask.java:496)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
>> AbstractIterativePactTask.java:138)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
>> IterationHeadPactTask.java:324)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.
>> invoke(RegularPactTask.java:360)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
>> run(RuntimeEnvironment.java:204)
>>
>>>           at java.lang.Thread.run(Thread.java:745)
>>>
>>> 02/11/2015 14:58:27     Job execution switched to status
>>>
>> FAILING.
>>
>>> 02/11/2015 14:58:27     CHAIN Map (Map at
>>>
>> main(DoTheKMeans.java:64)) ->
>>
>>> Map (Map
>>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27     Combine (Reduce at
>>>
>> main(DoTheKMeans.java:68))(1/1)
>>
>>> switched to CANCELING
>>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
>>>
>> main(DoTheKMeans.java:68))
>>
>>> -> Map
>>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
>>> 02/11/2015 14:58:27     DataSink(Print to System.out)(1/1)
>>>
>> switched to
>>
>>> CANCELED
>>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
>>>
>> Iteration))(1/1) switched
>>
>>> to
>>> CANCELING
>>> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
>>>
>> Iteration))(1/1) switched
>>
>>> to
>>> CANCELED
>>> 02/11/2015 14:58:27     CHAIN Map (Map at
>>>
>> main(DoTheKMeans.java:64)) ->
>>
>>> Map (Map
>>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27     Combine (Reduce at
>>>
>> main(DoTheKMeans.java:68))(1/1)
>>
>>> switched to CANCELED
>>> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
>>>
>> main(DoTheKMeans.java:68))
>>
>>> -> Map
>>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
>>> 02/11/2015 14:58:27     Job execution switched to status FAILED.
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException:
>>> com.esotericsoftware.kryo.KryoException: Buffer underflow
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
>> NoFetchingInput.java:76)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>>
>>>           at
>>>
>>>
>>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
>> DefaultClassResolver.java:109)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:205)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:210)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
>> InputViewIterator.java:43)
>>
>>>           at
>>>
>>>  org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.run(
>> RegularPactTask.java:496)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
>> AbstractIterativePactTask.java:138)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
>> IterationHeadPactTask.java:324)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.
>> invoke(RegularPactTask.java:360)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
>> run(RuntimeEnvironment.java:204)
>>
>>>           at java.lang.Thread.run(Thread.java:745)
>>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.client.JobClientListener$$anonfun$
>> receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
>>
>>>           at
>>>
>>>
>>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
>> AbstractPartialFunction.scala:33)
>>
>>>           at
>>>
>>>
>>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
>> AbstractPartialFunction.scala:33)
>>
>>>           at
>>>
>>>
>>>  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
>> AbstractPartialFunction.scala:25)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
>> apply(ActorLogMessages.scala:37)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
>> apply(ActorLogMessages.scala:30)
>>
>>>           at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.ActorLogMessages$$anon$1.
>> applyOrElse(ActorLogMessages.scala:30)
>>
>>>           at
>>>
>> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.client.JobClientListener.
>> aroundReceive(JobClient.scala:74)
>>
>>>           at
>>>
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>
>>>           at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>           at
>>>
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>
>>>           at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>           at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>           at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>           at
>>>
>>>
>>>  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>> runTask(ForkJoinPool.java:1339)
>>
>>>           at
>>>
>>>  scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>> ForkJoinPool.java:1979)
>>
>>>           at
>>>
>>>
>>>  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>> ForkJoinWorkerThread.java:107)
>>
>>> The centroid25 data is exactly the same in both cases. Could you
>>>
>> help me
>>
>>> retrace what is wrong?
>>>
>>> Thanks and best regards,
>>>
>>> Tran Nam-Luc
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>>
>>>  http://apache-flink-incubator-mailing-list-archive.1008284.
>> n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
>>
>>> Sent from the Apache Flink (Incubator) Mailing List archive. mailing
>>>
>> list
>>
>>> archive at Nabble.com.
>>>
>>>
>>
>>
>

Reply via email to