We compile down to Python rather than doing the logic in Python, so
loading the DAG file doesn't do any real work.

We have our own DSL with what is just a simplified compiler: parse, analyze,
optimize, code gen.  In code gen we just generate the Python code.  Our
build then packages it up and has Airflow fetch it (the fetch is very hacky
right now).

This does make loading simple and fast, but it means you can't use the
Python API directly.
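
To illustrate the code gen step, here is a minimal sketch, not our actual
generator. It assumes the client IDs are known at build time and renders one
static DAG file per client. The names generate_dag_source and DAG_TEMPLATE,
the fit_model_<client> dag_id scheme, the BashOperator placeholder task, and
the schedule are all hypothetical.

```python
from string import Template

# Hypothetical template for one generated DAG file. The operator, its
# arguments, and the schedule are placeholders chosen for illustration.
DAG_TEMPLATE = Template('''\
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id=$dag_id,
    start_date=datetime(2018, 1, 1),
    schedule_interval="@daily",
)

fit_model = BashOperator(
    task_id="fit_model",
    bash_command=$command,
    dag=dag,
)
''')


def generate_dag_source(client_id):
    """Render static Python source for one client's DAG (hypothetical helper)."""
    return DAG_TEMPLATE.substitute(
        # repr() turns the values into valid Python string literals.
        dag_id=repr("fit_model_%s" % client_id),
        command=repr("echo fitting model for client %s" % client_id),
    )


if __name__ == "__main__":
    # Client IDs are resolved here, at build time, not by the scheduler.
    for client_id in ["acme", "globex"]:
        source = generate_dag_source(client_id)
        # Syntax check only; the generated file is not executed here.
        compile(source, "fit_model_%s.py" % client_id, "exec")
        print(source)
```

The generated file contains no API calls or loops, so when the scheduler
parses it the only work is constructing the DAG and its one task.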

On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4...@gmail.com> wrote:

> I've had similar issues with large DAGs being slow to render in the UI and
> crashing Chrome.
>
> I got around it by changing the tree view's default number of runs from 25
> to just 5.
>
> It involves a couple of changes to source files, though; it would be great
> if some of the UI defaults could go into airflow.cfg.
>
> https://stackoverflow.com/a/48665734/1919374
>
> On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfe...@gmail.com> wrote:
>
> > @Kyle, I do something similar and have run into the problems you've
> > mentioned. In my case, I access data from S3 and then generate separate
> > DAGs (of different structures) based on the data that's pulled. I've
> > also found that the UI for accessing a single large DAG is slow so I
> > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > that's responsible for accessing your API and caching the client IDs
> > somewhere locally, maybe just to a file on disk or as an Airflow
> > Variable. You can run this DAG on whatever schedule is appropriate for
> > you. From there, build a function that creates a DAG and then for each
> > client ID, register a DAG built by that function to the global context.
> > Like this:
> > def create_client_dag(client_id):
> >     # build and return the DAG for this client here
> >     pass
> >
> > def get_client_ids_locally():
> >     # return the client IDs that were pulled from the API and cached
> >     pass
> >
> > client_ids = get_client_ids_locally()
> > for client in client_ids:
> >     dag = create_client_dag(client)
> >     # Airflow picks up DAG objects found in the module's globals
> >     globals()[dag.dag_id] = dag
> >
> > This approach also handles removing client IDs somewhat gracefully. DAGs
> > for removed clients will still appear in the UI (you can build a
> > maintenance DAG to clean that up), but they'll be disabled and their
> > tasks won't be scheduled.
> > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > Thanks for all the responses. Let me try to address the main themes.
> > >
> > > @Ace @Nicholas @Taylor
> > > I originally started with a loop over my list of client ids and
> > > created a SparkSubmitOperator for each client. The pseudo code would
> > > look something like this:
> > >
> > > dag = DAG(...)
> > >
> > > client_ids = get_client_ids()
> > > for client_id in client_ids:
> > >     SparkSubmitOperator(
> > >         ...
> > >         dag=dag
> > >     )
> > >
> > > I found this approach kind of clunky for a few reasons. First, the
> > > get_client_ids() function was hitting our API every time the dag was
> > > read by the scheduler, which seemed excessive (every 30 seconds or
> > > so?). Second, it seemed like a single task failure marked the whole
> > > dag as a failure, but I guess retrying till the task worked could
> > > solve this? Third, the UI gets really clunky and slow, basically
> > > unusable, when it tries to render the graph view for that many tasks.
> > > Finally, Airflow doesn't seem very happy when client_ids are removed,
> > > i.e. when get_client_ids() no longer returns a specific client_id; it
> > > really seems to want a static dag.
> > >
> > > Do I really have to poll an API or database every 30 seconds for this
> > > dynamic client_id data?
> > >
> > > @Ace
> > > I have been limiting concurrency so as to not blast the cluster.
> > >
> > > @Nicholas
> > > Thank you for the noise suggestion. I will definitely implement
> > > that if I continue with the same methodology.
> > >
> > > @Taylor
> > > Are you using a SubDagOperator? Or is your process similar to the
> > > pseudo code I wrote above?
> > >
> > >
> > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > <tedmis...@gmail.com> wrote:
> > >> We also use a similar approach to generate dynamic DAGs based on
> > >> a common template DAG file.  We pull in the list of config objects,
> > >> one per DAG, from an internal API lightly wrapping the database, then
> > >> we cache that response in an Airflow Variable that gets updated once
> > >> a minute.  The dynamic DAGs are generated from that variable.
> > >>
> > >> *Taylor Edmiston*
> > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > >> <http://blog.tedmiston.com>
> > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > >> LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList
> > >> <https://angel.co/taylor>
> > >>
> > >>
> > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > >> nicolas.ki...@weightwatchers.com> wrote:
> > >>
> > >>> Kyle,
> > >>>
> > >>> We have a similar approach but on a much, much smaller scale. We
> > >>> now have <100 “things to process” but expect it to grow to under
> > >>> ~200.  Each “thing to process” has the same workflow, so we have a
> > >>> single DAG definition that runs about 20 tasks per item, then we
> > >>> loop over the list of items and produce a dag object for each one,
> > >>> adding it to the global definition.
> > >>>
> > >>> One of the things we quickly ran into was crushing the scheduler as
> > >>> everything was running with the same start time.  To get around
> > >>> this we add noise to the start time minute and seconds. Simply
> > >>> index % 60.  This spreads out the load so that the scheduler isn’t
> > >>> trying to run everything at the exact same moment.  I would suggest
> > >>> if you do go this route, to also stagger your hours if you can
> > >>> because of how many you plan to run.  Perhaps your DAGs are smaller
> > >>> and aren’t as CPU intensive as ours.
> > >>>
> > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin...@gmail.com> wrote:
> > >>>
> > >>>   Hello,
> > >>>
> > >>>   I'm currently using Airflow for some ETL tasks where I submit a
> > >>>   spark job to a cluster and poll till it is complete. This workflow
> > >>>   is nice because it is typically a single Dag. I'm now starting to
> > >>>   do more machine learning tasks and need to build a model per
> > >>>   client which is 1000+ clients. My spark cluster is capable of
> > >>>   handling this workload, however, it doesn't seem scalable to write
> > >>>   1000+ dags to fit models for each client. I want each client to
> > >>>   have its own task instance so it can be retried if it fails
> > >>>   without having to run all 1000+ tasks over again. How do I handle
> > >>>   this type of workflow in Airflow?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >
>
