The Kryo underflow should be fixed with the PR [1].

[1] https://github.com/apache/flink/pull/391
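For readers following the thread, the failure mode Johannes describes further down (a reader that eagerly deserializes the "next" record although the input is already exhausted) can be reproduced in miniature with plain JDK streams. This is only an illustrative sketch under assumptions: it uses DataInputStream instead of Kryo or Flink's NoFetchingInput, the class and method names are made up for the example, and it is not taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical names throughout; plain JDK streams stand in for Kryo/Flink classes.
public class EndOfInputSketch {

    /** Serializes the given values into a byte array, one int per record. */
    static byte[] write(int... values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int v : values) {
                out.writeInt(v);
            }
        }
        return bytes.toByteArray();
    }

    /**
     * Returns the next record, or null when the input is exhausted.
     * The EOFException raised by the eager read is translated into the
     * null "no more elements" signal instead of escaping to the caller.
     */
    static Integer next(DataInputStream in) throws IOException {
        try {
            return in.readInt();
        } catch (EOFException endOfInput) {
            return null;
        }
    }

    /** Counts records by calling next() until it returns null. */
    static int countRecords(byte[] data) throws IOException {
        int count = 0;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (next(in) != null) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countRecords(write(1, 2, 3)));
    }
}
```

Without the catch block in next(), the read after the last record would throw, which is the same shape of problem as the "Buffer underflow" in the trace below. Whether the actual fix handles it this way or with an explicit end-of-input check is not stated in this thread.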
On Thu, Feb 12, 2015 at 4:10 PM, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:

> Without the .returns(...) statement it yelled about type erasure.
> Putting .returns(Centroid25.class) did the trick.
>
> Thanks everyone for your help.
>
> Tran Nam-Luc
>
> On Thursday, 12/02/2015 at 12:06, Kirschnick, Johannes wrote:
>
> Hi,
>
> I basically just reported an issue and found this thread on the list
> about the same error.
>
> Just bringing this up here, in case these issues are linked ...
>
> There is a small test case to reproduce it attached:
> https://issues.apache.org/jira/browse/FLINK-1531
>
> I tried to zero in on the code and find the problem, which might be
> related to type erasure?
>
> It seems that in the mentioned scenario there is a
> MutableObjectIterator which is iterated, and null is used to signal
> "no more".
> Because Kryo is in the mix, it eagerly tries to read "next", which
> fails with a buffer underflow.
> So somewhere there should be a hasNext call ...
>
> Johannes
> ________________________________________
> From: Timo Walther
> Sent: Wednesday, 11 February 2015 21:55
> To: dev@flink.apache.org
> Subject: Re: kryoException : Buffer underflow
>
> @Stephan: Yes, you are correct. Both omitting the "returns(...)"
> statement, or changing it to "returns(Centroid25.class)" would help.
>
> The returns(TypeInformation) and returns(String) methods do absolutely
> no type extraction; the user has to know what he is doing. If you read
> the method's description:
>
> Pojo types such as org.my.MyPojo
> Generic types such as java.lang.Class
>
> With the returns(String) method you can create all types of type
> information we currently support.
>
> For returns(Class) the description is as follows:
>
> This method takes a class that will be analyzed by Flink's type
> extraction capabilities.
>
> On 11.02.2015 21:42, Stephan Ewen wrote:
> > But in this case, there are no type parameters, correct? Centroid25 is not
> > a generic class...
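The "missing type information" that comes up around lambdas in this thread can be observed with plain reflection, without Flink on the classpath. The following is a minimal sketch, assuming an OpenJDK/HotSpot-style runtime (what a lambda's runtime class exposes is an implementation detail, not something the language guarantees); the class and method names are hypothetical and Flink's own type extractor is not involved.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical helper; it only inspects what reflection can see.
public class LambdaErasureSketch {

    /** Returns true if the object's class records type arguments for its first interface. */
    static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler stores Function<String, Integer> in the class file.
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Lambda: the runtime-generated class implements the raw Function interface.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println("anonymous class: " + keepsTypeArguments(anonymous));
        System.out.println("lambda:          " + keepsTypeArguments(lambda));
    }
}
```

This is the kind of gap that an explicit hint such as .returns(Centroid25.class) fills by hand, and it is in line with the suggestion elsewhere in the thread to prefer (anonymous) classes when type extraction fails.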
> >
> > On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger wrote:
> >
> >> I think the issue is that the returns("eu.euranova.flink.Centroid25")
> >> variant only passes a string and the system does not know the type
> >> parameters.
> >> So we have to put GenericTypeInfo there, because we basically see Objects.
> >>
> >> On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen wrote:
> >>
> >>> @Timo If I understand it correctly, both omitting the "returns(...)"
> >>> statement, or changing it to "returns(Centroid25.class)" would help?
> >>>
> >>> I think that the behavior between "returns(Centroid25.class)" and
> >>> "returns("eu.euranova.flink.Centroid25")" should be consistent in that
> >>> they both handle the type as a POJO.
> >>>
> >>> Stephan
> >>>
> >>>
> >>> On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther wrote:
> >>>> Hey Nam-Luc,
> >>>>
> >>>> I think your problem lies in the following line:
> >>>>
> >>>> .returns("eu.euranova.flink.Centroid25")
> >>>>
> >>>> If you do not specify the fields of the class in the String by using
> >>>> "", the underlying parser will create a "GenericTypeInfo" type
> >>>> information, which then uses Kryo for serialization.
> >>>>
> >>>> In general, lambda expressions are a very new feature which currently
> >>>> causes a lot of problems because compilers do not provide the type
> >>>> information. Maybe it is better to use (anonymous) classes instead.
> >>>>
> >>>> In case of "map()" functions you don't need to provide type hints
> >>>> through the "returns()" method.
> >>>>
> >>>> For other operators you need to either specify all fields of the class
> >>>> in the String (makes no sense in your case) or you change the method to
> >>>>
> >>>> .returns(Centroid25.class)
> >>>>
> >>>> I hope that helps.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 11.02.2015 17:38, Nam-Luc Tran wrote:
> >>>>
> >>>>> Hello Stephan,
> >>>>>
> >>>>> Thank you for your help.
> >>>>> > >>>>> I ensured all the POJO classes used comply to what you > previously said > >>>>> and the same exception occurs. Here is the listing of classes > >>>>> Centroid25 and Point25: > >>>>> > >>>>> public class Centroid25 extends Point25 { > >>>>> > >>>>> public int id; > >>>>> > >>>>> public Centroid25() {} > >>>>> > >>>>> public Centroid25(int id, Double value0, Double value1, Double > value2, > >>>>> Double value3, Double value4, Double value5, > >>>>> Double value6, Double value7, Double value8, Double value9, > Double > >>>>> value10, Double value11, Double value12, > >>>>> Double value13, Double value14, Double value15, Double value16, > Double > >>>>> value17, Double value18, > >>>>> Double value19, Double value20, Double value21, Double value22, > Double > >>>>> value23, Double value24) { > >>>>> super(value0, value1, value2, value3, value4, value5, value6, > value7, > >>>>> value8, value9, value10, value11, > >>>>> value12, value13, value14, value15, value16, value17, value18, > >>>>> value19, value20, value21, value22, > >>>>> value23, value24); > >>>>> this.id = id; > >>>>> } > >>>>> > >>>>> public Centroid25(int id, Point25 p) { > >>>>> super(p.f0, > >>>>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p. > >>>>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p. > >>>>> f22,p.f23,p.f24); > >>>>> this.id = id; > >>>>> } > >>>>> > >>>>> public Centroid25(int id, Tuple25 p) { > >>>>> super(p.f0, > >>>>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p. > >>>>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p. 
> >>>>> f22,p.f23,p.f24); > >>>>> this.id = id; > >>>>> } > >>>>> > >>>>> @Override > >>>>> public String toString() { > >>>>> return id + " " + super.toString(); > >>>>> } > >>>>> } > >>>>> > >>>>> public class Point25{ > >>>>> > >>>>> public Double > >>>>> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16, > >>>>> f17,f18,f19,f20,f21,f22,f23,f24 > >>>>> = 0.0; > >>>>> > >>>>> public Point25() { > >>>>> } > >>>>> > >>>>> public Point25(Double value0, Double value1, Double value2, > Double > >>>>> value3, Double value4, Double value5, > >>>>> Double value6, Double value7, Double value8, Double value9, > Double > >>>>> value10, Double value11, Double value12, > >>>>> Double value13, Double value14, Double value15, Double value16, > Double > >>>>> value17, Double value18, > >>>>> Double value19, Double value20, Double value21, Double value22, > Double > >>>>> value23, Double value24) { > >>>>> f0=value0; > >>>>> f1=value1; > >>>>> f2=value2; > >>>>> f3=value3; > >>>>> f4=value4; > >>>>> f5=value5; > >>>>> f6=value6; > >>>>> f7=value7; > >>>>> f8=value8; > >>>>> f9=value9; > >>>>> f10=value10; > >>>>> f11=value11; > >>>>> f12=value12; > >>>>> f13=value13; > >>>>> f14=value14; > >>>>> f15=value15; > >>>>> f16=value16; > >>>>> f17=value17; > >>>>> f18=value18; > >>>>> f19=value19; > >>>>> f20=value20; > >>>>> f21=value21; > >>>>> f22=value22; > >>>>> f23=value23; > >>>>> f24=value24; > >>>>> > >>>>> } > >>>>> > >>>>> public List getFieldsAsList() { > >>>>> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, > f10, f11, > >>>>> f12, f13, f14, f15, f16, f17, f18, f19, > >>>>> f20, f21, f22, f23, f24); > >>>>> } > >>>>> > >>>>> public Point25 add(Point25 other) { > >>>>> f0 += other.f0; > >>>>> f1 += other.f1; > >>>>> f2 += other.f2; > >>>>> f3 += other.f3; > >>>>> f4 += other.f4; > >>>>> f5 += other.f5; > >>>>> f6 += other.f6; > >>>>> f7 += other.f7; > >>>>> f8 += other.f8; > >>>>> f9 += other.f9; > >>>>> f10 += other.f10; > >>>>> f11 += other.f11; > 
>>>>> f12 += other.f12;
> >>>>> f13 += other.f13;
> >>>>> f14 += other.f14;
> >>>>> f15 += other.f15;
> >>>>> f16 += other.f16;
> >>>>> f17 += other.f17;
> >>>>> f18 += other.f18;
> >>>>> f19 += other.f19;
> >>>>> f20 += other.f20;
> >>>>> f21 += other.f21;
> >>>>> f22 += other.f22;
> >>>>> f23 += other.f23;
> >>>>> f24 += other.f24;
> >>>>> return this;
> >>>>> }
> >>>>>
> >>>>> public Point25 div(long val) {
> >>>>> f0 /= val;
> >>>>> f1 /= val;
> >>>>> f2 /= val;
> >>>>> f3 /= val;
> >>>>> f4 /= val;
> >>>>> f5 /= val;
> >>>>> f6 /= val;
> >>>>> f7 /= val;
> >>>>> f8 /= val;
> >>>>> f9 /= val;
> >>>>> f10 /= val;
> >>>>> f11 /= val;
> >>>>> f12 /= val;
> >>>>> f13 /= val;
> >>>>> f14 /= val;
> >>>>> f15 /= val;
> >>>>> f16 /= val;
> >>>>> f17 /= val;
> >>>>> f18 /= val;
> >>>>> f19 /= val;
> >>>>> f20 /= val;
> >>>>> f21 /= val;
> >>>>> f22 /= val;
> >>>>> f23 /= val;
> >>>>> f24 /= val;
> >>>>> return this;
> >>>>> }
> >>>>>
> >>>>> public double euclideanDistance(Point25 other) {
> >>>>> List l = this.getFieldsAsList();
> >>>>> List ol = other.getFieldsAsList();
> >>>>> double res = 0;
> >>>>> for(int i=0;i
> >>>>>
> >>>>>> I came across an error for which I am unable to retrace the exact
> >>>>>> cause.
> >>>>>> Starting from the flink-java-examples module, I have extended the
> >>>>>> KMeans example to a case where points have 25 coordinates. It follows
> >>>>>> the exact same structure and transformations as the original example,
> >>>>>> only with points having 25 coordinates instead of 2.
> >>>>>> > >>>>>> When creating the centroids dataset within the code as follows > the > >>>>>> > >>>>> job > >>>>> > >>>>>> iterates and executes well: > >>>>>> > >>>>>> Centroid25 cent1 = new > >>>>>> > >>>>> Centroid25(ThreadLocalRandom.current().nextInt(0, > >>>>> > >>>>>> 1000), > >>>>>> > >>>>>> > >>>>>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0, > >>>>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0); > >>>>> > >>>>>> Centroid25 cent2 = new > >>>>>> > >>>>> Centroid25(ThreadLocalRandom.current().nextInt(0, > >>>>> > >>>>>> 1000), > >>>>>> > >>>>>> > >>>>>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1. > >>>>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0); > >>>>> > >>>>>> DataSet centroids = env.fromCollection(Arrays.asList(cent1, > >>>>>> cent2)); > >>>>>> > >>>>>> When reading from a csv file containing the following: > >>>>>> > >>>>>> > >>>>>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0, > >>>>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0 > >>>>> > >>>>>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1. 
> >>>>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0 > >>>>> > >>>>>> with the following code: > >>>>>> DataSet> centroids = env > >>>>>> > >>>>>> .readCsvFile("file:///home/nltran/res3.csv") > >>>>>> > >>>>>> > >>>>> .fieldDelimiter(",") > >>>>> > >>>>>> > >>>>> .includeFields("1111111111111111111111111") > >>>>> > >>>>>> > >>>>> .types(Double.class, Double.class, > >>>>> > >>>>>> Double.class, Double.class, > >>>>>> Double.class, Double.class, > >>>>>> > >>>>>> > >>>>> Double.class, > >>>>> > >>>>>> Double.class, Double.class, Double.class, Double.class, > >>>>>> Double.class, > >>>>>> > >>>>>> > >>>>> Double.class, > >>>>> > >>>>>> Double.class, Double.class, Double.class, Double.class, > >>>>>> Double.class, > >>>>>> > >>>>>> > >>>>> Double.class, > >>>>> > >>>>>> Double.class, Double.class, Double.class, Double.class, > >>>>>> Double.class, > >>>>>> > >>>>>> > >>>>> Double.class).map(p -> { > >>>>> > >>>>>> > >>>>> return new > >>>>> > >>>>>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000), > >>>>>> > >>>>>> > >>>>>> p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p. > >>>>> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p. 
> >>>>> f21,p.f22,p.f23,p.f24); > >>>>> > >>>>>> > >>>>> }).returns("eu.euranova.flink.Centroid25"); > >>>>> > >>>>>> I hit the following exception: > >>>>>> > >>>>>> 02/11/2015 14:58:27 PartialSolution (BulkIteration > (Bulk > >>>>>> Iteration))(1/1) > >>>>>> switched to FAILED > >>>>>> com.esotericsoftware.kryo.KryoException: Buffer underflow > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require( > >>>>> NoFetchingInput.java:76) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( > >>>>> DefaultClassResolver.java:109) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >>>>> KryoSerializer.java:205) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >>>>> KryoSerializer.java:210) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.io.disk.InputViewIterator.next( > >>>>> InputViewIterator.java:43) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >> > org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91) > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.operators.RegularPactTask.run( > >>>>> RegularPactTask.java:496) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >> > org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run( > >>>>> AbstractIterativePactTask.java:138) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run( > >>>>> IterationHeadPactTask.java:324) > >>>>> > >>>>>> at > >>>>>> > 
>>>>>> > >>>>>> org.apache.flink.runtime.operators.RegularPactTask. > >>>>> invoke(RegularPactTask.java:360) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment. > >>>>> run(RuntimeEnvironment.java:204) > >>>>> > >>>>>> at java.lang.Thread.run(Thread.java:745) > >>>>>> > >>>>>> 02/11/2015 14:58:27 Job execution switched to status > >>>>>> > >>>>> FAILING. > >>>>> > >>>>>> 02/11/2015 14:58:27 CHAIN Map (Map at > >>>>>> > >>>>> main(DoTheKMeans.java:64)) -> > >>>>> > >>>>>> Map (Map > >>>>>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING > >>>>>> 02/11/2015 14:58:27 Combine (Reduce at > >>>>>> > >>>>> main(DoTheKMeans.java:68))(1/1) > >>>>> > >>>>>> switched to CANCELING > >>>>>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at > >>>>>> > >>>>> main(DoTheKMeans.java:68)) > >>>>> > >>>>>> -> Map > >>>>>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING > >>>>>> 02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) > >>>>>> > >>>>> switched to > >>>>> > >>>>>> CANCELED > >>>>>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk > >>>>>> > >>>>> Iteration))(1/1) switched > >>>>> > >>>>>> to > >>>>>> CANCELING > >>>>>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk > >>>>>> > >>>>> Iteration))(1/1) switched > >>>>> > >>>>>> to > >>>>>> CANCELED > >>>>>> 02/11/2015 14:58:27 CHAIN Map (Map at > >>>>>> > >>>>> main(DoTheKMeans.java:64)) -> > >>>>> > >>>>>> Map (Map > >>>>>> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED > >>>>>> 02/11/2015 14:58:27 Combine (Reduce at > >>>>>> > >>>>> main(DoTheKMeans.java:68))(1/1) > >>>>> > >>>>>> switched to CANCELED > >>>>>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at > >>>>>> > >>>>> main(DoTheKMeans.java:68)) > >>>>> > >>>>>> -> Map > >>>>>> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED > >>>>>> 02/11/2015 14:58:27 Job execution switched to status > FAILED. 
> >>>>>> Exception in thread "main" > >>>>>> org.apache.flink.runtime.client.JobExecutionException: > >>>>>> com.esotericsoftware.kryo.KryoException: Buffer underflow > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require( > >>>>> NoFetchingInput.java:76) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( > >>>>> DefaultClassResolver.java:109) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > >>>>> > >>>>>> at > >>>>>> > >>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >>>>> KryoSerializer.java:205) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >>> > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize( > >>>>> KryoSerializer.java:210) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.io.disk.InputViewIterator.next( > >>>>> InputViewIterator.java:43) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >> > org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91) > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.operators.RegularPactTask.run( > >>>>> RegularPactTask.java:496) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > >> > org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run( > >>>>> AbstractIterativePactTask.java:138) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run( > >>>>> IterationHeadPactTask.java:324) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.operators.RegularPactTask. 
> >>>>> invoke(RegularPactTask.java:360) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment. > >>>>> run(RuntimeEnvironment.java:204) > >>>>> > >>>>>> at java.lang.Thread.run(Thread.java:745) > >>>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.client.JobClientListener$$anonfun$ > >>>>> receiveWithLogMessages$2.applyOrElse(JobClient.scala:88) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp( > >>>>> AbstractPartialFunction.scala:33) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > >>>>> AbstractPartialFunction.scala:33) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > >>>>> AbstractPartialFunction.scala:25) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >>>>> apply(ActorLogMessages.scala:37) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >>>>> apply(ActorLogMessages.scala:30) > >>>>> > >>>>>> at > >>>>>> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1. > >>>>> applyOrElse(ActorLogMessages.scala:30) > >>>>> > >>>>>> at > >>>>>> > >>>>> akka.actor.Actor$class.aroundReceive(Actor.scala:465) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> org.apache.flink.runtime.client.JobClientListener. 
> >>>>> aroundReceive(JobClient.scala:74) > >>>>> > >>>>>> at > >>>>>> > >>>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > >>>>> > >>>>>> at > akka.actor.ActorCell.invoke(ActorCell.scala:487) > >>>>>> at > >>>>>> > >>>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > >>>>> > >>>>>> at > akka.dispatch.Mailbox.run(Mailbox.scala:221) > >>>>>> at > akka.dispatch.Mailbox.exec(Mailbox.scala:231) > >>>>>> at > >>>>>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > >>>>>> at > >>>>>> > >>>>>> > >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > >>>>> runTask(ForkJoinPool.java:1339) > >>>>> > >>>>>> at > >>>>>> > >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker( > >>>>> ForkJoinPool.java:1979) > >>>>> > >>>>>> at > >>>>>> > >>>>>> > >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > >>>>> ForkJoinWorkerThread.java:107) > >>>>> > >>>>>> The centroid25 data is exactly the same in both cases. Could > you > >>>>>> > >>>>> help me > >>>>> > >>>>>> retrace what is wrong? > >>>>>> > >>>>>> Thanks and best regards, > >>>>>> > >>>>>> Tran Nam-Luc > >>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> View this message in context: > >>>>>> > >>>>>> http://apache-flink-incubator-mailing-list-archive.1008284. > >>>>> n3.nabble.com/kryoException-Buffer-underflow-tp3760.html > >>>>> > >>>>>> Sent from the Apache Flink (Incubator) Mailing List archive. > mailing > >>>>>> > >>>>> list > >>>>> > >>>>>> archive at Nabble.com. > >>>>>> > >>>>>> > >>>>> > > >