Till Rohrmann created FLINK-3052:
------------------------------------

             Summary: Optimizer does not push properties out of bulk iterations
                 Key: FLINK-3052
                 URL: https://issues.apache.org/jira/browse/FLINK-3052
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 0.10.0
            Reporter: Till Rohrmann
            Assignee: Till Rohrmann
             Fix For: 0.10.1

Flink's optimizer should be able to reuse interesting properties from outside the loop. To do that, it is sometimes necessary to append a NoOp node to the step function, which recomputes the required properties.

This currently does not work for {{BulkIterations}}, because the plans with the appended NoOp nodes are not added to the overall list of candidates.

This leads not only to sub-optimal plan selection but sometimes also to the rejection of valid jobs. The following job, for example, is wrongly rejected by Flink.

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Two inputs built the same way: the sequence 1..10, each value wrapped in a Tuple1<Long>.
DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
    @Override
    public Tuple1<Long> map(Long value) throws Exception {
        return new Tuple1<>(value);
    }
});

DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
    @Override
    public Tuple1<Long> map(Long value) throws Exception {
        return new Tuple1<>(value);
    }
});

// distinct() runs outside the loop and feeds the bulk iteration.
DataSet<Tuple1<Long>> distinctInput = input1.distinct();

IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);

// Step function: coGroup the partial solution with input2 on field 0
// and emit at most one element per group, taken from the first input.
DataSet<Tuple1<Long>> iterationStep = iteration
    .coGroup(input2)
    .where(0)
    .equalTo(0)
    .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
        @Override
        public void coGroup(
                Iterable<Tuple1<Long>> first,
                Iterable<Tuple1<Long>> second,
                Collector<Tuple1<Long>> out) throws Exception {
            Iterator<Tuple1<Long>> it = first.iterator();

            if (it.hasNext()) {
                out.collect(it.next());
            }
        }
    });

DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);

iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)