[ 
https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315875#comment-14315875
 ] 

Stephan Ewen commented on FLINK-1343:
-------------------------------------

[~fhueske] Your analysis (probe input needs to be available up front) showed 
that the deadlock-avoidance mechanism for hash joins was broken at some point 
during a refactoring.

Fortunately, the next fix should let us remove all of the deadlock detection 
and resolution code :-)
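
For readers unfamiliar with the issue, the kind of stall discussed here can be reproduced with a toy model. This is only a sketch and not Flink code: the class name, the bounded queues standing in for network buffers, and the timeout that stands in for "blocked forever" are all assumptions made for illustration. A single source feeds both branches, and the join drains its build side completely before it reads the probe side.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical toy model of a branching plan; none of these names come from Flink.
class BranchingDeadlockSketch {
    static final long END = -1L;         // end-of-stream marker (records are >= 0)
    static final long TIMEOUT_MS = 200;  // stand-in for "blocked forever"

    /** Returns true if the join consumed both inputs, false if the pipeline stalled. */
    static boolean run(int records, int bufferCapacity) {
        BlockingQueue<Long> build = new ArrayBlockingQueue<>(bufferCapacity);
        BlockingQueue<Long> probe = new ArrayBlockingQueue<>(bufferCapacity);
        AtomicBoolean joinFinished = new AtomicBoolean(false);

        // Single non-parallel source: every record is sent to both branches.
        Thread source = new Thread(() -> {
            try {
                for (long i = 0; i <= records; i++) {
                    long value = (i == records) ? END : i;
                    if (!build.offer(value, TIMEOUT_MS, TimeUnit.MILLISECONDS)) return;
                    if (!probe.offer(value, TIMEOUT_MS, TimeUnit.MILLISECONDS)) return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Join: reads the build side to the end before touching the probe side.
        Thread join = new Thread(() -> {
            try {
                if (drain(build) && drain(probe)) joinFinished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        join.start();
        try {
            source.join();
            join.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return joinFinished.get();
    }

    /** Reads until END; returns false if no record arrives within the timeout. */
    static boolean drain(BlockingQueue<Long> queue) throws InterruptedException {
        while (true) {
            Long value = queue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (value == null) return false;
            if (value == END) return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("large buffers complete: " + run(100, 1000));
        System.out.println("small buffers complete: " + run(1000, 10));
    }
}
```

In this model the source stalls once the probe-side buffer is full, because the join is still waiting for the build side to end. Buffers large enough to hold the entire probe input let the run complete.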

> Branching Join Program Deadlocks
> --------------------------------
>
>                 Key: FLINK-1343
>                 URL: https://issues.apache.org/jira/browse/FLINK-1343
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.8, 0.9
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> The following program, which reads its data from a single non-parallel data 
> source, branches twice, and joins the branches with two joins, deadlocks.
> {code:java}
> public class DeadlockProgram {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Long> longs = env.generateSequence(0, 1000000L).setParallelism(1);
>         DataSet<Long> longs2 = env.generateSequence(0, 1000000L).setParallelism(1);
>         DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
>         DataSet<Tuple1<Long>> longT2 = longT1.project(0);
>         DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper()); // deadlocks
> //        DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works
>         longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
>             .join(longT1).where(0).equalTo(0).projectFirst(0)
>             .print();
>         env.execute();
>     }
>     public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
>         @Override
>         public Tuple1<Long> map(Long l) throws Exception {
>             return new Tuple1<Long>(l);
>         }
>     }
> }
> {code}
> If one of the branches reads its data from a second data source (see inline 
> comment) or if the single data source uses the default parallelism, the 
> program executes correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
