By the way, one important thing I forgot to mention: my intention was for this to be almost 100% transparent to users, at least in its initial version. To that end, all but two *existing* unit tests currently pass with minimal changes required (a fix for the stragglers is coming shortly), and I've added a number of new unit tests around the new functionality.
Maxime, to respond to your thoughts:

- I misunderstood the utility of run_id; I thought it would be replaced by the more concrete (dag_id, execution_date) key. I understand the use case now, and it will go back in.

- DagRun.lock_id is the id of the job that is currently executing the DagRun. This could have been a simple True/False flag, but that could lead to orphaned DagRuns (a job locks one, then dies, and it stays locked forever). So I have a zombie check to make sure that every locked DagRun corresponds to a living job. If no living job matches the lock_id, the zombie check unlocks the DagRun so it can be executed by a different job.

- Could you please elaborate on the point about the backfill constraint? If I understand you, we want to make sure that when backfilling a bunch of DagRuns, they are only executed by the Backfill and don't get picked up by the Scheduler. That is easily achieved by locking the DagRuns to the Backfill's id, but I'm not totally clear on the motivation.

- DagRunJob is as async as the executor it's working with. Think of it as a generalization of the current Scheduler loop -- all it really does is hand tasks off to an Executor in the correct order and call executor.heartbeat(). If the executor blocks (like SequentialExecutor), the DagRunJob will pause until it's done. If the executor runs async (Local/Celery), the DagRunJob will continue. The Backfill DagRunJob is the same, with the addition of a progress() function to print percent complete and other statistics, similar to the current Backfill.

- I will have a look at the subprocess work; I'm not familiar with it yet.

- +1 to splitting it up :) This is probably revision #3 or #4, and the history was so messy that I squashed it all. I will break it into manageable, bite-sized commits shortly.
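To make the lock_id / zombie-check idea above concrete, here is a minimal sketch. All of the names (Job, DagRun, is_alive, zombie_check) are illustrative stand-ins, not the actual Airflow models or the code in this PR -- it just shows the mechanism: a DagRun records the id of the job that locked it, and a periodic check releases any lock held by a dead job.

```python
# Hypothetical sketch of the zombie check described above.
# Class and method names are illustrative, not Airflow's actual API.

class Job:
    """A running job (e.g. a Scheduler or Backfill process)."""

    def __init__(self, job_id, alive=True):
        self.id = job_id
        self.alive = alive

    def is_alive(self):
        # In practice this would check a recent heartbeat timestamp.
        return self.alive


class DagRun:
    def __init__(self, dag_id, execution_date):
        self.dag_id = dag_id
        self.execution_date = execution_date
        self.lock_id = None  # id of the job currently executing this run

    def lock(self, job):
        self.lock_id = job.id

    def unlock(self):
        self.lock_id = None


def zombie_check(dag_runs, jobs):
    """Unlock any DagRun whose locking job is no longer alive,
    so a different job can pick it up."""
    living = {job.id for job in jobs if job.is_alive()}
    for run in dag_runs:
        if run.lock_id is not None and run.lock_id not in living:
            run.unlock()  # orphaned lock: make the run schedulable again
```

Using the job id rather than a boolean flag is what makes the check possible: a bare "locked" bit gives you no way to tell whether the locker is still around.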
Best,
J

On Wed, Apr 27, 2016 at 7:03 PM Maxime Beauchemin <[email protected]> wrote:

> Notes related to the proposal here:
> https://github.com/airbnb/airflow/wiki/DagRun-Refactor-(Scheduler-2.0)
>
> * All of this seems very sound to me. Moving the methods to the right
> places will bring a lot of clarity. I can clearly see that I'm no longer
> alone in understanding the current challenges and potential solutions!
> This is awesome!
>
> * DagRun.run_id's purpose is to allow people to define something
> meaningful at the grain of their ETL. Say you wait on a genome file in a
> folder and want a DagRun for each genome file: you can put your unique
> filename as that run_id and refer to it in your templates/code. It's more
> of a way for people to express and use their own "run id" that is
> meaningful to them and carry it through inside Airflow. Airflow's
> internals would always use dag_id and execution_date as the key,
> regardless of run_id.
>
> * What goes in DagRun.lock_id? The job_id of the process managing it?
> What if it needs to be restarted? We could also just have DagRun.type,
> where type is either 'backfill' or 'scheduler'. Letting backfill overwrite
> a scheduler job may mean that backfill would appropriate the DagRuns that
> are not in a running state. Lots of complexity and edge cases in this
> area...
>
> * One constraint around backfill (until we get the git time-machine up)
> is to allow users to run local code with no handoff to the scheduler, so
> that you can go to any version of your DAG in your local repo and run the
> DAG as defined locally.
>
> * I'm unclear on DagRunJob being sync or async. The scheduler needs it to
> be async, I think; backfill overall should be synchronous and log
> progress.
>
> * Some of the design might need to change to accommodate the subprocess
> handling I just described in the Google group (
> https://groups.google.com/forum/#!topic/airbnb_airflow/96hd61T7kgg) that
> Paul is working on, but essentially the scheduling needs to take place in
> a subprocess and should be async. For backfill that's not a constraint:
> it could take place in the main process and can be synchronous...
>
> All of this is fairly brutal and should be broken down into many small
> PRs (3? 5?). There are many other large pieces in motion (distributing
> the scheduler and parsing the DagBag in subprocesses, the git time
> machine, docker/containment, ...). We should land the pieces that help
> everything else fall into place, and be very careful of changes that make
> other pieces of the puzzle harder to fit in.
>
> Max
