Stephan Ewen created FLINK-1254:
-----------------------------------

Summary: Optimizer bug during pipeline breaker placement
Key: FLINK-1254
URL: https://issues.apache.org/jira/browse/FLINK-1254
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 0.8-incubating
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.8-incubating
The compiler fails on certain programs while placing pipeline breakers. The following program reproduces the error:

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);

// the workset (input two of the delta iteration) is the same data set that is consumed by the subsequent join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

// trivial iteration, since we are only interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);

DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());

DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

// join the original workset input with the iteration result
initialWorkset
    .join(result, JoinHint.REPARTITION_HASH_FIRST)
    .where(0).equalTo(0)
    .print();

Plan p = env.createProgramPlan();
compileNoStats(p); // compilation fails here
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)