[
https://issues.apache.org/jira/browse/FLINK-1087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek resolved FLINK-1087.
-------------------------------------
Resolution: Fixed
> A DeltaIteration fails with Reducer as Input
> ---------------------------------------------
>
> Key: FLINK-1087
> URL: https://issues.apache.org/jira/browse/FLINK-1087
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> The following modified WordCount example fails:
> {code}
> // set up the execution environment
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> // get input data
> DataSet<String> text = getTextDataSet(env);
>
> DataSet<Tuple2<String, Integer>> counts =
> // split up the lines in pairs (2-tuples)
> containing: (word,1)
> text.flatMap(new Tokenizer())
> // group by the tuple field "0" and sum up
> tuple field "1"
> .groupBy(0)
> .sum(1);
> DeltaIteration<Tuple2<String, Integer>, Tuple2<String,
> Integer>> iteration = counts.iterateDelta(counts, 10, 0);
> DataSet<Tuple2<String, Integer>> delta =
>
> iteration.getSolutionSet().join(iteration.getWorkset()).where(0).equalTo(0).with(
> new JoinFunction<Tuple2<String, Integer>,
> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> public Tuple2<String, Integer>
> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws
> Exception {
> return second;
> }
> });
> DataSet<Tuple2<String, Integer>> newWorkset = delta.filter(new
> FilterFunction<Tuple2<String, Integer>>() {
> @Override
> public boolean filter(Tuple2<String, Integer> value)
> throws Exception {
> return false;
> }
> });
> DataSet<Tuple2<String, Integer>> result =
> iteration.closeWith(delta, newWorkset);
> result.print();
> // execute program
> env.execute("WordCount Example");
> {code}
> With this Exception:
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> java.lang.RuntimeException: Initializing the input streams failed in Task
> Join(org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction):
> Illegal input group size in task configuration: -1
> at
> org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:260)
> at
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:205)
> at
> org.apache.flink.runtime.taskmanager.TaskManager.submitTasks(TaskManager.java:775)
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:422)
> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:958)
> Caused by: java.lang.Exception: Illegal input group size in task
> configuration: -1
> at
> org.apache.flink.runtime.operators.RegularPactTask.initInputReaders(RegularPactTask.java:739)
> at
> org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:256)
> ... 7 more
> at
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:361)
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:245)
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:58)
> at
> org.apache.flink.example.java.wordcount.WordCount.main(WordCount.java:100)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)