Fabian Hueske created FLINK-1343:
------------------------------------

             Summary: Branching Join Program Deadlocks
                 Key: FLINK-1343
                 URL: https://issues.apache.org/jira/browse/FLINK-1343
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 0.8, 0.9
            Reporter: Fabian Hueske
            Assignee: Fabian Hueske


The following program deadlocks. It reads its data from a single non-parallel
data source, branches twice, and joins the branches with two joins.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

public class DeadlockProgram {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Two sequence sources, each forced to parallelism 1 (non-parallel).
        DataSet<Long> longs = env.generateSequence(0, 1000000L).setParallelism(1);
        DataSet<Long> longs2 = env.generateSequence(0, 1000000L).setParallelism(1);

        DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
        DataSet<Tuple1<Long>> longT2 = longT1.project(0);
        DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper()); // deadlocks
//        DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works

        // (longT2 JOIN longT3) JOIN longT1, all on field 0.
        longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
            .join(longT1).where(0).equalTo(0).projectFirst(0)
            .print();

        env.execute();
    }

    // Wraps each Long in a Tuple1 so it can be joined by field position.
    public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
        @Override
        public Tuple1<Long> map(Long l) throws Exception {
            return new Tuple1<Long>(l);
        }
    }
}
{code}

If one of the branches reads its data from a second data source (see inline 
comment) or if the single data source uses the default parallelism, the program 
executes correctly.
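To make the topological difference between the two variants concrete, here is a minimal, self-contained sketch. It is plain Java, not Flink code, and it does not diagnose the deadlock; it only models the program's data flow as a hypothetical producer-to-consumer edge list (the class `BranchPoints` and the node names `join1`, `join2`, and `print` are illustrative assumptions) and lists the operators whose output feeds more than one consumer. With a single source, both `longs` and `longT1` are branch points, which is the "branches twice" shape described above; when `longT3` reads from `longs2`, only `longT1` remains a branch point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BranchPoints {

    // Hypothetical edge list (producer -> consumers) mirroring DeadlockProgram.
    // If secondSource is true, longT3 reads from longs2 (the "works" variant).
    static Map<String, List<String>> plan(boolean secondSource) {
        Map<String, List<String>> consumers = new LinkedHashMap<>();
        addEdge(consumers, "longs", "longT1");                          // longs.map(...)
        addEdge(consumers, "longT1", "longT2");                         // longT1.project(0)
        addEdge(consumers, secondSource ? "longs2" : "longs", "longT3"); // map for longT3
        addEdge(consumers, "longT2", "join1");                          // longT2.join(longT3)
        addEdge(consumers, "longT3", "join1");
        addEdge(consumers, "join1", "join2");                           // ... .join(longT1)
        addEdge(consumers, "longT1", "join2");
        addEdge(consumers, "join2", "print");
        return consumers;
    }

    static void addEdge(Map<String, List<String>> g, String from, String to) {
        g.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    // A branch point is an operator whose output is consumed by more than one operator.
    static List<String> branchPoints(Map<String, List<String>> consumers) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : consumers.entrySet()) {
            if (e.getValue().size() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("single source: " + branchPoints(plan(false))); // [longs, longT1]
        System.out.println("second source: " + branchPoints(plan(true)));  // [longT1]
    }
}
```

The insertion-ordered `LinkedHashMap` keeps the output in plan order, which makes the two variants easy to compare side by side.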



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
