I'm not sure whether it is currently possible to schedule the receiver first and then the sender. Recently, I had to fix the TaskManagerTest.testRunWithForwardChannel test case, where exactly this happened. Because the receiver was scheduled first, the method IntermediateResultPartitionManager.getIntermediateResultPartitionIterator sometimes threw an IllegalQueueIteratorRequestException: the partition manager complained that the producer execution ID was unknown. I assume that this has to be fixed first in order to schedule all tasks immediately. But Ufuk probably knows better.
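To make the ordering problem concrete, here is a minimal Java sketch. It is not Flink code: `PartitionRegistry`, `requestStrict`, `requestDeferred`, and `registerProducer` are hypothetical names chosen for illustration. The strict lookup fails the same way the partition manager does when the receiver asks before the sender has registered. The deferred lookup parks the request until the producer shows up, which is one possible way (an assumption, not the agreed fix) to remove the ordering dependency.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a partition manager; not Flink's API.
class PartitionRegistry {

    // Thrown when a consumer asks for a partition whose producer is unknown
    // (analogous to the IllegalQueueIteratorRequestException described above).
    static class UnknownProducerException extends RuntimeException {
        UnknownProducerException(String producerId) {
            super("Unknown producer execution ID: " + producerId);
        }
    }

    // producer execution ID -> partition handle (a String keeps the sketch small)
    private final Map<String, String> partitions = new HashMap<>();

    // producer execution ID -> consumers that asked before the producer registered
    private final Map<String, List<Consumer<String>>> pending = new HashMap<>();

    // Called when the producer task is deployed and has created its partition.
    synchronized void registerProducer(String producerId, String partition) {
        partitions.put(producerId, partition);
        List<Consumer<String>> waiting = pending.remove(producerId);
        if (waiting != null) {
            for (Consumer<String> callback : waiting) {
                callback.accept(partition); // hand the partition to early requesters
            }
        }
    }

    // Strict lookup: fails if the receiver is scheduled before the sender.
    synchronized String requestStrict(String producerId) {
        String partition = partitions.get(producerId);
        if (partition == null) {
            throw new UnknownProducerException(producerId);
        }
        return partition;
    }

    // Deferred lookup: answers immediately if possible, otherwise parks the callback.
    synchronized void requestDeferred(String producerId, Consumer<String> callback) {
        String partition = partitions.get(producerId);
        if (partition != null) {
            callback.accept(partition);
        } else {
            pending.computeIfAbsent(producerId, k -> new ArrayList<>()).add(callback);
        }
    }
}
```

A receiver that calls `requestDeferred` before its producer is registered gets the partition as soon as `registerProducer` runs. Invoking the callbacks while holding the lock keeps the sketch short; real code would probably release the lock first.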
Greets,
Till

On Wed, Jan 21, 2015 at 8:58 PM, Stephan Ewen <[email protected]> wrote:

> I think that this is a fairly delicate thing.
>
> The execution graph / scheduling is the most delicate part of the system. I
> would not feel good about a quick fix there, so let's think this through a
> little bit.
>
> The logic currently does the following:
>
> 1) It schedules the sources (see "ExecutionGraph.scheduleForExecution()").
>
> 2) Successors of operators are scheduled when the intermediate result
> partition / queue tells the master that data is available.
>
> 3) The successor requests the stream from the producer.
>
>
> Possible changes:
>
> - We could definitely change the "ExecutionGraph.scheduleForExecution()"
> method to deploy all tasks immediately. I would suggest attaching a "schedule
> mode" to the JobGraph that defines how to do that. The mode could have the
> values FROM_SOURCES, ALL, and BACK_TRACKING. FROM_SOURCES is what we do
> right now, BACK_TRACKING is what we will do in the next release, and ALL is
> what you need.
>
> - When all tasks are scheduled immediately, it may be that for a channel,
> the sender is not yet deployed when the receiver is deployed. That should
> be okay, since the same can happen right now when all-to-all patterns
> connect the tasks.
>
> - The queues would still send notifications to the JobManager that data is
> available, but the JM will see that the target task is already deployed (or
> currently being deployed). Then the info about where to grab a channel from
> would need to be sent to the task. That mechanism also exists already.
>
>
> @Ufuk: It seems that it may actually work to simply kick off the deployment
> of all tasks immediately (in the "ExecutionGraph.scheduleForExecution()"
> method). Do you see any other implications?
>
> Greetings,
> Stephan
>
>
> On Wed, Jan 21, 2015 at 6:50 AM, Ufuk Celebi <[email protected]> wrote:
>
> > Hey Gyula,
> >
> > On 21 Jan 2015, at 15:41, Gyula Fóra <[email protected]> wrote:
> >
> > > Hey Guys,
> > >
> > > I think it would make sense to turn lazy operator execution off for
> > > streaming programs because it would make life simpler for windowing. I
> > > also created a JIRA issue here
> > > <https://issues.apache.org/jira/browse/FLINK-1425>.
> > >
> > > Can anyone give me some quick pointers on how to do this? It's probably
> > > simple, I am just not familiar with that part of the code. (Or maybe it's
> > > so easy that someone could pick this up :) )
> >
> > Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
> > how it is done currently. The first produced buffer of an intermediate
> > result triggers this message. I think the cleanest solution would be to
> > do this directly when scheduling a streaming job?
> >
> > > By the way, do you see any reasons why we should not do this?
> >
> > ATM, I don't.
>
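Stephan's proposal (a schedule mode on the JobGraph, plus a data-available notification that either deploys the consumer or only updates it) can be sketched as a toy model in Java. Everything here is hypothetical: `ScheduleMode` mirrors the three values suggested in the thread, `ToyExecutionGraph` is not Flink's `ExecutionGraph`, and `onDataAvailable` only stands in for the JobManager's handling of the ScheduleOrUpdateConsumers message, whose real signature is not shown. BACK_TRACKING is deliberately left unimplemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical schedule mode, mirroring the values proposed in the thread.
enum ScheduleMode { FROM_SOURCES, ALL, BACK_TRACKING }

// Toy model of lazy vs. eager deployment; names are illustrative, not Flink's API.
class ToyExecutionGraph {

    enum State { CREATED, DEPLOYING }

    // task name -> names of its direct successors (insertion order preserved)
    private final Map<String, List<String>> successors = new LinkedHashMap<>();
    private final Map<String, State> states = new LinkedHashMap<>();
    private final List<String> sources = new ArrayList<>();
    // record of the actions taken, so the behaviour can be inspected
    final List<String> log = new ArrayList<>();

    void addTask(String name, boolean isSource) {
        successors.put(name, new ArrayList<>());
        states.put(name, State.CREATED);
        if (isSource) {
            sources.add(name);
        }
    }

    void connect(String producer, String consumer) {
        successors.get(producer).add(consumer);
    }

    void scheduleForExecution(ScheduleMode mode) {
        switch (mode) {
            case FROM_SOURCES:
                // lazy: deploy sources only; successors follow on first data
                for (String source : sources) {
                    deploy(source);
                }
                break;
            case ALL:
                // eager: deploy every task up front, in insertion order
                for (String task : new ArrayList<>(states.keySet())) {
                    deploy(task);
                }
                break;
            default:
                throw new UnsupportedOperationException(mode + " is not sketched here");
        }
    }

    // Stand-in for the "data available" notification reaching the master.
    void onDataAvailable(String producer) {
        for (String consumer : successors.get(producer)) {
            if (states.get(consumer) == State.CREATED) {
                deploy(consumer); // not deployed yet: deploy it now
            } else {
                // already deployed or deploying: only tell it where to fetch data from
                log.add("update " + consumer + " <- " + producer);
            }
        }
    }

    private void deploy(String task) {
        states.put(task, State.DEPLOYING);
        log.add("deploy " + task);
    }
}
```

With FROM_SOURCES, only `src` is deployed up front and `map` follows on the first data-available notification. With ALL, both are deployed immediately and the same notification only results in an update telling `map` where to fetch its input, which matches the third point of the proposal.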
