Thanks for all the responses. Let me try to address the main themes.
@Ace @Nicholas @Taylor
I originally started with a loop over my list of client ids and created a
SparkSubmitOperator for each client. The pseudo code would look something
like this:
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG(...)
client_ids = get_client_ids()  # hits our internal API
for client_id in client_ids:
    SparkSubmitOperator(
        ...,
        dag=dag,
    )
I found this approach kind of clunky for a few reasons. First, the
get_client_ids() function was hitting our API every time the DAG file was
parsed by the scheduler, which seemed excessive (every 30 seconds or so?).
Second, a single task failure seemed to mark the whole DAG run as a
failure, though I guess retrying until the task succeeds could solve this.
Third, the UI gets really clunky and slow, basically unusable, when it
tries to render the graph view for that many tasks. Finally, Airflow
doesn't seem very happy when client_ids are removed, i.e. when
get_client_ids() no longer returns a specific client_id; it really seems
to want a static DAG.

Do I really have to poll an API or database every 30 seconds for this
dynamic client_id data?
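If I end up caching instead, I'm picturing roughly the pattern Taylor
describes below, something like this (untested sketch; the Variable key
and the refresh function name are just placeholders):

import json
from airflow.models import Variable

# A small maintenance job (e.g. a PythonOperator on a 1-minute schedule,
# or a cron) writes the API response into an Airflow Variable.
# get_client_ids() is the same API call as in the pseudocode above.
def refresh_client_ids():
    Variable.set("client_ids", json.dumps(get_client_ids()))

# The model DAG file then reads the cached Variable when it is parsed,
# instead of hitting the API directly on every scheduler pass.
client_ids = json.loads(Variable.get("client_ids", default_var="[]"))
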
@Ace
I have been limiting concurrency so as to not blast the cluster.
@Nicholas
Thank you for the noise suggestion. I will definitely implement that if I
continue with the same methodology.
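Concretely, I'm reading the noise suggestion as something like this when
the per-client pieces get generated (quick sketch; the actual date is
arbitrary):

from datetime import datetime

# client_ids as above; shifting each start time's minute and second by
# index % 60 spreads the kickoffs out instead of everything firing at
# the exact same moment.
for i, client_id in enumerate(client_ids):
    start_date = datetime(2018, 1, 1, 0, i % 60, i % 60)
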
@Taylor
Are you using a SubDagOperator? Or is your process similar to the
pseudo code I wrote above?
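In other words, is the template approach basically this: one generated DAG
per client, registered at module level, with the config list coming from
the cached Variable? Rough sketch of my understanding, not necessarily
what you actually run; the application path and task/dag names are just
placeholders:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

for i, client_id in enumerate(client_ids):  # client_ids from the cached Variable
    dag = DAG(
        dag_id="fit_model_%s" % client_id,
        start_date=datetime(2018, 1, 1, 0, i % 60, i % 60),  # start-time noise per Nicolas
        schedule_interval="@daily",
    )
    SparkSubmitOperator(
        task_id="fit_model",
        application="/path/to/fit_model.py",  # placeholder path
        application_args=[str(client_id)],
        dag=dag,
    )
    globals()[dag.dag_id] = dag  # the scheduler only picks up module-level DAGs
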
On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <[email protected]> wrote:
> We also use a similar approach to generate dynamic DAGs based on a common
> template DAG file. We pull in the list of config objects, one per DAG,
> from an internal API lightly wrapping the database, then we cache that
> response in an Airflow Variable that gets updated once a minute. The
> dynamic DAGs are generated from that variable.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> [email protected]> wrote:
>
> > Kyle,
> >
> > We have a similar approach but on a much, much smaller scale. We now
> > have <100 “things to process” but expect it to grow to under ~200.
> > Each “thing to process” has the same workflow so we have a single DAG
> > definition that does about 20 tasks per, then we loop over the list of
> > items and produce a dag object for each one adding it to the global
> > definition.
> >
> > One of the things we quickly ran into was crushing the scheduler as
> > everything was running with the same start time. To get around this we
> > add noise to the start time minute and seconds. Simply index % 60.
> > This spreads out the load so that the scheduler isn’t trying to run
> > everything at the exact same moment. I would suggest if you do go this
> > route, to also stagger your hours if you can because of how many you
> > plan to run. Perhaps your DAGs are smaller and aren’t as CPU intensive
> > as ours.
> >
> > On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:
> >
> > Hello,
> >
> > I'm currently using Airflow for some ETL tasks where I submit a spark
> > job to a cluster and poll till it is complete. This workflow is nice
> > because it is typically a single Dag. I'm now starting to do more
> > machine learning tasks and need to build a model per client which is
> > 1000+ clients. My spark cluster is capable of handling this workload,
> > however, it doesn't seem scalable to write 1000+ dags to fit models
> > for each client. I want each client to have its own task instance so
> > it can be retried if it fails without having to run all 1000+ tasks
> > over again. How do I handle this type of workflow in Airflow?
> >
> >
> >
>