I'm very excited about the possibility of implementing a DAGFetcher (per the prior thread about this) that is aware of dynamic data sources and can handle abstracting, caching, and deploying them itself, rather than having each Airflow process run the query on every DAG refresh.
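Until something like that exists, the caching half of the idea can be approximated inside a DAG file. This is a minimal sketch that assumes every Airflow process on a host can share one local file. The dynamic source is queried only when the cached copy is older than a TTL, so repeated parses of the DAG file reuse a single result. `load_sources`, `CACHE_PATH`, and `TTL_SECONDS` are hypothetical names and not part of Airflow.

```python
import json
import os
import tempfile
import time
from typing import Callable, List

# Hypothetical cache location and lifetime; tune for your deployment.
CACHE_PATH = os.path.join(tempfile.gettempdir(), "dag_sources_cache.json")
TTL_SECONDS = 300


def load_sources(fetch: Callable[[], List[dict]],
                 path: str = CACHE_PATH,
                 ttl: float = TTL_SECONDS) -> List[dict]:
    """Return cached source configs, calling fetch() only when the cache is stale."""
    try:
        age = time.time() - os.path.getmtime(path)
        if age < ttl:
            with open(path) as f:
                return json.load(f)
    except (OSError, ValueError):
        # Missing or unreadable cache file: fall through and refetch.
        pass

    sources = fetch()
    # Write to a temp file in the same directory, then rename it over the cache,
    # so concurrent readers never see a half-written file.
    cache_dir = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=cache_dir)
    with os.fdopen(fd, "w") as f:
        json.dump(sources, f)
    os.replace(tmp_path, path)
    return sources
```

A DAG file would call `load_sources(fetch_client_ids)` at module level and loop over the result. Here `fetch_client_ids` stands for whatever function queries your API. The rename step guards against partial reads. It does not stop two processes from both refetching when the cache expires at the same moment.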
On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston wrote:

I'm interested in hearing further discussion too, and whether others have tried something similar to our approach. Several companies on this list have mentioned various approaches to dynamic DAGs, and I think everyone needs them eventually. Maybe it's an opportunity for additional docs covering use cases like this and documenting best practices from the community.

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>

On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin wrote:

@Chris @Taylor
Thank you guys very much for your explanations! Your strategy makes a lot of sense to me. Generating a DAG for each client means I'm going to have a ton of DAGs on the front page, but at least that is searchable, haha. I'm going to give this implementation a shot and I'll try to report back with the outcome.

Can anyone comment on future work to support data science workflows like these, or is Airflow fundamentally the wrong tool?

On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston wrote:

We're not using SubDagOperator. Our approach uses one DAG file to generate a separate DAG instance for each similar config, and each instance gets hoisted into the module's global namespace. In simplified pseudo-Python, it looks like:

    from airflow import DAG
    from airflow.models import Variable

    # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
    cache = Variable.get('sources', default_var={}, deserialize_json=True)
    sources = (fetch_configs() if is_empty(cache) or is_expired(cache)
               else cache['configs'])
    for source in sources:
        dag = DAG(...)
        # configs deserialized from JSON are dicts, so index rather than use attributes
        globals()[source['_id']] = dag
        # ...create tasks and set dependencies for each DAG (some config pulled from the source object for each)...

We added the cache part for the same reason you pointed out: the DAG processing loop was hitting the API a lot. Btw, you can also turn down how often the processing loop runs with scheduler_heartbeat_sec in the [scheduler] section of airflow.cfg.

We also considered the route Chris mentioned of updating the cache via a separate DAG, but weren't crazy about having a DAG scheduled once per minute.

On Thu, Mar 22, 2018 at 9:17 AM, David Capwell wrote:

For us, we compile down to Python rather than doing the logic in Python; that makes it so loading the DAG file doesn't do real work.

We have our own DSL that is just a simplified compiler: parse, analyze, optimize, code gen. In code gen we just generate the Python code. Our build then packages it up and has Airflow fetch it (a very hacky fetch right now).

This does make loading simple and fast, but it means you can't use the Python API directly.

On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire wrote:

I've had similar issues with large DAGs being slow to render in the UI and crashing Chrome.

I got around it by changing the default number of DAG runs shown in the tree view from 25 to just 5.
It involves a couple of changes to source files, though; it would be great if some of the UI defaults could go into airflow.cfg.

https://stackoverflow.com/a/48665734/1919374

On Thu, 22 Mar 2018, 01:26 Chris Fei wrote:

@Kyle, I do something similar and have run into the problems you've mentioned. In my case, I access data from S3 and then generate separate DAGs (of different structures) based on the data that's pulled. I've also found that the UI for accessing a single large DAG is slow, so I prefer to keep many separate DAGs. What I'd try is to define a DAG that's responsible for accessing your API and caching the client IDs somewhere locally, maybe just in a file on disk or as an Airflow Variable. You can run this DAG on whatever schedule is appropriate for you. From there, build a function that creates a DAG, and then, for each client ID, register a DAG built by that function in the global context. Like this:

    def create_client_dag(client_id):
        # build and return the DAG (and its tasks) for this client here
        ...

    def get_client_ids_locally():
        # return the client IDs that were pulled from the API and cached
        # (e.g. in a file on disk or an Airflow Variable)
        ...

    client_ids = get_client_ids_locally()
    for client in client_ids:
        dag = create_client_dag(client)
        # module-level registration is what lets the scheduler discover each DAG
        globals()[dag.dag_id] = dag

This approach also handles removing client IDs somewhat gracefully. DAGs for removed clients will still appear in the UI (you can build a maintenance DAG to clean that up), but they'll be disabled and their tasks won't be scheduled.
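Taylor's pseudo-Python leaves `is_empty`, `is_expired`, and `fetch_configs` undefined. The sketch below fills in the first two. It assumes the Variable holds JSON shaped like the comment in that snippet, and that `expire` is an ISO 8601 timestamp with a UTC offset; the thread only says `<timestamp>`, so that format is an assumption. The code is plain Python so it can be tested without Airflow. `build_cache`, `get_sources`, and `TTL` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Optional

# Hypothetical refresh interval; the thread mentions refreshing about once a minute.
TTL = timedelta(minutes=1)


def is_empty(cache: dict) -> bool:
    # Variable.get(..., default_var={}) yields {} before the first write.
    return not cache.get("configs")


def is_expired(cache: dict, now: Optional[datetime] = None) -> bool:
    # Assumes 'expire' is an ISO 8601 timestamp with a UTC offset (Python 3.7+ parser).
    now = now or datetime.now(timezone.utc)
    expire = cache.get("expire")
    if not expire:
        return True
    return datetime.fromisoformat(expire) <= now


def build_cache(configs: List[dict], now: Optional[datetime] = None) -> dict:
    # Produces the {'configs': [...], 'expire': '<timestamp>'} shape from the snippet.
    now = now or datetime.now(timezone.utc)
    return {"configs": configs, "expire": (now + TTL).isoformat()}


def get_sources(cache: dict, fetch_configs: Callable[[], List[dict]],
                now: Optional[datetime] = None) -> List[dict]:
    # Refetch only when the cached copy is missing or stale.
    if is_empty(cache) or is_expired(cache, now):
        return fetch_configs()
    return cache["configs"]
```

In a DAG file, the output of `build_cache(...)` would be written back with `Variable.set('sources', ..., serialize_json=True)`. Taylor and Chris disagree on where that write-back belongs: in the DAG file itself or in a separate refresh DAG.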
On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:

Thanks for all the responses; let me try to address the main themes.

@Ace @Nicholas @Taylor
I originally started with a loop over my list of client IDs and created a SparkSubmitOperator for each client. The pseudocode would look something like this:

    from airflow import DAG
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    dag = DAG(...)

    client_ids = get_client_ids()
    for client_id in client_ids:
        SparkSubmitOperator(
            ...,  # task_id and spark-submit arguments for this client
            dag=dag,
        )

I found this approach kind of clunky for a few reasons. First, the get_client_ids() function was hitting our API every time the DAG was read by the scheduler, which seemed excessive (every 30 seconds or so?). Second, it seemed like a single task failure marked the whole DAG as a failure, but I guess retrying till the task worked could solve this? Third, the UI gets really clunky and slow, basically unusable, when it tries to render the graph view for that many tasks. Finally, Airflow doesn't seem very happy when client IDs are removed, i.e. when get_client_ids() no longer returns a specific client_id; it really seems to want a static DAG.

Do I really have to poll an API or database every 30 seconds for this dynamic client_id data?
@Ace
I have been limiting concurrency so as not to blast the cluster.

@Nicholas
Thank you for the noise suggestion; I will definitely implement that if I continue with the same methodology.

@Taylor
Are you using a SubDagOperator? Or is your process similar to the pseudocode I wrote above?

On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston wrote:

We also use a similar approach to generate dynamic DAGs based on a common template DAG file. We pull in the list of config objects, one per DAG, from an internal API lightly wrapping the database, then we cache that response in an Airflow Variable that gets updated once a minute. The dynamic DAGs are generated from that Variable.

On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak wrote:

Kyle,

We have a similar approach but on a much, much smaller scale. We now have <100 “things to process” but expect it to grow to under ~200.
Each “thing to process” has the same workflow, so we have a single DAG definition that does about 20 tasks per item; then we loop over the list of items and produce a DAG object for each one, adding it to the global definition.

One of the things we quickly ran into was crushing the scheduler, as everything was running with the same start time. To get around this, we add noise to the minute and seconds of the start time: simply index % 60. This spreads out the load so that the scheduler isn't trying to run everything at the exact same moment. If you do go this route, I would suggest also staggering your hours if you can, because of how many you plan to run. Perhaps your DAGs are smaller and aren't as CPU-intensive as ours.

On 3/21/18, 1:35 PM, "Kyle Hamlin" wrote:

Hello,

I'm currently using Airflow for some ETL tasks where I submit a Spark job to a cluster and poll till it is complete. This workflow is nice because it is typically a single DAG. I'm now starting to do more machine learning tasks and need to build a model per client, and there are 1000+ clients. My Spark cluster is capable of handling this workload; however, it doesn't seem scalable to write 1000+ DAGs to fit models for each client.
I want each client to have its own task instance so it can be retried if it fails without having to run all 1000+ tasks over again. How do I handle this type of workflow in Airflow?
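Nicolas's start-time noise (`index % 60` on the minute and second), combined with his suggestion to stagger hours, could look like the sketch below. The base date, the helper names, and the choice of `(index // 60) % 24` for the hour are assumptions for illustration. That hour formula gives every index below 1440 a distinct hour/minute slot.

This relies on my understanding of Airflow 1.x scheduling, which is worth verifying: a `start_date` offset carries through to run times for a `timedelta` schedule_interval but not for cron-style schedules. For that reason the sketch also shows a daily cron variant.

```python
from datetime import datetime

# Hypothetical common base date for all generated DAGs.
BASE_START = datetime(2018, 3, 1)


def staggered_start_date(index: int, base: datetime = BASE_START) -> datetime:
    """Spread DAG start times: minute and second from index % 60, hour from the next 'digit'."""
    return base.replace(hour=(index // 60) % 24,
                        minute=index % 60,
                        second=index % 60)


def staggered_cron(index: int) -> str:
    # Same stagger expressed as a daily cron schedule (cron has no seconds field).
    return "{} {} * * *".format(index % 60, (index // 60) % 24)
```

For the `i`-th generated DAG, you would pass `start_date=staggered_start_date(i)` together with a `timedelta` schedule. Alternatively, you would use `schedule_interval=staggered_cron(i)`.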
