Thanks to my schedule, Maxime beat me to it :) but nonetheless I'd like to
call everyone's attention to the proposal here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286 and
new tracking issue here: https://issues.apache.org/jira/browse/AIRFLOW-14.

To set the stage, there are two major DAG execution paths in Airflow -
Scheduler and Backfill. They do not work the same way; in particular,
Scheduler creates DagRuns and Backfill does not. For users who exclusively
use one or the other, things generally work as expected, but problems
can arise when the two are used simultaneously (most egregiously with the
SubDagOperator). In addition to the potential for "cross-Job" collisions,
the two paths create maintenance difficulties because execution
bugs/issues/enhancements must be addressed in two different environments.

The framework I'm proposing here unifies all DAG executions around a more
formalized DagRun concept.

Just as a task/operator is an abstract description of work and a
TaskInstance is an actual execution with an associated date and state, a
DAG is a description of a workflow and a DagRun is an execution of a DAG with
associated date and state. In other words: if you want to run a DAG, you
create a DagRun and let the DagRun and a DagRunJob manage the execution.
You don't just loop over all the tasks until they're done (looking at you,
BackfillJob).
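
To make the analogy concrete, here is a minimal sketch of that parallel. The
class shapes and field names below are simplified stand-ins for illustration,
not the actual Airflow models:

```python
from dataclasses import dataclass
from datetime import datetime

# Simplified, hypothetical stand-ins for the real Airflow models.
@dataclass
class TaskInstance:
    task_id: str              # which abstract task this is an execution of
    execution_date: datetime
    state: str = "queued"

@dataclass
class DagRun:
    dag_id: str               # which abstract DAG this is an execution of
    execution_date: datetime
    state: str = "running"

# Running a DAG means creating a DagRun and letting a job manage it,
# not looping over the tasks directly.
ti = TaskInstance(task_id="example_task", execution_date=datetime(2016, 4, 28))
run = DagRun(dag_id="example_dag", execution_date=datetime(2016, 4, 28))
```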

So as I've hinted, DagRunJob is a new type of Job that makes sure DagRuns
execute appropriately. Each DRJ has a set of DagRuns that it's responsible
for executing until they succeed/fail. However, a core idea is that DagRuns
are not tied exclusively to any one DRJ; we need to support many-to-many
relationships where multiple DRJs are trying to execute multiple
(overlapping) DagRuns. This is critical for making Airflow robust in
distributed environments, not to mention supporting existing workflows like
backfilling a DAG while the scheduler is running. The way that works is a
DRJ takes a lock on any DagRun it's trying to execute; other DRJs skip over
locked DagRuns.
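
The locking idea can be sketched roughly as follows. This is an in-memory toy
with hypothetical names; in practice the lock would live as a column on the
DagRun table and be claimed with an atomic database update:

```python
import threading

class DagRunTable:
    """Toy stand-in for the DagRun metadata table (hypothetical)."""
    def __init__(self):
        self._mutex = threading.Lock()
        self.locks = {}  # (dag_id, execution_date) -> job_id holding the lock

    def try_lock(self, key, job_id):
        """Atomically claim a DagRun for one DagRunJob; False if another holds it."""
        with self._mutex:
            if key in self.locks and self.locks[key] != job_id:
                return False  # another DRJ holds it -- this DRJ skips the DagRun
            self.locks[key] = job_id
            return True

    def release(self, key, job_id):
        """Give the DagRun back once this job is done with it."""
        with self._mutex:
            if self.locks.get(key) == job_id:
                del self.locks[key]

table = DagRunTable()
key = ("example_dag", "2016-04-28")
assert table.try_lock(key, job_id=1)        # first DRJ claims the run
assert not table.try_lock(key, job_id=2)    # second DRJ skips it
table.release(key, job_id=1)
assert table.try_lock(key, job_id=2)        # now free to claim
```

The point is only the claim-or-skip behavior: two jobs can share a pool of
DagRuns without stepping on each other.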

Scheduler and Backfill are simply subclasses of DagRunJob. Scheduler is
special in that its execution loop never terminates; it just keeps
scheduling/looking for DagRuns. Backfill is special in that it automates
creating the DagRuns for its requested backfill dates; otherwise it's a
completely normal DRJ.
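
A rough sketch of how that hierarchy might look. Class and method names here
are hypothetical; the real proposal is in the wiki page linked above:

```python
from datetime import date, timedelta

class DagRunJob:
    """Base job: executes the DagRuns it is responsible for (sketch)."""
    def __init__(self):
        self.dag_runs = []  # DagRuns this job will try to execute

    def collect_dag_runs(self):
        pass  # subclasses decide how DagRuns get into self.dag_runs

    def _execute(self):
        self.collect_dag_runs()
        # ...lock each DagRun, run its tasks, mark success/failure...

class Backfill(DagRunJob):
    """Creates DagRuns for its requested dates, then runs like any DRJ."""
    def __init__(self, dag_id, start, end):
        super().__init__()
        self.dag_id, self.start, self.end = dag_id, start, end

    def collect_dag_runs(self):
        day = self.start
        while day <= self.end:
            self.dag_runs.append((self.dag_id, day))
            day += timedelta(days=1)

class Scheduler(DagRunJob):
    """Same machinery, but its loop never terminates."""
    def collect_dag_runs(self):
        pass  # would poll the metadata DB for schedulable DagRuns

bf = Backfill("example_dag", date(2016, 4, 1), date(2016, 4, 3))
bf.collect_dag_runs()  # bf.dag_runs now holds one entry per backfill day
```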

Anyway, this is a very brief overview of my thought process. Please see the
wiki/issue for more details and I will respond to any questions (including
Maxime's in one second!). This is a large change but I believe a critical
one and the more eyes the better!

J

On Thu, Apr 28, 2016 at 2:11 AM Chris Riccomini <[email protected]>
wrote:

> Please also open JIRAs for this stuff so people can see what feature work
> is going on without tracking the mailing list.
>
> On Wed, Apr 27, 2016 at 11:10 PM, Chris Riccomini <[email protected]>
> wrote:
>
> > Hey Maxime,
> >
> > Great, thanks.
> >
> > > We should share our roadmap and sprints systematically, I'll talk to
> > > our PM about making this part of the process.
> >
> > Keep in mind that you guys will need to get feedback from the community.
> > Deciding on how things are implemented (e.g. how DAGs are deployed in
> > Airflow (is this what git time machine is? I have concerns about using
> > Git as a deployment mechanism, as you described with Data Swarm)) has to
> > be done collectively.
> >
> > Cheers,
> > Chris
> >
> > On Wed, Apr 27, 2016 at 11:03 PM, Maxime Beauchemin <
> > [email protected]> wrote:
> >
> >> Dan's got a work-in-progress PR out here around refactoring the
> >> dependency engine:
> >> https://github.com/airbnb/airflow/pull/1435
> >>
> >> Paul, can you share the work you're doing on the scheduler, or your
> >> plans? The idea there is to parse DAGs only in short-lived subprocesses.
> >>
> >> As for the "git time machine", I believe Paul has a wiki page we're
> >> getting ready to share. Dan has worked on git sync at scale for CI
> >> workloads at Twitter, so that brings extra confidence in this approach.
> >>
> >> About docker/containment, it's pretty much just conversations so far.
> >> We're struggling with the idea of getting some of our Chef recipe
> >> assets, like service discovery, inside Docker containers. Juggling with
> >> containers in a Chef world is pretty foreign to all of us.
> >>
> >> Most pieces aren't exactly in movement, but we know big things are
> >> going to move soon.
> >>
> >> We should share our roadmap and sprints systematically; I'll talk to
> >> our PM about making this part of the process.
> >>
> >> Max
> >>
> >> On Wed, Apr 27, 2016 at 10:22 PM, Chris Riccomini <[email protected]>
> >> wrote:
> >>
> >> > > There are many other large pieces in movement (distributing the
> >> > > scheduler and parsing DagBag in subprocesses, the git time machine,
> >> > > docker/containment, ...).
> >> >
> >> > Maxime, can you please get the work you're doing documented somewhere
> >> > public?
> >> >
> >> > On Wed, Apr 27, 2016 at 4:03 PM, Maxime Beauchemin <
> >> > [email protected]> wrote:
> >> >
> >> > > Notes related to the proposal here:
> >> > > https://github.com/airbnb/airflow/wiki/DagRun-Refactor-(Scheduler-2.0)
> >> > >
> >> > > * All of this seems very sound to me. Moving the methods to the
> >> > > right places will bring a lot of clarity. I clearly see that I'm no
> >> > > longer alone in understanding the current challenges and potential
> >> > > solutions! This is awesome!
> >> > > * DagRun.run_id's purpose is to allow people to define something
> >> > > meaningful to the grain of their ETL. Say you wait on a genome file
> >> > > in a folder and want a DagRun for each genome file: you can put your
> >> > > unique filename as that run_id and refer to it in your
> >> > > templates/code. It's more of a way for people to express and use
> >> > > their own "run id" that is meaningful to them and carry it through
> >> > > inside Airflow. Airflow's internals would always use dag_id and
> >> > > execution_date as the key, regardless of run_id.
> >> > > * What goes in DagRun.lock_id? The job_id of the process managing
> >> > > it? What if it needs to be restarted? We could also just have
> >> > > DagRun.type, where type is either 'backfill' or 'scheduler'.
> >> > > Backfilling to overwrite a scheduler job may mean that backfill
> >> > > would appropriate for itself the DagRuns that are not in a running
> >> > > state. Lots of complexity and edge cases in this area...
> >> > > * One constraint around backfill (until we get the git time machine
> >> > > up) is to allow users to run local code with no handoff to the
> >> > > scheduler, so that you can go to any version of your DAG in your
> >> > > local repo and run the DAG as defined locally.
> >> > > * I'm unclear on DagRunJob being sync or async. The scheduler needs
> >> > > it to be async, I think; backfill overall should be synchronous and
> >> > > log progress.
> >> > > * Some of the design might need to change to accommodate the
> >> > > subprocess handling I just described in the Google group (
> >> > > https://groups.google.com/forum/#!topic/airbnb_airflow/96hd61T7kgg)
> >> > > that Paul is working on, but essentially the scheduling needs to
> >> > > take place in a subprocess and should be async. For backfill it's
> >> > > not a constraint: it could take place in the main process and can be
> >> > > synchronous...
> >> > >
> >> > > All of this is fairly brutal and should be broken down into many
> >> > > small PRs (3? 5?). There are many other large pieces in movement
> >> > > (distributing the scheduler and parsing DagBag in subprocesses, the
> >> > > git time machine, docker/containment, ...). We should land the
> >> > > pieces that help everything else fall into place, and be very
> >> > > careful of changes that make other pieces of the puzzle harder to
> >> > > fit in.
> >> > >
> >> > > Max
> >> > >
> >> >
> >>
> >
> >
>
