[ 
https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930371#comment-15930371
 ] 

ASF GitHub Bot commented on FLINK-2814:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/3563

    [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode

    WorksetIterationNode#instantiate loops over all solution and work set 
candidates. Since the solution set reference is modified in place when the 
predecessor node can be used in its place, swith this variable to the inner 
loop.
    
    @StephanEwen this is similar to #2029 but resets the reference in the loop. 
I believe my prior suggestion to immediately return upon adding a node was 
incorrect as the `instantiate` methods look to be compiling all valid 
combinations.
    
    IntelliJ code coverage on `flink-optimizer` shows 105 hits through 
`WorksetIterationNode#instantiate` and it does fix this issue with my Katz 
Centrality algorithm (which should not be using delta iterations, but I was 
young and naive when I wrote it).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 
2814_deltaiteration_dualinputplannode_cannot_be_cast_to_singleinputplannode

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3563
    
----
commit 34f017834e17fa69e2b7c72bd95e1a819e4e6aa3
Author: Greg Hogan <[email protected]>
Date:   2017-03-17T16:09:34Z

    [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode
    
    WorksetIterationNode#instantiate loops over all solution and work set
    candidates. Since the solution set reference is modified in place when
    the predecessor node can be used in its place, swith this variable to
    the inner loop.

----


> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
> -----------------------------------------------------------------------
>
>                 Key: FLINK-2814
>                 URL: https://issues.apache.org/jira/browse/FLINK-2814
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Rekha Joshi
>
> A delta iteration that closes with a solution set which is a {{JoinOperator}} 
> throws the following exception:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:289)
>       at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to 
> org.apache.flink.optimizer.plan.SingleInputPlanNode
>       at 
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432)
>       at 
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>       at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>       at 
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>       at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>       at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>       at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271)
>       at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:350)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:424)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1365)
>       at Driver.main(Driver.java:366)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429)
>       ... 6 more
> {noformat}
> Temporary fix is to attach an identity mapper.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to