+1 on DagFetcher abstraction, very airflow-esque :)

On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
<maximebeauche...@gmail.com> wrote:
> Addressing a few of your questions / concerns:
>
> * The scheduler uses a multiprocess queue to queue up tasks; each
> subprocess is in charge of a single DAG "scheduler cycle", which triggers
> what it can for active DagRuns. Currently it fills the DagBag from the
> local file system, looking for the specific module where the master process
> last saw that DAG. Fetching the DAG is a metadata operation; DAG artifacts
> shouldn't be too large, so we can assume that it takes seconds at most to
> fetch a DAG, which is ok. We generally assume that the scheduler should
> fully cycle every minute or so. A version-aware DagFetcher could also
> implement some sort of caching if that were a concern (it shouldn't be, though).
> * For consistency within the whole DagRun, the scheduler absolutely has to
> read the right version. If tasks were removed, they would never get
> scheduled and consistency could not be achieved.
> * TaskInstances get created the first time they are identified as runnable
> by the scheduler and are born with a queued status, I believe (from memory,
> I haven't read the latest code to confirm). The worker double-checks and
> sets them to running as part of a database transaction to avoid double-firing.
>
> Max
>
> On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com> wrote:
>
>> I'll preface this with the fact that I'm relatively new to Airflow, and
>> haven't played around with a lot of the internals.
>>
>> I find the idea of a DagFetcher interesting, but should we worry about
>> slowing down the scheduler significantly? If the scheduler has to
>> "fetch" multiple different DAG versions, be they git refs or artifacts from
>> Artifactory, we are talking about adding significant time to each scheduler
>> run. Also, how would the scheduler know which DAGs to fetch from where if
>> there aren't local files on disk listing those DAGs? Maybe I'm missing
>> something in the implementation.
>>
>> It seems to me that the fetching of the different versions should be
>> delegated to the Task (or TaskInstance) itself. That ensures we only spend
>> the time to "fetch" the version that is needed, when it is needed. One
>> downside might be that each TaskInstance running for the same version of
>> the DAG might end up doing the "fetch" independently (duplicating that work).
>>
>> I think this could be done by adding some version attribute to the DagRun
>> that gets set at creation, and having the scheduler pass that version to
>> the TaskInstances when they are created. You could even extend this so that
>> you could have an arbitrary set of "executor_parameters" that get set on a
>> DagRun and are passed to TaskInstances. Then the specific Executor class
>> that is running that TaskInstance could handle the "executor_parameters" as
>> it sees fit.
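>>
>> As a rough illustration of the kind of thing I mean (the column and
>> attribute names below are placeholders, not an existing Airflow API):
>>
>>     # hypothetical extra columns on the existing DagRun model
>>     class DagRun(Base):
>>         ...
>>         version = Column(String(64))        # e.g. a git sha, set at creation
>>         executor_parameters = Column(Text)  # opaque JSON, interpreted by the Executor
>>
>>     # scheduler side, when a task instance for this run is created/queued
>>     ti = TaskInstance(task, dag_run.execution_date)
>>     ti.dag_version = dag_run.version        # hypothetical field the worker would
>>                                             # use to fetch the matching DAG version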
>>
>> One thing I'm not clear on is how and when TaskInstances are created. When
>> the scheduler first sees a specific DagRun, do all the TaskInstances get
>> created immediately, but only some of them get queued? Or does the
>> scheduler only create those TaskInstances which can be queued right now?
>>
>> In particular, if a DagRun gets created and, while it is running, the DAG
>> is updated and a new Task is added, will the scheduler pick up that new
>> Task for the running DagRun? If the answer is yes, then my suggestion
>> above would run the risk of scheduling a Task for a DAG version where that
>> Task didn't exist. I'm sure you could handle that somewhat gracefully, but
>> it's a bit ugly.
>>
>> Chris
>>
>> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
>> maximebeauche...@gmail.com> wrote:
>>
>> > At a higher level I want to say a few things about the idea of enforcing
>> > version consistency within a DagRun.
>> >
>> > One thing we've been talking about is the need for a "DagFetcher"
>> > abstraction, whose first implementation, replacing and mimicking the
>> > current behavior, would be a "FileSystemDagFetcher". A specific DagFetcher
>> > implementation may or may not support version semantics, but if it does it
>> > should be able to receive a version id and return the proper version of
>> > the DAG object. For instance, that first "FileSystemDagFetcher" would not
>> > support version semantics, but perhaps a "GitRepoDagFetcher" would, and an
>> > "ArtifactoryDagFetcher" or "TarballInS3DagFetcher" might as well.
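>> >
>> > A minimal sketch of what that interface might look like (purely
>> > illustrative; none of these classes exist today and the method names are
>> > assumptions):
>> >
>> >     from abc import ABC, abstractmethod
>> >
>> >     from airflow import settings
>> >     from airflow.models import DagBag
>> >
>> >     class DagFetcher(ABC):
>> >         @abstractmethod
>> >         def fetch(self, dag_id, version=None):
>> >             """Return the DAG object, at `version` if versioning is supported."""
>> >
>> >     class FileSystemDagFetcher(DagFetcher):
>> >         def fetch(self, dag_id, version=None):
>> >             # no version semantics: always reads whatever is on local disk
>> >             return DagBag(settings.DAGS_FOLDER).get_dag(dag_id)
>> >
>> >     class GitRepoDagFetcher(DagFetcher):
>> >         def fetch(self, dag_id, version=None):
>> >             # `version` is a git ref; check out that ref into a working dir
>> >             path = self._checkout(version or "HEAD")  # _checkout is hypothetical
>> >             return DagBag(path).get_dag(dag_id)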
>> >
>> > Of course that assumes that the scheduler knows and stores the active
>> > version number when generating a new DagRun, and that that information is
>> > leveraged on subsequent scheduler cycles and on the workers when tasks are
>> > executed.
>> >
>> > This could also enable things like "remote" backfills (non-local,
>> > parallelized) of a DAG definition that's at an arbitrary git ref
>> > (assuming a "GitRepoDagFetcher").
>> >
>> > There are [perhaps] unintuitive implications where clearing a single task
>> > would then re-run the old DAG definition on that task (since the version
>> > was stamped in the DagRun and hasn't changed), but deleting/recreating a
>> > DagRun would run the latest version (or any other version that may be
>> > specified for that matter).
>> >
>> > I'm unclear on how much work that represents exactly, but it's certainly
>> > doable and may only require changing part of the DagBag class and a few
>> > other places.
>> >
>> > Max
>> >
>> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapw...@gmail.com> wrote:
>> >
>> > > Thanks for your feedback!
>> > >
>> > > Option 1 is a non-starter for us. The reason is we have DAGs that take
>> > > 9+ hours to run.
>> > >
>> > > Option 2 is more where my mind was going, but it's rather large. How I
>> > > see it, you need an MVCC DagBag that's aware of multiple versions (what
>> > > provides the version?). Assuming you can track which versions active dag
>> > > runs point to, you know how to clean up (fine with that being external).
>> > > The pro here is that you have snapshot isolation for a dag_run; the con
>> > > is more bookkeeping and requiring the deploy process to work with this
>> > > (that last part may be a good thing though).
>> > >
>> > > The only other option I can think of is to lock deploys so the system
>> > > only picks up new versions when no dag_run holds the lock. This is
>> > > flawed for many reasons, and breaks horribly for dag_runs that take
>> > > minutes (which I assume 99% do).
>> > >
>> > >
>> > >
>> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
>> > >
>> > > > Hi David!
>> > > >
>> > > > Thank you for clarifying, I think I understand your concern now. We
>> > > > currently also work around this by making sure a dag is turned off
>> > > > when we deploy a new version. We also make sure our jobs are
>> > > > idempotent and retry-enabled in case we forget to turn off the job,
>> > > > so the issue hasn't caused us too much headache.
>> > > >
>> > > > I do agree that it would be nice for Airflow to have the option to
>> > > > guarantee a single version of a dag per dag run. I see two approaches:
>> > > >
>> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
>> > > > (2) If a dag is updated, the current dagrun continues but uses the
>> > > > version from before the update.
>> > > >
>> > > > (1) requires some mechanism to compare dag generations. One option is
>> > > > to hash the dag file, store that value in the dagrun table, and
>> > > > compare against it each time a task runs. If the hash value is
>> > > > different, update the hash value, then fail/retry the dag. I think
>> > > > this is a fairly safe approach.
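>> > > >
>> > > > Something along these lines, as a sketch (the dag_hash column and the
>> > > > hook point in the task runner are hypothetical):
>> > > >
>> > > >     import hashlib
>> > > >
>> > > >     def dagfile_hash(fileloc):
>> > > >         with open(fileloc, "rb") as f:
>> > > >             return hashlib.sha256(f.read()).hexdigest()
>> > > >
>> > > >     # when the dagrun is created
>> > > >     dag_run.dag_hash = dagfile_hash(dag.fileloc)
>> > > >
>> > > >     # each time a task instance is about to run
>> > > >     current = dagfile_hash(dag.fileloc)
>> > > >     if current != dag_run.dag_hash:
>> > > >         dag_run.dag_hash = current   # record the new generation
>> > > >         fail_or_retry(dag_run)       # hypothetical helper: fail/retry per config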
>> > > >
>> > > > (2) is trickier. A dag only has a property "fileloc" which tracks the
>> > > > location of the dag file, but the actual content of the dag file is
>> > > > never versioned. When a task instance starts running, it dynamically
>> > > > re-processes the dag file specified by the fileloc, generates all the
>> > > > task objects from the dag file, and fetches the task object by task_id
>> > > > in order to execute it. So in order to guarantee that each dagrun runs
>> > > > a specific version, previous versions must be maintained on disk
>> > > > somehow (maintaining this information in memory is difficult, since if
>> > > > the scheduler/worker shuts down, that information is lost). This makes
>> > > > it a pretty big change, and I haven't thought much about how to
>> > > > implement it.
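>> > > >
>> > > > For reference, the re-parse path the worker takes today is roughly the
>> > > > following (heavily simplified, just to show where "whatever is on disk
>> > > > right now" wins):
>> > > >
>> > > >     from airflow.models import DagBag
>> > > >
>> > > >     dag_bag = DagBag(fileloc)      # re-parse the dag file currently on disk
>> > > >     dag = dag_bag.get_dag(dag_id)
>> > > >     task = dag.get_task(task_id)   # whatever definition is on disk right now
>> > > >     # ... the task instance then executes this freshly parsed task object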
>> > > >
>> > > > I'm personally leaning towards (1) for the sake of simplicity. Note
>> > > > that some users may not want the dag to fail/retry even when the dag
>> > > > is updated, so this should be an optional feature, not required.
>> > > >
>> > > > My scheduler-fu isn't that great, so I'm curious what others have to
>> > > > say about this.
>> > > >
>> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com> wrote:
>> > > > > Thanks for the reply Joy, let me walk you through things as they
>> > > > > are today.
>> > > > >
>> > > > > 1) we don't stop airflow or disable DAGs while deploying updates to
>> > > > > logic; this is done live once it's released
>> > > > > 2) the python script in the DAG folder doesn't actually have DAGs in
>> > > > > it, but is a shim layer that allows us to deploy in an atomic way
>> > > > > for a single host
>> > > > >   2.1) this script reads a file on local disk (less than a disk page
>> > > > > in size) to find the latest git commit deployed
>> > > > >   2.2) it re-does the airflow DAG load process, but pointing at the
>> > > > > git commit path
>> > > > >
>> > > > > Example directory structure
>> > > > >
>> > > > > /airflow/dags/shim.py
>> > > > > /airflow/real_dags/
>> > > > >     /latest        # pointer to the latest commit
>> > > > >     /[git commit]/
>> > > > >
>> > > > > This is how we make sure deploys are consistent within a single task.
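>> > > > >
>> > > > > A minimal sketch of the shim described above (details like exposing
>> > > > > the DAGs via globals() are assumptions, not the exact script):
>> > > > >
>> > > > >     # /airflow/dags/shim.py
>> > > > >     import os
>> > > > >     from airflow.models import DagBag
>> > > > >
>> > > > >     ROOT = "/airflow/real_dags"
>> > > > >     with open(os.path.join(ROOT, "latest")) as f:
>> > > > >         commit = f.read().strip()   # tiny read, less than a disk page
>> > > > >
>> > > > >     # re-run the normal DAG load against the pinned commit directory
>> > > > >     dag_bag = DagBag(os.path.join(ROOT, commit))
>> > > > >     for dag_id, dag in dag_bag.dags.items():
>> > > > >         globals()[dag_id] = dag     # expose DAGs so Airflow discovers them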
>> > > > >
>> > > > >
>> > > > > Now, let's assume we have a fully atomic commit process and are able
>> > > > > to upgrade DAGs at the exact same moment.
>> > > > >
>> > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks,
>> > > > > Task1 and Task2
>> > > > > At time T1 Task1 is picked up by Worker1, so it starts executing the
>> > > > > task (V1 logic)
>> > > > > At time T2 the deploy commit happens, current DAG version: V2
>> > > > > At time T3 Task2 is picked up by Worker2, so it starts executing the
>> > > > > task (V2 logic)
>> > > > >
>> > > > > In many cases this isn't really a problem (a tuning config change to
>> > > > > a hadoop job), but as we have more people using Airflow this is
>> > > > > causing a lot of time spent debugging why production acted
>> > > > > differently than expected (the problem was already fixed... why is it
>> > > > > still here?). We also see that some tasks expect a given behavior
>> > > > > from other tasks, and since they live in the same git repo people can
>> > > > > modify both tasks at the same time if a breaking change is needed;
>> > > > > but when this rolls out to prod there isn't a way to do this other
>> > > > > than turning off the DAG and logging in to all hosts to verify it is
>> > > > > fully deployed.
>> > > > >
>> > > > > We would like to remove this confusion by exposing
>> > > > > generations/versions (same thing really) to users and making sure
>> > > > > that for a single dag_run only one version is used.
>> > > > >
>> > > > > I hope this is more clear.
>> > > > >
>> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
>> > > > >
>> > > > >> Hi David,
>> > > > >>
>> > > > >> Do you mind providing a concrete example of the scenario in which
>> > > > >> the scheduler/workers see different states? (I'm not 100% sure I
>> > > > >> understood the issue at hand.)
>> > > > >>
>> > > > >> And by same dag generation, are you referring to the dag version?
>> > > > >> (DAG version is currently not supported at all, but I can see it
>> > > > >> being a building block for future use cases).
>> > > > >>
>> > > > >> Joy
>> > > > >>
>> > > > >> > On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com> wrote:
>> > > > >>
>> > > > >> > My current thinking is to add a field to the dag table that is
>> > > > >> > optional and provided by the dag. We currently intercept the load
>> > > > >> > path, so we could use this field to make sure we load the same
>> > > > >> > generation. My concern here is the interaction with the scheduler;
>> > > > >> > I'm not familiar enough with that logic to predict corner cases
>> > > > >> > where this would fail.
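>> > > > >> >
>> > > > >> > Something like the following, as a sketch (the "generation" kwarg
>> > > > >> > does not exist in Airflow; it is purely hypothetical):
>> > > > >> >
>> > > > >> >     # in the dag file; GIT_COMMIT comes from our deploy tooling
>> > > > >> >     dag = DAG("my_dag",
>> > > > >> >               schedule_interval="@daily",
>> > > > >> >               generation=GIT_COMMIT)  # hypothetical field stored on the dag table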
>> > > > >> >
>> > > > >> > Any other recommendations for how this could be done?
>> > > > >> >
>> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com> wrote:
>> > > > >> >
>> > > > >> > > We have been using airflow for logic that delegates to other
>> > > > >> > > systems, so we inject a task that all tasks depend on to make
>> > > > >> > > sure all resources used are the same for all tasks in the dag.
>> > > > >> > > This works well for tasks that delegate to external systems, but
>> > > > >> > > people are starting to need to run logic in airflow, and the
>> > > > >> > > fact that the scheduler and all workers can see different states
>> > > > >> > > is causing issues.
>> > > > >> > >
>> > > > >> > > We can make sure that all the code is deployed in a consistent
>> > > > >> > > way, but we need help from the scheduler to tell the workers the
>> > > > >> > > current generation for a DAG.
>> > > > >> > >
>> > > > >> > > My question is: what would be the best way to modify airflow to
>> > > > >> > > allow DAGs to define a generation value that the scheduler could
>> > > > >> > > send to workers?
>> > > > >> > >
>> > > > >> > > Thanks
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > >
>> > > >
>> > >
>> >
>>
