I've had similar issues with large DAGs being slow to render in the UI and crashing Chrome.
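For what it's worth, later Airflow releases exposed a knob for this; if your version supports the `default_dag_run_display_number` option (an assumption — check your release's config reference, older versions required patching source files as the linked answer describes), you can set it in airflow.cfg instead:

```ini
[webserver]
# Number of DAG runs the tree view loads by default; lowering it from
# the default (25) keeps very large DAGs from freezing the browser.
default_dag_run_display_number = 5
```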
I got around it by changing the default tree view from 25 runs to just 5. That involves a couple of changes to source files, though; it would be great if some of the UI defaults could go into airflow.cfg. https://stackoverflow.com/a/48665734/1919374

On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfe...@gmail.com> wrote:

> @Kyle, I do something similar and have run into the problems you've
> mentioned. In my case, I access data from S3 and then generate separate
> DAGs (of different structures) based on the data that's pulled. I've
> also found that the UI for accessing a single large DAG is slow, so I
> prefer to keep many separate DAGs. What I'd try is to define a DAG
> that's responsible for accessing your API and caching the client IDs
> somewhere locally, maybe just to a file on disk or as an Airflow
> Variable. You can run this DAG on whatever schedule is appropriate for
> you. From there, build a function that creates a DAG, and then for each
> client ID, register a DAG built by that function to the global context.
> Like this:
>
>     def create_client_dag(client_id):
>         # build dag here
>
>     def get_client_ids_locally():
>         # access the data that was pulled from the API
>
>     client_ids = get_client_ids_locally()
>     for client in client_ids:
>         dag = create_client_dag(client)
>         globals()[dag.dag_id] = dag
>
> This approach also handles removing client IDs somewhat gracefully. DAGs
> for removed clients will still appear in the UI (you can build a
> maintenance DAG to clean that up), but they'll be disabled and their
> tasks won't be scheduled.
>
> On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > Thanks for all the responses; let me try to address the main themes.
> >
> > @Ace @Nicholas @Taylor
> > I originally started with a loop over my list of client IDs and
> > created a SparkSubmitOperator for each client. The pseudocode would
> > look something like this:
> >
> >     dag = DAG(...)
> >
> >     client_ids = get_client_ids()
> >     for client_id in client_ids:
> >         SparkSubmitOperator(
> >             ...
> >             dag=dag
> >         )
> >
> > I found this approach kind of clunky for a few reasons. First, the
> > get_client_ids() function was hitting our API every time the DAG was
> > read by the scheduler, which seemed excessive (every 30 seconds or
> > so?). Second, it seemed like a single task failure marked the whole
> > DAG as a failure, but I guess retrying till the task worked could
> > solve this? Third, the UI gets really clunky and slow, basically
> > unusable, when it tries to render the graph view for that many tasks.
> > Finally, Airflow doesn't seem very happy when client IDs are removed,
> > i.e. when get_client_ids() no longer returns a specific client_id; it
> > really seems to want a static DAG.
> >
> > Do I really have to poll an API or database every 30 seconds for this
> > dynamic client_id data?
> >
> > @Ace
> > I have been limiting concurrency so as to not blast the cluster.
> >
> > @Nicholas
> > Thank you for the noise suggestion; I will definitely implement that
> > if I continue with the same methodology.
> >
> > @Taylor
> > Are you using a SubDagOperator? Or is your process similar to the
> > pseudocode I wrote above?
> >
> > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <tedmis...@gmail.com> wrote:
> > > We also use a similar approach to generate dynamic DAGs based on a
> > > common template DAG file. We pull in the list of config objects, one
> > > per DAG, from an internal API lightly wrapping the database, then we
> > > cache that response in an Airflow Variable that gets updated once a
> > > minute. The dynamic DAGs are generated from that variable.
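The globals() registration pattern Chris sketches can be shown end to end. This is a minimal stand-in that runs without Airflow installed — the `Dag` dataclass substitutes for `airflow.models.DAG`, and the client IDs are inlined where real code would read the file or Variable maintained by the caching DAG:

```python
from dataclasses import dataclass


@dataclass
class Dag:
    # Stand-in for airflow.models.DAG, so the sketch is self-contained.
    dag_id: str


def get_client_ids_locally():
    # In real use, read the data that the caching DAG pulled from the API
    # (a file on disk or an Airflow Variable); inlined here for the sketch.
    return ["acme", "globex"]


def create_client_dag(client_id):
    # Build one DAG per client; task definitions would go here.
    return Dag(dag_id=f"client_{client_id}")


client_ids = get_client_ids_locally()
for client in client_ids:
    dag = create_client_dag(client)
    # Registering each DAG object in the module's globals() is what lets
    # the scheduler discover it when it parses this file.
    globals()[dag.dag_id] = dag
```

Because the scheduler only sees what parsing produces, a client ID that disappears from the cache simply stops being registered — which is why removed clients linger in the UI as disabled DAGs, as Chris notes.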
> > > *Taylor Edmiston*
> > > TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
> > > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>
> > >
> > > On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <nicolas.ki...@weightwatchers.com> wrote:
> > > > Kyle,
> > > >
> > > > We have a similar approach but on a much, much smaller scale. We
> > > > now have <100 “things to process” but expect it to grow to under
> > > > ~200. Each “thing to process” has the same workflow, so we have a
> > > > single DAG definition that does about 20 tasks per item; then we
> > > > loop over the list of items and produce a DAG object for each one,
> > > > adding it to the global definition.
> > > >
> > > > One of the things we quickly ran into was crushing the scheduler,
> > > > as everything was running with the same start time. To get around
> > > > this we add noise to the start time minute and seconds. Simply
> > > > index % 60. This spreads out the load so that the scheduler isn’t
> > > > trying to run everything at the exact same moment. I would suggest,
> > > > if you do go this route, to also stagger your hours if you can
> > > > because of how many you plan to run. Perhaps your DAGs are smaller
> > > > and aren’t as CPU intensive as ours.
> > > >
> > > > On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin...@gmail.com> wrote:
> > > > > Hello,
> > > > >
> > > > > I'm currently using Airflow for some ETL tasks where I submit a
> > > > > Spark job to a cluster and poll till it is complete. This
> > > > > workflow is nice because it is typically a single DAG. I'm now
> > > > > starting to do more machine learning tasks and need to build a
> > > > > model per client, which is 1000+ clients. My Spark cluster is
> > > > > capable of handling this workload; however, it doesn't seem
> > > > > scalable to write 1000+ DAGs to fit models for each client.
> > > > > I want each client to have its own task instance so it can be
> > > > > retried if it fails without having to run all 1000+ tasks over
> > > > > again. How do I handle this type of workflow in Airflow?
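Nicolas's staggering trick above (index % 60) can be sketched as a small helper that derives each generated DAG's schedule from its index, so the scheduler isn't asked to start everything in the same instant. The function name and cron-string approach here are illustrative, not part of any Airflow API:

```python
def staggered_schedule(index, hour="*"):
    """Return a cron expression whose minute field is index % 60.

    Wrapping at 60 spreads an arbitrary number of generated DAGs
    across the hour; pass an explicit hour to also stagger hours,
    as Nicolas suggests for larger fleets.
    """
    return f"{index % 60} {hour} * * *"


# One schedule per generated DAG, offset by its position in the list.
schedules = [staggered_schedule(i) for i in range(3)]
```

Each schedule string would then be passed as the `schedule_interval` of the corresponding generated DAG.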