[ https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315873#comment-14315873 ]
Ufuk Celebi commented on FLINK-1343:
------------------------------------

No, I don't think so. The task where the flow branches is set to use the blocking result if and only if the flow is merged again later. If one of the channels of this task has a materializing temp mode, that temp mode is discarded during job graph generation when the config is written, which is kind of hacky. But I didn't make a great effort to get into the temping/pipeline-breaking logic, so there might be a nicer way to do this.

> Branching Join Program Deadlocks
> --------------------------------
>
>                 Key: FLINK-1343
>                 URL: https://issues.apache.org/jira/browse/FLINK-1343
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.8, 0.9
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> The following program, which gets its data from a single non-parallel data
> source, branches twice, and joins the branches with two joins, deadlocks.
>
> {code:java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
>
> public class DeadlockProgram {
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // Single non-parallel data source
>         DataSet<Long> longs = env.generateSequence(0, 1000000L).setParallelism(1);
>         // Second non-parallel data source, used only by the commented-out variant below
>         DataSet<Long> longs2 = env.generateSequence(0, 1000000L).setParallelism(1);
>
>         DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
>         DataSet<Tuple1<Long>> longT2 = longT1.project(0);
>
>         DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper());     // deadlocks
>         // DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works
>
>         // Two joins that bring the branches back together
>         longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
>               .join(longT1).where(0).equalTo(0).projectFirst(0)
>               .print();
>
>         env.execute();
>     }
>
>     // Wraps each Long into a Tuple1 so it can be joined on field 0
>     public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
>         @Override
>         public Tuple1<Long> map(Long l) throws Exception {
>             return new Tuple1<Long>(l);
>         }
>     }
> }
> {code}
>
> If one of the branches reads its data from a second data source (see the inline
> comment), or if the single data source uses the default parallelism, the
> program executes correctly.
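The rule described in the comment above (the branching task uses a blocking result only if the branches are merged again later) comes down to a reachability check on the plan DAG. The following is a minimal sketch under stated assumptions: `Node`, `to`, `reachableFrom`, and `branchesReconverge` are hypothetical names, not Flink optimizer classes, and the plan is modeled as a plain DAG that ignores parallelism, temp modes, and channel types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BranchMergeCheck {

    // Hypothetical minimal plan node; NOT a Flink optimizer class.
    static final class Node {
        final String name;
        final List<Node> successors = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        // Adds an outgoing edge and returns this node so edges can be chained.
        Node to(Node target) {
            successors.add(target);
            return this;
        }
    }

    // Collects every node reachable from start, including start itself.
    static Set<Node> reachableFrom(Node start) {
        Set<Node> seen = new HashSet<>();
        Deque<Node> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            if (seen.add(n)) {
                for (Node s : n.successors) {
                    stack.push(s);
                }
            }
        }
        return seen;
    }

    // True if two outgoing edges of 'branch' lead to a common downstream node,
    // i.e. the flow that splits here is merged again later.
    static boolean branchesReconverge(Node branch) {
        List<Set<Node>> reach = new ArrayList<>();
        for (Node s : branch.successors) {
            reach.add(reachableFrom(s));
        }
        for (int i = 0; i < reach.size(); i++) {
            for (int j = i + 1; j < reach.size(); j++) {
                for (Node n : reach.get(i)) {
                    if (reach.get(j).contains(n)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Plan shape of DeadlockProgram from the issue description.
        Node longs = new Node("longs");
        Node longT1 = new Node("longT1");
        Node longT2 = new Node("longT2");
        Node longT3 = new Node("longT3");
        Node join1 = new Node("join1");
        Node join2 = new Node("join2");
        Node sink = new Node("print");

        longs.to(longT1).to(longT3);
        longT1.to(longT2).to(join2);
        longT2.to(join1);
        longT3.to(join1);
        join1.to(join2);
        join2.to(sink);

        // Report a result type only for nodes whose flow actually branches.
        for (Node n : Arrays.asList(longs, longT1, longT2, longT3, join1, join2)) {
            if (n.successors.size() > 1) {
                String mode = branchesReconverge(n) ? "BLOCKING" : "PIPELINED";
                System.out.println(n.name + " -> " + mode);
            }
        }
    }
}
```

Run as-is, `main` reports both branch points of the example plan, `longs` and `longT1`, as `BLOCKING`. The check is purely structural: it does not model parallelism or buffer sizes, so it may mark a branch point as blocking even where a pipelined exchange would not actually deadlock.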