[ https://issues.apache.org/jira/browse/FLINK-1254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen resolved FLINK-1254.
---------------------------------
Resolution: Fixed
Fixed via ce822bf7f5ec80df5d5a749b1439320af3fb8b18
> Optimizer bug during pipeline breaker placement
> -----------------------------------------------
>
> Key: FLINK-1254
> URL: https://issues.apache.org/jira/browse/FLINK-1254
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 0.8-incubating
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 0.8-incubating
>
>
> The compiler fails on certain programs when trying to place pipeline breakers.
> This code reproduces the error:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(8);
> // the workset (input two of the delta iteration) is the same as what is
> // consumed by the successive join
> DataSet<Tuple2<Long, Long>> initialWorkset =
>     env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> DataSet<Tuple2<Long, Long>> initialSolutionSet =
>     env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> // trivial iteration, since we are interested in the inputs to the iteration
> DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
> initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
> DataSet<Tuple2<Long, Long>> next =
>     iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
> DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
> initialWorkset
> .join(result, JoinHint.REPARTITION_HASH_FIRST)
> .where(0).equalTo(0)
> .print();
> Plan p = env.createProgramPlan();
> compileNoStats(p);
> {code}