I'm very excited about the possibility of implementing a DAGFetcher (per
prior thread about this) that is aware of dynamic data sources, and can
handle abstracting/caching/deploying them itself, rather than having each
Airflow process run the query for each DAG refresh.

On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston <[email protected]>
wrote:

> I'm interested in hearing further discussion too, and if others have tried
> something similar to our approach.  Several companies on this list have
> mentioned various approaches to dynamic DAGs, and I think everyone needs
> them eventually.  Maybe it's an opportunity for additional docs regarding
> use cases like this and to document best practices from the community.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <[email protected]> wrote:
>
> > @Chris @Taylor
> > Thank you guys very much for your explanations! Your strategy makes a lot
> > of sense to me. Generating a DAG for each client means I'm going to have a
> > ton of DAGs on the front page, but at least that is searchable haha. I'm
> > going to give this implementation a shot and I'll try to report back with
> > the outcome.
> >
> > Can anyone comment on future work to support data science workflows like
> > these, or is Airflow fundamentally the wrong tool?
> >
> > On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <[email protected]>
> > wrote:
> >
> > > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > > generate a separate DAG class instance for each similar config, which
> > > gets hoisted into the global namespace.  In simplified pseudo-Python, it
> > > looks like:
> > >
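> > > from airflow import DAG
> > > from airflow.models import Variable
> > >
> > > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > > # Refetch only when the cached copy is missing or stale.
> > > if is_empty(cache) or is_expired(cache):
> > >     sources = fetch_configs()
> > > else:
> > >     sources = cache['configs']
> > > for source in sources:
> > >     dag = DAG(...)
> > >     # Hoist each DAG into the module's global namespace.
> > >     globals()[source._id] = dag
> > >     # ...create tasks and set dependencies for each DAG (some config
> > >     # pulled from the source object for each)...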
> > >
> > > We added the cache part for the same reason you pointed out, because
> > > the DAG processing loop was hitting the API a lot.  Btw, you can also
> > > turn down how often the processing loop runs with
> > > scheduler_heartbeat_sec under the scheduler group in config.
> > >
> > > We also considered the route Chris mentioned of updating cache via a
> > > separate DAG but weren't crazy about having a DAG scheduled once per
> > > minute.
> > >
> > >
> > >
> > > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <[email protected]>
> > > wrote:
> > >
> > > > For us, we compile down to Python rather than do the logic in Python;
> > > > that makes it so the load doesn't do real work.
> > > >
> > > > We have our own DSL with just a simplified compiler: parse, analyze,
> > > > optimize, code gen.  In code gen we just generate the Python code.
> > > > Our build then packages it up and has Airflow fetch it (very hacky
> > > > fetch right now).
> > > >
> > > > This does make it so loading is simple and fast, but means you can't
> > > > use the Python API directly.
> > > >
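David's DSL and build aren't shown in the thread, so this is only a toy
sketch of what a code gen step could look like: render the source of a DAG
module from a config dict, then parse it so a bad template fails at build
time rather than at scheduler load. The config shape, the template, and the
single DummyOperator task (Airflow 1.x import path) are all assumptions for
illustration.

```python
import ast
from string import Template

# Assumption: a trivial template. A real code generator would emit the tasks
# and dependencies produced by the earlier compiler phases.
DAG_TEMPLATE = Template('''\
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id=$dag_id,
    start_date=datetime(2018, 1, 1),
    schedule_interval=$schedule,
)

start = DummyOperator(task_id="start", dag=dag)
''')


def generate_dag_source(config):
    """Render Python source for one DAG; repr() quotes the values safely."""
    source = DAG_TEMPLATE.substitute(
        dag_id=repr(config["dag_id"]),
        schedule=repr(config["schedule"]),
    )
    ast.parse(source)  # raises SyntaxError if the output is not valid Python
    return source
```

The generated file contains only literals and constructor calls, which is
what keeps the load cheap: the expensive logic ran once, at build time.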
> > > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <[email protected]>
> > > > wrote:
> > > >
> > > > > I've had similar issues with large DAGs being slow to render in the
> > > > > UI and crashing Chrome.
> > > > >
> > > > > I got around it by changing the default number of runs shown in the
> > > > > tree view from 25 to just 5.
> > > > >
> > > > > It involves a couple of changes to source files though; it would be
> > > > > great if some of the UI defaults could go into airflow.cfg.
> > > > >
> > > > > https://stackoverflow.com/a/48665734/1919374
> > > > >
> > > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <[email protected]> wrote:
> > > > >
> > > > > > @Kyle, I do something similar and have run into the problems
> > > > > > you've mentioned. In my case, I access data from S3 and then
> > > > > > generate separate DAGs (of different structures) based on the data
> > > > > > that's pulled. I've also found that the UI for accessing a single
> > > > > > large DAG is slow, so I prefer to keep many separate DAGs. What I'd
> > > > > > try is to define a DAG that's responsible for accessing your API
> > > > > > and caching the client IDs somewhere locally, maybe just to a file
> > > > > > on disk or as an Airflow Variable. You can run this DAG on whatever
> > > > > > schedule is appropriate for you. From there, build a function that
> > > > > > creates a DAG and then, for each client ID, register a DAG built by
> > > > > > that function to the global context. Like this:
> > > > > > def create_client_dag(client_id):
> > > > > >     # build and return the DAG for this client here
> > > > > >     ...
> > > > > >
> > > > > > def get_client_ids_locally():
> > > > > >     # return the client IDs that were pulled from the API and cached
> > > > > >     ...
> > > > > >
> > > > > > client_ids = get_client_ids_locally()
> > > > > > for client in client_ids:
> > > > > >     dag = create_client_dag(client)
> > > > > >     globals()[dag.dag_id] = dag
> > > > > >
> > > > > > This approach also handles removing client IDs somewhat
> > > > > > gracefully. DAGs for removed clients will still appear in the UI
> > > > > > (you can build a maintenance DAG to clean that up), but they'll be
> > > > > > disabled and their tasks won't be scheduled.
> > > > > >
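For the "file on disk" variant Chris describes, a minimal sketch might look
like the following. Assumptions: the IDs are stored as a JSON list, the
cache path is arbitrary, and save_client_ids is a hypothetical helper that
the refresh DAG's task would call after hitting the API. Only
get_client_ids_locally is a name from the thread.

```python
import json
import os
import tempfile

# Assumption: any path readable by the scheduler and the workers would do.
CACHE_PATH = os.path.join(tempfile.gettempdir(), "client_ids.json")


def save_client_ids(client_ids, path=CACHE_PATH):
    """Write the IDs atomically so a DAG parse never reads a half-written file."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(sorted(client_ids), handle)
    os.replace(tmp_path, path)  # atomic rename within the same directory


def get_client_ids_locally(path=CACHE_PATH):
    """Read the cached IDs; no API call happens at DAG-parse time."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return []  # cache not populated yet: register no client DAGs
```

The write-to-temp-then-rename step matters because the scheduler may parse
the DAG file while the refresh task is still writing.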
> > > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > > Thanks for all the responses, let me try to address the main
> > > > > > > themes.
> > > > > > >
> > > > > > > @Ace @Nicholas @Taylor
> > > > > > > I originally started with a loop over my list of client IDs and
> > > > > > > created a SparkSubmitOperator for each client. The pseudo code
> > > > > > > would look something like this:
> > > > > > >
> > > > > > > dag = DAG(...)
> > > > > > >
> > > > > > > client_ids = get_client_ids()
> > > > > > > for client_id in client_ids:
> > > > > > >     SparkSubmitOperator(
> > > > > > >         ...
> > > > > > >         dag=dag
> > > > > > >     )
> > > > > > >
> > > > > > > I found this approach kind of clunky for a few reasons. First,
> > > > > > > the get_client_ids() function was hitting our API every time the
> > > > > > > DAG was read by the scheduler, which seemed excessive (every 30
> > > > > > > seconds or so?). Second, it seemed like a single task failure
> > > > > > > marked the whole DAG as a failure, but I guess retrying till the
> > > > > > > task worked could solve this? Third, the UI gets really clunky
> > > > > > > and slow, basically unusable, when it tries to render the graph
> > > > > > > view for that many tasks. Finally, Airflow doesn't seem very
> > > > > > > happy when client_ids are removed, i.e. when get_client_ids() no
> > > > > > > longer returns a specific client_id; it really seems to want a
> > > > > > > static DAG.
> > > > > > >
> > > > > > > Do I really have to poll an API or database every 30 seconds for
> > > > > > > this dynamic client_id data?
> > > > > > >
> > > > > > > @Ace
> > > > > > > I have been limiting concurrency so as to not blast the cluster.
> > > > > > >
> > > > > > > @Nicholas
> > > > > > > Thank you for the noise suggestion; I will definitely implement
> > > > > > > that if I continue with the same methodology.
> > > > > > >
> > > > > > > @Taylor
> > > > > > > Are you using a SubDagOperator? Or is your process similar to
> > > > > > > the pseudo code I wrote above?
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > > <[email protected]> wrote:
> > > > > > >
> > > > > > >> We also use a similar approach to generate dynamic DAGs based on
> > > > > > >> a common template DAG file.  We pull in the list of config
> > > > > > >> objects, one per DAG, from an internal API lightly wrapping the
> > > > > > >> database, then we cache that response in an Airflow Variable that
> > > > > > >> gets updated once a minute.  The dynamic DAGs are generated from
> > > > > > >> that variable.
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > > >> [email protected]> wrote:
> > > > > > >>
> > > > > > >>> Kyle,
> > > > > > >>>
> > > > > > > >>> We have a similar approach but on a much, much smaller scale.
> > > > > > > >>> We now have <100 “things to process” but expect it to grow to
> > > > > > > >>> under ~200.  Each “thing to process” has the same workflow, so
> > > > > > > >>> we have a single DAG definition that does about 20 tasks per
> > > > > > > >>> item, then we loop over the list of items and produce a DAG
> > > > > > > >>> object for each one, adding it to the global definition.
> > > > > > > >>>
> > > > > > > >>> One of the things we quickly ran into was crushing the
> > > > > > > >>> scheduler, as everything was running with the same start time.
> > > > > > > >>> To get around this we add noise to the start time minute and
> > > > > > > >>> seconds: simply index % 60.  This spreads out the load so that
> > > > > > > >>> the scheduler isn’t trying to run everything at the exact same
> > > > > > > >>> moment.  I would suggest, if you do go this route, to also
> > > > > > > >>> stagger your hours if you can because of how many you plan to
> > > > > > > >>> run.  Perhaps your DAGs are smaller and aren’t as CPU intensive
> > > > > > > >>> as ours.
> > > > > > >>>
> > > > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:
> > > > > > >>>
> > > > > > >>>   Hello,
> > > > > > >>>
> > > > > > > >>>   I'm currently using Airflow for some ETL tasks where I submit
> > > > > > > >>>   a Spark job to a cluster and poll till it is complete. This
> > > > > > > >>>   workflow is nice because it is typically a single DAG. I'm now
> > > > > > > >>>   starting to do more machine learning tasks and need to build a
> > > > > > > >>>   model per client, which is 1000+ clients. My Spark cluster is
> > > > > > > >>>   capable of handling this workload; however, it doesn't seem
> > > > > > > >>>   scalable to write 1000+ DAGs to fit models for each client. I
> > > > > > > >>>   want each client to have its own task instance so it can be
> > > > > > > >>>   retried if it fails without having to run all 1000+ tasks over
> > > > > > > >>>   again. How do I handle this type of workflow in Airflow?
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
