[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568798#comment-15568798
 ] 

ASF GitHub Bot commented on FLINK-2608:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2623
  
    As far as I read it, Kryo 3.x is not strictly serialization compatible with 
2.x, hence the major version number bump.
    
    If the interfaces are still stable, then it should be fine to bump the 
chill dependency version, exclude any kryo dependency, and add our own 2.x kryo 
dependency. I would prefer that approach.


> Arrays.asList(..) does not work with CollectionInputFormat
> ----------------------------------------------------------
>
>                 Key: FLINK-2608
>                 URL: https://issues.apache.org/jira/browse/FLINK-2608
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.9, 0.10.0
>            Reporter: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>         wordCounts.print();
>     }
>     public static class LineSplitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String,
> Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList=integerList;
>         }
>     }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat 
> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at 
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>     ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>     at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at 
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>     at 
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>     ... 26 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to