[ https://issues.apache.org/jira/browse/FLINK-5025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643752#comment-15643752 ]
Niels Basjes commented on FLINK-5025:
-------------------------------------

See here for a minimal project that does this:
https://github.com/nielsbasjes/Reproduce-FLINK-5025

NOTE: This code is Apache 2.0 licensed so feel free to copy this into the project as a unit test or documentation.

> Job fails because of Optimizer bug
> ----------------------------------
>
>                 Key: FLINK-5025
>                 URL: https://issues.apache.org/jira/browse/FLINK-5025
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.3
>            Reporter: Niels Basjes
>
> I have a batch job that when I run it I get the error message:
> {code}
> org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     at com.bol.reproduce.flink.Main.run(Main.java:42)
>     at com.bol.reproduce.flink.Main.main(Main.java:21)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> {code}
> The smallest code snippet I have been able to create that reproduces this problem is below here.
> Note that when using a single union this error does not happen.
> {code}
> public class Main implements Serializable {
>     public static void main(String[] args) throws Exception {
>         System.exit(new Main().run());
>     }
>
>     private int run() throws IOException {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         final DataSet<String> lines =
>             env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
>                 .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
>                 .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));
>         List<String> allLines = new ArrayList<>();
>         lines
>             .rebalance()
>             .output(new LocalCollectionOutputFormat<>(allLines));
>         // execute program
>         try {
>             env.execute("Running");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         return 0;
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)