Hi Tran Nam-Luc! That is a problem we will look into.
In the meantime, can you try to modify your object so that it is a "Flink POJO"? Then we will serialize it ourselves, without involving Kryo. To do that, make sure that

 - The class is public
 - It has a public no-argument constructor
 - All fields are either public, or have public getters and setters

Here are some minor pointers for the program:

 - If you include all CSV fields, you do not need the '.includeFields("1111111111111111111111111")' function call. The "includeFields" function is only necessary if you want to skip over some fields.

 - If the lambda map function returns a simple class without generic parameters, you do not need the 'returns("eu.euranova.flink.Centroid25")' call. It should work even without it.

Greetings,
Stephan

On Wed, Feb 11, 2015 at 3:02 PM, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:

> Hello,
>
> I came across an error for which I am unable to retrace the exact cause.
> Starting from the flink-java-examples module, I have extended the KMeans
> example to a case where points have 25 coordinates. It follows the exact same
> structure and transformations as the original example, only with points
> having 25 coordinates instead of 2.
>
> When creating the centroids dataset within the code as follows, the job
> iterates and executes well:
>
> Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>     -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>     -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> DataSet<Centroid25> centroids = env.fromCollection(Arrays.asList(cent1, cent2));
>
> When reading from a csv file containing the following:
>
> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>
> with the following code:
>
> DataSet<Centroid25> centroids = env
>     .readCsvFile("file:///home/nltran/res3.csv")
>     .fieldDelimiter(",")
>     .includeFields("1111111111111111111111111")
>     .types(Double.class, Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class, Double.class)
>     .map(p -> {
>         return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>             p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.f22,p.f23,p.f24);
>     }).returns("eu.euranova.flink.Centroid25");
>
> I hit the following exception:
>
> 02/11/2015 14:58:27 PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Buffer underflow
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>     at java.lang.Thread.run(Thread.java:745)
>
> 02/11/2015 14:58:27 Job execution switched to status FAILING.
> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> 02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) switched to CANCELED
> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> 02/11/2015 14:58:27 Job execution switched to status FAILED.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: com.esotericsoftware.kryo.KryoException: Buffer underflow
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>     at java.lang.Thread.run(Thread.java:745)
>
>     at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The centroid25 data is exactly the same in both cases. Could you help me
> retrace what is wrong?
>
> Thanks and best regards,
>
> Tran Nam-Luc
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.
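
To illustrate the three POJO rules listed at the top of this reply, here is a minimal sketch in plain Java. The field layout of Centroid25 (an id plus an array of 25 coordinates) is only an assumption, since the real class is not shown in the thread. The followsPojoRules helper and the NoDefaultCtor class are hypothetical and exist just to make the rules checkable; the helper is a rough reflection-based approximation that looks only at the fields the class declares itself, and it is not the type analysis Flink actually performs.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Single-file sketch. In a real project Centroid25 would more likely be a
// top-level public class in its own file; it is nested here only for brevity.
class PojoSketch {

    /** Hypothetical stand-in for the poster's Centroid25 (field layout assumed). */
    public static class Centroid25 {
        public int id;                 // public field: no accessors needed
        private double[] coordinates;  // private field: needs public getter and setter

        // Public constructor without arguments.
        public Centroid25() {
            this.coordinates = new double[25];
        }

        // Extra convenience constructors are fine next to the no-argument one.
        public Centroid25(int id, double[] coordinates) {
            this.id = id;
            this.coordinates = coordinates;
        }

        public double[] getCoordinates() { return coordinates; }
        public void setCoordinates(double[] coordinates) { this.coordinates = coordinates; }
    }

    /** Counterexample: public class and public field, but no no-argument constructor. */
    public static class NoDefaultCtor {
        public int x;
        public NoDefaultCtor(int x) { this.x = x; }
    }

    /** Rough check of the three rules; an approximation, not Flink's own analysis. */
    static boolean followsPojoRules(Class<?> type) {
        // Rule 1: the class is public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Rule 2: a public no-argument constructor exists (getConstructor only sees public ones).
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every declared instance field is public or has a public getter and setter.
        // Static and transient fields are skipped as a simplification.
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            if (Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Centroid25:    " + followsPojoRules(Centroid25.class));
        System.out.println("NoDefaultCtor: " + followsPojoRules(NoDefaultCtor.class));
    }
}
```

The sketch deliberately mixes one public field with one private field plus accessors, to show that both variants of the third rule can live in the same class.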