[
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329569#comment-17329569
]
Flink Jira Bot commented on FLINK-2608:
---------------------------------------
This issue is assigned but has not received an update in 7 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> Arrays.asList(..) does not work with CollectionInputFormat
> ----------------------------------------------------------
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 0.9, 0.10.0
> Reporter: Maximilian Michels
> Assignee: Alexander Chermenin
> Priority: Minor
> Labels: stale-assigned, stale-minor
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List<Integer> elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> DataSet<Tuple2<String, Integer>> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
> @Override
> public void flatMap(String line, Collector<Tuple2<String,
> Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2<String, Integer>(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List<Integer> integerList;
> public TestClass(List<Integer> integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
> 'DataSource (at main(Test.java:32)
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat
> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
> ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
> at
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
> ... 26 more
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)