Fabian Hueske commented on FLINK-2662:

I found the root cause of this problem. The plan contains a Union (U1) operator 
with two outputs of which one is another union operator (U2). One of the input 
requires a partitioning shipping strategy (U1, pushed down from a following 
distinct), the other not. In some cases, the partitioning is not pushed to the 
input of U1, such that the connection between U1 and U2 is hash-partitioned. As 
one of the last steps, the optimizer merges consecutive binary union operator 
into an n-ary union operator and checks that their connection is a simple 
forward connection, i.e., not a partitioned connection. This is where the 
translation fails.

I have a fix for this issue that translates union operators with two (or more) 
inputs into multiple internal union operators with a single output. Thereby, we 
avoid multiple competing shipping strategies for union operators and a 
potential partitioning is always pushed to the two input of a union.
Note, the fix touches only the program translation. The API does not change and 
the program does not need to be changed.

I'll open a PR with the fix soon.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> ----------------------------------------------------------------------------------------------------------
>                 Key: FLINK-2662
>                 URL: https://issues.apache.org/jira/browse/FLINK-2662
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.9.1, 0.10.0
>            Reporter: Gabor Gevay
>            Priority: Critical
>             Fix For: 1.0.0
>         Attachments: FlinkBug.scala
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>       at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>       at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>       at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>       at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>       at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>       at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>       at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>       at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>       at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>       at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>       at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>       at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>       at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>       at malom.Solver.main(Solver.java:66)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)

This message was sent by Atlassian JIRA

Reply via email to