@Kyle, I do something similar and have run into the problems you've
mentioned. In my case, I access data from S3 and then generate separate
DAGs (of different structures) based on the data that's pulled. I've
also found that the UI for accessing a single large DAG is slow so I
prefer to keep many separate DAGs. What I'd try is to define a DAG
that's responsible for accessing your API and caching the client IDs
somewhere locally, maybe just to a file on disk or as an Airflow
Variable. You can run this DAG on whatever schedule is appropriate for
you. From there, build a function that creates a DAG and then for each
client ID, register a DAG built by that function to the global context.
Like this:
from datetime import datetime
from airflow import DAG
from airflow.models import Variable

def create_client_dag(client_id):
    # build and return the per-client DAG here
    dag = DAG('client_%s' % client_id, start_date=datetime(2018, 3, 1))
    # ... add this client's tasks to dag ...
    return dag

def get_client_ids_locally():
    # read the IDs cached by the caching DAG (file on disk or Airflow Variable)
    return Variable.get('client_ids', deserialize_json=True, default_var=[])

client_ids = get_client_ids_locally()
for client in client_ids:
    dag = create_client_dag(client)
    globals()[dag.dag_id] = dag
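
For the caching DAG itself, here's a minimal sketch. I'm assuming the IDs
get stored as a JSON list in an Airflow Variable named client_ids, and
fetch_client_ids_from_api() is just a placeholder for however you call
your API:

from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def fetch_client_ids_from_api():
    # placeholder: replace with your real API call
    return ['client_a', 'client_b']

def cache_client_ids():
    # write the list where the generator file above can read it cheaply
    Variable.set('client_ids', fetch_client_ids_from_api(), serialize_json=True)

cache_dag = DAG('cache_client_ids',
                start_date=datetime(2018, 3, 1),
                schedule_interval='@hourly')

PythonOperator(task_id='refresh_client_ids',
               python_callable=cache_client_ids,
               dag=cache_dag)

Run it on whatever interval matches how often clients change; the generator
file then only reads the Variable instead of hitting your API on every
scheduler parse.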
This approach also handles removing client IDs somewhat gracefully. DAGs
for removed clients will still appear in the UI (you can build a
maintenance DAG to clean that up), but they'll be disabled and their
tasks won't be scheduled.
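
If you do build the maintenance DAG, here's a rough sketch of the cleanup
callable. It assumes the generated dag_ids share the client_ prefix from
above and that you're comfortable reaching into the metadata DB through
Airflow's own models; wire it into a PythonOperator on whatever schedule
you like:

from airflow import settings
from airflow.models import DagModel, Variable

def delete_stale_client_dags():
    # dag_ids that should still exist, per the cached client list
    current = ['client_%s' % c for c in
               Variable.get('client_ids', deserialize_json=True, default_var=[])]
    if not current:
        return  # don't wipe everything if the cache is empty or missing
    session = settings.Session()
    stale = (session.query(DagModel)
             .filter(DagModel.dag_id.like('client_%'))
             .filter(~DagModel.dag_id.in_(current))
             .all())
    for row in stale:
        session.delete(row)
    session.commit()
    session.close()

Note this only clears the dag table entries the UI lists; old dag runs and
task instances stick around unless you clean those up as well.
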
On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> Thanks for all the responses; let me try to address the main themes.
>
> @Ace @Nicholas @Taylor
> I originally started with a loop over my list of client ids and created a
> SparkSubmitOperator for each client. The pseudocode would look something
> like this:
>
> dag = DAG(...)
>
> client_ids = get_client_ids()
> for client_id in client_ids:
>     SparkSubmitOperator(
>         ...
>         dag=dag
>     )
>
> I found this approach kind of clunky for a few reasons. First, the
> get_client_ids() function was hitting our API every time the dag was read
> by the scheduler, which seemed excessive (every 30 seconds or so?).
> Second, it seemed like a single task failure marked the whole dag as a
> failure, but I guess retrying till the task worked could solve this?
> Third, the UI gets really clunky and slow, basically unusable when it
> tries to render the graph view for that many tasks. Finally, Airflow
> doesn't seem very happy when client_ids are removed, i.e. when
> get_client_ids() no longer returns a specific client_id; it really seems
> to want a static dag.
>
> Do I really have to poll an API or database every 30 seconds for this
> dynamic client_id data?
>
> @Ace
> I have been limiting concurrency so as to not blast the cluster
>
> @Nicholas
> Thank you for the noise suggestion; I will definitely implement that if I
> continue with the same methodology.
>
> @Taylor
> Are you using a SubDagOperator? Or is your process similar to the
> pseudo code I wrote above?
>
>
> On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> <[email protected]> wrote:
>> We also use a similar approach to generate dynamic DAGs based on a common
>> template DAG file. We pull in the list of config objects, one per DAG,
>> from an internal API lightly wrapping the database, then we cache that
>> response in an Airflow Variable that gets updated once a minute. The
>> dynamic DAGs are generated from that variable.
>>
>> *Taylor Edmiston*
>> TEdmiston.com <https://www.tedmiston.com/> | Blog
>> <http://blog.tedmiston.com>
>> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | AngelList
>> <https://angel.co/taylor>
>>
>>
>> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
>> [email protected]> wrote:
>>
>>> Kyle,
>>>
>>> We have a similar approach but on a much, much smaller scale. We now
>>> have <100 “things to process” but expect it to grow to under ~200. Each
>>> “thing to process” has the same workflow so we have a single DAG
>>> definition that does about 20 tasks per, then we loop over the list of
>>> items and produce a dag object for each one adding it to the global
>>> definition.
>>>
>>> One of the things we quickly ran into was crushing the scheduler, as
>>> everything was running with the same start time. To get around this we
>>> add noise to the start time minute and seconds. Simply index % 60. This
>>> spreads out the load so that the scheduler isn’t trying to run
>>> everything at the exact same moment. I would suggest, if you do go this
>>> route, to also stagger your hours if you can because of how many you
>>> plan to run. Perhaps your DAGs are smaller and aren’t as CPU intensive
>>> as ours.
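>>>
>>> For illustration, the minute/second offset could look roughly like this
>>> (just a sketch; the items_to_process list and the process_ dag_id naming
>>> are placeholders):
>>>
>>> from datetime import datetime, timedelta
>>> from airflow import DAG
>>>
>>> items_to_process = ['a', 'b', 'c']  # placeholder for the real list
>>>
>>> for idx, item in enumerate(items_to_process):
>>>     # idx % 60 spreads start minutes/seconds so DAGs don't all fire at once
>>>     start = datetime(2018, 3, 1) + timedelta(minutes=idx % 60, seconds=idx % 60)
>>>     dag = DAG('process_%s' % item, start_date=start,
>>>               schedule_interval=timedelta(hours=1))
>>>     globals()[dag.dag_id] = dag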
>>>
>>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:
>>>
>>> Hello,
>>>
>>> I'm currently using Airflow for some ETL tasks where I submit a Spark
>>> job to a cluster and poll till it is complete. This workflow is nice
>>> because it is typically a single DAG. I'm now starting to do more
>>> machine learning tasks and need to build a model per client, which is
>>> 1000+ clients. My Spark cluster is capable of handling this workload;
>>> however, it doesn't seem scalable to write 1000+ dags to fit models for
>>> each client. I want each client to have its own task instance so it can
>>> be retried if it fails without having to run all 1000+ tasks over
>>> again. How do I handle this type of workflow in Airflow?
>>>