[ 
https://issues.apache.org/jira/browse/FLINK-1087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-1087.
-------------------------------------
    Resolution: Fixed

> A DeltaIteration fails with Reducer as Input
> --------------------------------------------
>
>                 Key: FLINK-1087
>                 URL: https://issues.apache.org/jira/browse/FLINK-1087
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The following modified WordCount example fails:
> {code}
> // set up the execution environment
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> // get input data
> DataSet<String> text = getTextDataSet(env);
>
> DataSet<Tuple2<String, Integer>> counts =
>         // split up the lines in pairs (2-tuples) containing: (word,1)
>         text.flatMap(new Tokenizer())
>         // group by the tuple field "0" and sum up tuple field "1"
>         .groupBy(0)
>         .sum(1);
>
> DeltaIteration<Tuple2<String, Integer>, Tuple2<String, Integer>> iteration =
>         counts.iterateDelta(counts, 10, 0);
>
> DataSet<Tuple2<String, Integer>> delta =
>         iteration.getSolutionSet().join(iteration.getWorkset()).where(0).equalTo(0).with(
>                 new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>                     public Tuple2<String, Integer> join(Tuple2<String, Integer> first,
>                             Tuple2<String, Integer> second) throws Exception {
>                         return second;
>                     }
>                 });
>
> DataSet<Tuple2<String, Integer>> newWorkset = delta.filter(
>         new FilterFunction<Tuple2<String, Integer>>() {
>             @Override
>             public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                 return false;
>             }
>         });
>
> DataSet<Tuple2<String, Integer>> result = iteration.closeWith(delta, newWorkset);
> result.print();
>
> // execute program
> env.execute("WordCount Example");
> {code}
> It fails with the following exception:
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Initializing the input streams failed in Task Join(org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction): Illegal input group size in task configuration: -1
>       at org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:260)
>       at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:205)
>       at org.apache.flink.runtime.taskmanager.TaskManager.submitTasks(TaskManager.java:775)
>       at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:483)
>       at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:422)
>       at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:958)
> Caused by: java.lang.Exception: Illegal input group size in task configuration: -1
>       at org.apache.flink.runtime.operators.RegularPactTask.initInputReaders(RegularPactTask.java:739)
>       at org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:256)
>       ... 7 more
>       at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:361)
>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:245)
>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:58)
>       at org.apache.flink.example.java.wordcount.WordCount.main(WordCount.java:100)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:483)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
