I'm interested in hearing further discussion too, and if others have tried
something similar to our approach.  Several companies on this list have
mentioned various approaches to dynamic DAGs, and I think everyone needs
them eventually.  Maybe it's an opportunity for additional docs regarding
use cases like this and to document best practices from the community.

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <[email protected]> wrote:

> @Chris @Taylor
> Thank you guys very much for your explanations! Your strategy makes a lot
> of sense to me. Generating a dag for each client means I'm going to have a
> ton of dags on the front page, but at least that is searchable haha. I'm
> going to give this implementation a shot and I'll try to report back with
> the outcome.
>
> Can anyone comment on future work to support data science workflows like
> these, or is Airflow fundamentally the wrong tool?
>
> On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <[email protected]>
> wrote:
>
> > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > generate a separate DAG class instance for each similar config, which gets
> > hoisted into the global namespace.  In simplified pseudo-Python, it looks
> > like:
> >
> > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > sources = (fetch_configs()
> >            if is_empty(cache) or is_expired(cache)
> >            else cache['configs'])
> > for source in sources:
> >     dag = DAG(...)
> >     globals()[source._id] = dag
> >     # ...create tasks and set dependencies for each DAG (some config
> >     # pulled from source object for each)...
> >
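> > Fleshed out a bit (a sketch, where fetch_configs, the expiry check, and
> > the source fields are stand-ins for our internal helpers):
> >
> > import time
> > from datetime import datetime
> >
> > from airflow import DAG
> > from airflow.models import Variable
> >
> > def fetch_configs():
> >     # stand-in for our internal API call returning one config per DAG
> >     return [{'_id': 'source_a'}, {'_id': 'source_b'}]
> >
> > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > if not cache or float(cache.get('expire', 0)) < time.time():
> >     sources = fetch_configs()
> > else:
> >     sources = cache['configs']
> >
> > for source in sources:
> >     dag = DAG(source['_id'],
> >               start_date=datetime(2018, 1, 1),
> >               schedule_interval='@daily')
> >     # ...create tasks and set dependencies on dag here...
> >     globals()[source['_id']] = dag
> >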
> > We added the cache part for the same reason you pointed out, because the
> > DAG processing loop was hitting the API a lot.  Btw, you can also turn
> > down how often the processing loop runs with scheduler_heartbeat_sec
> > under the scheduler group in config.
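> >
> > Something like this in airflow.cfg (the value here is illustrative, not
> > a recommendation):
> >
> > [scheduler]
> > # seconds between scheduler runs; raising it means the processing loop
> > # (and thus our API) gets hit less often
> > scheduler_heartbeat_sec = 60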
> >
> > We also considered the route Chris mentioned of updating cache via a
> > separate DAG but weren't crazy about having a DAG scheduled once per
> > minute.
> >
> > *Taylor Edmiston*
> > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > <http://blog.tedmiston.com>
> > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor>
> >
> >
> > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <[email protected]> wrote:
> >
> > > For us we compile down to Python rather than doing the logic in Python;
> > > that makes it so the load doesn't do real work.
> > >
> > > We have our own DSL that is just a simplified compiler; parse, analyze,
> > > optimize, code gen.  In code gen we just generate the Python code.  Our
> > > build then packages it up and has Airflow fetch it (very hacky fetch
> > > right now).
> > >
> > > This does make it so loading is simple and fast, but it means you can't
> > > use the Python API directly.
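> > >
> > > For a rough idea (this is a hypothetical shape, not our actual codegen
> > > output), the generated file is just static Python that does no work at
> > > load time:
> > >
> > > # generated by the DSL compiler -- everything is baked in at build time
> > > from datetime import datetime
> > >
> > > from airflow import DAG
> > > from airflow.operators.bash_operator import BashOperator
> > >
> > > dag = DAG('client_1234_model',
> > >           start_date=datetime(2018, 1, 1),
> > >           schedule_interval='@daily')
> > >
> > > BashOperator(task_id='fit_model',
> > >              bash_command='spark-submit --class Fit job.jar client_1234',
> > >              dag=dag)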
> > >
> > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <[email protected]>
> > > wrote:
> > >
> > > > I've had similar issues with large dags being slow to render in the
> > > > UI and crashing Chrome.
> > > >
> > > > I got around it by changing the default number of DAG runs shown in
> > > > the tree view from 25 to just 5.
> > > >
> > > > It involves a couple of changes to source files though; it would be
> > > > great if some of the UI defaults could go into airflow.cfg.
> > > >
> > > > https://stackoverflow.com/a/48665734/1919374
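> > > >
> > > > (If I remember right, you can also pass the count per-request in the
> > > > tree view URL without touching source, e.g.
> > > > /admin/airflow/tree?dag_id=my_dag&num_runs=5, though that doesn't
> > > > change the default.)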
> > > >
> > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <[email protected]> wrote:
> > > >
> > > > > @Kyle, I do something similar and have run into the problems you've
> > > > > mentioned. In my case, I access data from S3 and then generate
> > > > > separate DAGs (of different structures) based on the data that's
> > > > > pulled. I've also found that the UI for accessing a single large
> > > > > DAG is slow, so I prefer to keep many separate DAGs. What I'd try
> > > > > is to define a DAG that's responsible for accessing your API and
> > > > > caching the client IDs somewhere locally, maybe just to a file on
> > > > > disk or as an Airflow Variable. You can run this DAG on whatever
> > > > > schedule is appropriate for you. From there, build a function that
> > > > > creates a DAG, and then for each client ID, register a DAG built by
> > > > > that function to the global context. Like this:
> > > > > from datetime import datetime
> > > > > import json
> > > > >
> > > > > from airflow import DAG
> > > > >
> > > > > def create_client_dag(client_id):
> > > > >     # build the per-client DAG here (tasks omitted for brevity)
> > > > >     dag = DAG('client_%s' % client_id,
> > > > >               start_date=datetime(2018, 1, 1),
> > > > >               schedule_interval='@daily')
> > > > >     return dag
> > > > >
> > > > > def get_client_ids_locally():
> > > > >     # read the data that was pulled from the API, e.g. a file on disk
> > > > >     with open('/path/to/client_ids.json') as f:
> > > > >         return json.load(f)
> > > > >
> > > > > client_ids = get_client_ids_locally()
> > > > > for client in client_ids:
> > > > >     dag = create_client_dag(client)
> > > > >     globals()[dag.dag_id] = dag
> > > > >
> > > > > This approach also handles removing client IDs somewhat gracefully.
> > > > > DAGs for removed clients will still appear in the UI (you can build
> > > > > a maintenance DAG to clean that up), but they'll be disabled and
> > > > > their tasks won't be scheduled.
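> > > > >
> > > > > The caching DAG itself can be tiny. A sketch (the endpoint, the
> > > > > Variable name, and the schedule here are made up, and it writes to
> > > > > a Variable rather than a file):
> > > > >
> > > > > import json
> > > > > from datetime import datetime
> > > > >
> > > > > import requests
> > > > > from airflow import DAG
> > > > > from airflow.models import Variable
> > > > > from airflow.operators.python_operator import PythonOperator
> > > > >
> > > > > def cache_client_ids():
> > > > >     # hypothetical internal endpoint returning a JSON list of IDs
> > > > >     ids = requests.get('https://api.example.com/client_ids').json()
> > > > >     Variable.set('client_ids', json.dumps(ids))
> > > > >
> > > > > dag = DAG('cache_client_ids',
> > > > >           start_date=datetime(2018, 1, 1),
> > > > >           schedule_interval='@hourly')
> > > > >
> > > > > PythonOperator(task_id='refresh_client_ids',
> > > > >                python_callable=cache_client_ids,
> > > > >                dag=dag)
> > > > >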
> > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > Thanks for all the responses, let me try to address the main
> > > > > > themes.
> > > > > >
> > > > > > @Ace @Nicholas @Taylor
> > > > > > I originally started with a loop over my list of client ids and
> > > > > > created a SparkSubmitOperator for each client. The pseudo code
> > > > > > would look something like this:
> > > > > >
> > > > > > dag = DAG(...)
> > > > > >
> > > > > > client_ids = get_client_ids()
> > > > > > for client_id in client_ids:
> > > > > >     SparkSubmitOperator(
> > > > > >         ...
> > > > > >         dag=dag
> > > > > >     )
> > > > > >
> > > > > > I found this approach kind of clunky for a few reasons. First,
> > > > > > the get_client_ids() function was hitting our API every time the
> > > > > > dag was read by the scheduler, which seemed excessive (every 30
> > > > > > seconds or so?). Second, it seemed like a single task failure
> > > > > > marked the whole dag as a failure, but I guess retrying till the
> > > > > > task worked could solve this? Third, the UI gets really clunky and
> > > > > > slow, basically unusable when it tries to render the graph view
> > > > > > for that many tasks. Finally, Airflow doesn't seem very happy when
> > > > > > client_ids are removed, i.e. when get_client_ids() no longer
> > > > > > returns a specific client_id; it really seems to want a static dag.
> > > > > >
> > > > > > Do I really have to poll an API or database every 30 seconds for
> > > > > > this dynamic client_id data?
> > > > > >
> > > > > > @Ace
> > > > > > I have been limiting concurrency so as to not blast the cluster.
> > > > > >
> > > > > > @Nicholas
> > > > > > Thank you for the noise suggestion, I will definitely implement
> > > > > > that if I continue with the same methodology.
> > > > > >
> > > > > > @Taylor
> > > > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > > > pseudo code I wrote above?
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > <[email protected]> wrote:
> > > > > >> We also use a similar approach to generate dynamic DAGs based on
> > > > > >> a common template DAG file.  We pull in the list of config
> > > > > >> objects, one per DAG, from an internal API lightly wrapping the
> > > > > >> database, then we cache that response in an Airflow Variable that
> > > > > >> gets updated once a minute.  The dynamic DAGs are generated from
> > > > > >> that variable.
> > > > > >>
> > > > > >> *Taylor Edmiston*
> > > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > > >> <http://blog.tedmiston.com>
> > > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > > > > >> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > > >> <https://angel.co/taylor>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > >> [email protected]> wrote:
> > > > > >>
> > > > > >>> Kyle,
> > > > > >>>
> > > > > >>> We have a similar approach but on a much, much smaller scale.
> > > > > >>> We now have <100 “things to process” but expect it to grow to
> > > > > >>> under ~200.  Each “thing to process” has the same workflow, so
> > > > > >>> we have a single DAG definition that does about 20 tasks per
> > > > > >>> item, then we loop over the list of items and produce a dag
> > > > > >>> object for each one, adding it to the global definition.
> > > > > >>>
> > > > > >>> One of the things we quickly ran into was crushing the scheduler
> > > > > >>> as everything was running with the same start time.  To get
> > > > > >>> around this we add noise to the start time minute and seconds.
> > > > > >>> Simply index % 60.  This spreads out the load so that the
> > > > > >>> scheduler isn’t trying to run everything at the exact same
> > > > > >>> moment.  I would suggest, if you do go this route, to also
> > > > > >>> stagger your hours if you can because of how many you plan to
> > > > > >>> run.  Perhaps your DAGs are smaller and aren’t as CPU intensive
> > > > > >>> as ours.
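> > > > > >>>
> > > > > >>> Roughly like this (an illustrative sketch, not our exact code):
> > > > > >>>
> > > > > >>> from datetime import datetime
> > > > > >>> from airflow import DAG
> > > > > >>>
> > > > > >>> items = ['thing_%d' % n for n in range(100)]  # stand-in list
> > > > > >>> for i, item in enumerate(items):
> > > > > >>>     # index % 60 as noise in the minute/second fields spreads
> > > > > >>>     # the schedules out so they don't all fire at the same moment
> > > > > >>>     start = datetime(2018, 1, 1, 0, i % 60, i % 60)
> > > > > >>>     dag = DAG('%s_workflow' % item,
> > > > > >>>               start_date=start,
> > > > > >>>               schedule_interval='@daily')
> > > > > >>>     globals()[dag.dag_id] = dag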
> > > > > >>>
> > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:
> > > > > >>>
> > > > > >>>   Hello,
> > > > > >>>
> > > > > >>>   I'm currently using Airflow for some ETL tasks where I submit
> > > > > >>>   a spark job to a cluster and poll till it is complete. This
> > > > > >>>   workflow is nice because it is typically a single Dag. I'm now
> > > > > >>>   starting to do more machine learning tasks and need to build a
> > > > > >>>   model per client, which is 1000+ clients. My spark cluster is
> > > > > >>>   capable of handling this workload, however, it doesn't seem
> > > > > >>>   scalable to write 1000+ dags to fit models for each client. I
> > > > > >>>   want each client to have its own task instance so it can be
> > > > > >>>   retried if it fails without having to run all 1000+ tasks over
> > > > > >>>   again. How do I handle this type of workflow in Airflow?
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
