On the open PR I described how DagFetcher might imply a new DAG manifest (replacing the current DAG_FOLDER auto-parsing & discovery) that describes a list of dag_ids and related DAG URIs.

That DAG manifest could be a static list, or something dynamic if you pass it a callable. To enable the dynamic-DAGs pattern, perhaps that manifest supports not only URIs but also DAG factories: functions that return one or many DAG objects.
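To make that concrete, here is a purely hypothetical sketch of what such a manifest could look like; no such API exists in Airflow today, and every name below (dag_manifest, make_client_dag, get_client_ids) is made up for illustration:

    # Purely hypothetical sketch -- not a real Airflow API.
    from airflow import DAG

    def make_client_dag(client_id):
        # a DAG factory: returns one fully built DAG object
        return DAG(dag_id='client_%s' % client_id)

    def dag_manifest():
        # static entries map dag_ids to DAG URIs...
        entries = {'etl_daily': 's3://my-bucket/dags/etl_daily.py'}
        # ...while dynamic entries map dag_ids to DAG factories
        for client_id in get_client_ids():  # get_client_ids() is a placeholder
            entries['client_%s' % client_id] = lambda c=client_id: make_client_dag(c)
        return entries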
This might be a tad too dynamic, though; it can be good to keep the balance and have something more predictable.

Max

On Thu, Mar 22, 2018 at 1:24 PM, James Meickle <[email protected]> wrote:

I'm very excited about the possibility of implementing a DAGFetcher (per the prior thread about this) that is aware of dynamic data sources, and can handle abstracting/caching/deploying them itself, rather than having each Airflow process run the query for each DAG refresh.

On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston <[email protected]> wrote:

I'm interested in hearing further discussion too, and whether others have tried something similar to our approach. Several companies on this list have mentioned various approaches to dynamic DAGs, and I think everyone needs them eventually. Maybe it's an opportunity for additional docs covering use cases like this and documenting best practices from the community.

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>

On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <[email protected]> wrote:

@Chris @Taylor
Thank you guys very much for your explanations! Your strategy makes a lot of sense to me. Generating a DAG for each client, I'm going to have a ton of DAGs on the front page, but at least that is searchable, haha. I'm going to give this implementation a shot and I'll try to report back with the outcome.

Can anyone comment on future work to support data science workflows like these, or is Airflow fundamentally the wrong tool?

On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <[email protected]> wrote:

We're not using SubDagOperator. Our approach is using one DAG file to generate a separate DAG class instance for each similar config, which gets hoisted into the global namespace. In simplified pseudo-Python, it looks like:

    from airflow import DAG
    from airflow.models import Variable

    # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
    cache = Variable.get('sources', default_var={}, deserialize_json=True)
    sources = fetch_configs() if is_empty(cache) or is_expired(cache) else cache['configs']
    for source in sources:
        dag = DAG(...)
        globals()[source._id] = dag
        # ...create tasks and set dependencies for each DAG
        # (some config pulled from the source object for each)...

We added the cache part for the same reason you pointed out: the DAG processing loop was hitting the API a lot. (The helpers fetch_configs, is_empty, and is_expired are sketched at the end of this message.) Btw, you can also turn down how often the processing loop runs with scheduler_heartbeat_sec under the scheduler group in config.

We also considered the route Chris mentioned of updating the cache via a separate DAG, but weren't crazy about having a DAG scheduled once per minute.
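Those three helpers are left undefined in the pseudo-Python above; one possible shape, assuming the configs come from an HTTP API and 'expire' is stored as a unix timestamp (the endpoint, the 60-second TTL, and all names here are assumptions, not the actual implementation):

    import json
    import time

    import requests  # assuming the internal API is HTTP; purely illustrative

    from airflow.models import Variable

    def fetch_configs():
        # hypothetical endpoint lightly wrapping the database
        resp = requests.get('https://internal-api.example.com/sources')
        resp.raise_for_status()
        return resp.json()['configs']

    def is_empty(cache):
        # True if the Variable was missing or never populated
        return not cache.get('configs')

    def is_expired(cache):
        # 'expire' stored as a unix timestamp for easy comparison
        return time.time() > cache.get('expire', 0)

    def refresh_cache(configs, ttl_seconds=60):
        # re-cache with a fresh expiry, e.g. once a minute
        Variable.set('sources', json.dumps({
            'configs': configs,
            'expire': time.time() + ttl_seconds,
        }))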
On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <[email protected]> wrote:

For us, we compile down to Python rather than doing the logic in Python; that makes it so the load doesn't do real work.

We have our own DSL that is just a simplified compiler: parse, analyze, optimize, code gen. In code gen we just generate the Python code. Our build then packages it up and has Airflow fetch it (a very hacky fetch right now).

This does make loading simple and fast, but it means you can't use the Python API directly.

On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <[email protected]> wrote:

I've had similar issues with large DAGs being slow to render in the UI and crashing Chrome.

I got around it by changing the default tree view from 25 to just 5.

It involves a couple of changes to source files though; it would be great if some of the UI defaults could go into airflow.cfg.

https://stackoverflow.com/a/48665734/1919374

On Thu, 22 Mar 2018, 01:26 Chris Fei, <[email protected]> wrote:

@Kyle, I do something similar and have run into the problems you've mentioned. In my case, I access data from S3 and then generate separate DAGs (of different structures) based on the data that's pulled. I've also found that the UI for accessing a single large DAG is slow, so I prefer to keep many separate DAGs. What I'd try is to define a DAG that's responsible for accessing your API and caching the client IDs somewhere locally, maybe just to a file on disk or as an Airflow Variable. You can run this DAG on whatever schedule is appropriate for you. From there, build a function that creates a DAG, and then for each client ID, register a DAG built by that function to the global context. Like this:

    from airflow import DAG

    def create_client_dag(client_id):
        # build the per-client DAG and its tasks here
        dag = DAG(...)
        return dag

    def get_client_ids_locally():
        # access the data that was pulled from the API
        ...

    client_ids = get_client_ids_locally()
    for client in client_ids:
        dag = create_client_dag(client)
        globals()[dag.dag_id] = dag

This approach also handles removing client IDs somewhat gracefully. DAGs for removed clients will still appear in the UI (you can build a maintenance DAG to clean that up), but they'll be disabled and their tasks won't be scheduled.
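The caching DAG itself isn't shown above; a minimal sketch, assuming the client IDs come from a hypothetical fetch_client_ids_from_api() call and are cached in a Variable named 'client_ids', might look like:

    import json
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator

    def cache_client_ids():
        # fetch_client_ids_from_api() is a placeholder for your API call
        client_ids = fetch_client_ids_from_api()
        Variable.set('client_ids', json.dumps(client_ids))

    dag = DAG(
        dag_id='cache_client_ids',
        start_date=datetime(2018, 1, 1),
        schedule_interval='@hourly',  # whatever refresh rate suits you
    )

    PythonOperator(
        task_id='update_client_id_cache',
        python_callable=cache_client_ids,
        dag=dag,
    )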
On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:

Thanks for all the responses; let me try to address the main themes.

@Ace @Nicolas @Taylor
I originally started with a loop over my list of client IDs and created a SparkSubmitOperator for each client. The pseudo-code would look something like this:

    from airflow import DAG
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    dag = DAG(...)

    client_ids = get_client_ids()
    for client_id in client_ids:
        SparkSubmitOperator(
            ...,  # per-client task arguments
            dag=dag,
        )

I found this approach kind of clunky for a few reasons. First, the get_client_ids() function was hitting our API every time the DAG was read by the scheduler, which seemed excessive (every 30 seconds or so?). Second, it seemed like a single task failure marked the whole DAG as a failure, but I guess retrying till the task worked could solve this? Third, the UI gets really clunky and slow, basically unusable, when it tries to render the graph view for that many tasks. Finally, Airflow doesn't seem very happy when client_ids are removed, i.e. when get_client_ids() no longer returns a specific client_id; it really seems to want a static DAG.

Do I really have to poll an API or database every 30 seconds for this dynamic client_id data?

@Ace
I have been limiting concurrency so as to not blast the cluster.

@Nicolas
Thank you for the noise suggestion; I will definitely implement that if I continue with the same methodology.

@Taylor
Are you using a SubDagOperator? Or is your process similar to the pseudo-code I wrote above?

On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <[email protected]> wrote:

We also use a similar approach to generate dynamic DAGs based on a common template DAG file. We pull in the list of config objects, one per DAG, from an internal API lightly wrapping the database, then we cache that response in an Airflow Variable that gets updated once a minute. The dynamic DAGs are generated from that variable.

On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <[email protected]> wrote:

Kyle,

We have a similar approach but on a much, much smaller scale. We now have <100 "things to process" but expect it to grow to under ~200. Each "thing to process" has the same workflow, so we have a single DAG definition that does about 20 tasks per item; then we loop over the list of items and produce a DAG object for each one, adding it to the global definition.

One of the things we quickly ran into was crushing the scheduler, as everything was running with the same start time. To get around this we add noise to the start time minute and seconds: simply index % 60. This spreads out the load so the scheduler isn't trying to run everything at the exact same moment. If you do go this route, I would suggest also staggering your hours if you can, because of how many you plan to run. Perhaps your DAGs are smaller and aren't as CPU-intensive as ours.
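Concretely, the start-time noise trick might look like the following sketch (the item list and dag_id scheme are made up; note it uses a timedelta schedule so the per-DAG offset actually shifts run times, which a '@daily' preset would not):

    from datetime import datetime, timedelta

    from airflow import DAG

    ITEMS = ['alpha', 'beta', 'gamma']  # stand-ins for the "things to process"

    for index, item in enumerate(ITEMS):
        noise = index % 60
        dag = DAG(
            dag_id='process_%s' % item,
            # offset the minute and second so all DAGs don't fire at once
            start_date=datetime(2018, 1, 1, 0, noise, noise),
            schedule_interval=timedelta(days=1),
        )
        globals()[dag.dag_id] = dag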
On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:

Hello,

I'm currently using Airflow for some ETL tasks where I submit a Spark job to a cluster and poll till it is complete. This workflow is nice because it is typically a single DAG. I'm now starting to do more machine learning tasks and need to build a model per client, which is 1000+ clients. My Spark cluster is capable of handling this workload; however, it doesn't seem scalable to write 1000+ DAGs to fit models for each client. I want each client to have its own task instance so it can be retried if it fails without having to run all 1000+ tasks over again. How do I handle this type of workflow in Airflow?
