Aljoscha Krettek created FLINK-1087:
---------------------------------------
Summary: A DeltaIteration fails with Reducer as Input
Key: FLINK-1087
URL: https://issues.apache.org/jira/browse/FLINK-1087
Project: Flink
Issue Type: Bug
Reporter: Aljoscha Krettek
Assignee: Stephan Ewen
The following modified WordCount example fails:
{code}
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// get input data
DataSet<String> text = getTextDataSet(env);

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

DeltaIteration<Tuple2<String, Integer>, Tuple2<String, Integer>> iteration =
        counts.iterateDelta(counts, 10, 0);

DataSet<Tuple2<String, Integer>> delta =
        iteration.getSolutionSet().join(iteration.getWorkset()).where(0).equalTo(0).with(
                new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> join(Tuple2<String, Integer> first,
                            Tuple2<String, Integer> second) throws Exception {
                        return second;
                    }
                });

DataSet<Tuple2<String, Integer>> newWorkset = delta.filter(
        new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return false;
            }
        });

DataSet<Tuple2<String, Integer>> result = iteration.closeWith(delta, newWorkset);
result.print();

// execute program
env.execute("WordCount Example");
{code}
It fails with this exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Initializing the input streams failed in Task Join(org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction): Illegal input group size in task configuration: -1
    at org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:260)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:205)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTasks(TaskManager.java:775)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:422)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:958)
Caused by: java.lang.Exception: Illegal input group size in task configuration: -1
    at org.apache.flink.runtime.operators.RegularPactTask.initInputReaders(RegularPactTask.java:739)
    at org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:256)
    ... 7 more

    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:361)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:245)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:58)
    at org.apache.flink.example.java.wordcount.WordCount.main(WordCount.java:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)