@Kyle, I do something similar and have run into the problems you've
mentioned. In my case, I access data from S3 and then generate separate
DAGs (of different structures) based on the data that's pulled. I've
also found that the UI for accessing a single large DAG is slow so I
prefer to keep many separate DAGs. What I'd try is to define a DAG
that's responsible for accessing your API and caching the client IDs
somewhere locally, maybe just to a file on disk or as an Airflow
Variable. You can run this DAG on whatever schedule is appropriate for
you. From there, build a function that creates a DAG, and then for each
client ID register the DAG it builds in the module's global namespace, which
is how the scheduler discovers the generated DAGs. Like this:
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

def create_client_dag(client_id):
    # build the per-client DAG here; dag_id must be unique per client
    return DAG(dag_id='client_{}'.format(client_id),
               start_date=datetime(2018, 1, 1),
               schedule_interval='@daily')

def get_client_ids_locally():
    # read the client IDs the caching DAG stored, e.g. in an Airflow Variable
    return Variable.get('client_ids', default_var=[], deserialize_json=True)

client_ids = get_client_ids_locally()
for client in client_ids:
    dag = create_client_dag(client)
    globals()[dag.dag_id] = dag
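
For the first piece, the DAG that hits your API and caches the IDs, a
bare-bones sketch could look like this (the endpoint, response shape, and
hourly schedule are just placeholders, and I'm assuming the requests
library):

import requests
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def refresh_client_ids():
    # placeholder endpoint and response shape; swap in your real API client
    resp = requests.get('https://example.com/api/clients')
    resp.raise_for_status()
    client_ids = [client['id'] for client in resp.json()]
    # cache the IDs where the DAG-generating file can read them on each parse
    Variable.set('client_ids', client_ids, serialize_json=True)

cache_dag = DAG(dag_id='cache_client_ids',
                start_date=datetime(2018, 1, 1),
                schedule_interval='@hourly',
                catchup=False)

PythonOperator(task_id='refresh_client_ids',
               python_callable=refresh_client_ids,
               dag=cache_dag)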

This approach also handles removing client IDs somewhat gracefully. DAGs
for removed clients will still appear in the UI (you can build a
maintenance DAG to clean that up), but they'll be disabled and their
tasks won't be scheduled.
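If you do build that maintenance DAG, one rough way to do the cleanup (a
sketch only, assuming the generated dag_ids share a 'client_' prefix like
above) is to drop the stale rows from the metadata DB:

from airflow import settings
from airflow.models import DagModel

def cleanup_stale_client_dags(active_client_ids):
    # delete metadata rows for generated client DAGs whose client ID
    # no longer comes back from the API; task history is left alone
    active = {str(client_id) for client_id in active_client_ids}
    session = settings.Session()
    for dag_model in session.query(DagModel).filter(
            DagModel.dag_id.like('client_%')):
        if dag_model.dag_id.replace('client_', '', 1) not in active:
            session.delete(dag_model)
    session.commit()
    session.close()

That function can be wrapped in a PythonOperator in its own DAG and run as
often as you refresh the cached client IDs.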
On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> Thanks for all the responses; let me try to address the main themes.
> 
> @Ace @Nicholas @Taylor
> I originally started with a loop over my list of client ids and created a
> SparkSubmitOperator for each client. The pseudo code would look something
> like this:
> 
> dag = DAG(...)
> 
> client_ids = get_client_ids()
> for client_id in client_ids:
>     SparkSubmitOperator(
>         ...
>         dag=dag
>     )
> 
> I found this approach kind of clunky for a few reasons. First, the
> get_client_ids() function was hitting our API every time the dag was read
> by the scheduler, which seemed excessive (every 30 seconds or so?).
> Second, it seemed like a single task failure marked the whole dag as a
> failure, but I guess retrying till the task worked could solve this?
> Third, the UI gets really clunky and slow, basically unusable when it
> tries to render the graph view for that many tasks. Finally, Airflow
> doesn't seem very happy when client_ids are removed, i.e. the
> get_client_ids() no longer returns a specific client_id; it really seems
> to want a static dag.
> 
> Do I really have to poll an API or database every 30 seconds for this
> dynamic client_id data?
> 
> @Ace
> I have been limiting concurrency so as to not blast the cluster
> 
> @Nicholas
> Thank you for the noise suggestion. I will definitely implement that if I
> continue with the same methodology.
> 
> @Taylor
> Are you using a SubDagOperator? Or is your process similar to the
> pseudo code I wrote above?
> 
> 
> On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> <[email protected]> wrote:
>> We also use a similar approach to generate dynamic DAGs based on a common
>> template DAG file.  We pull in the list of config objects, one per DAG,
>> from an internal API lightly wrapping the database, then we cache that
>> response in an Airflow Variable that gets updated once a minute.  The
>> dynamic DAGs are generated from that variable.
>> 
>> *Taylor Edmiston*
>> TEdmiston.com <https://www.tedmiston.com/> | Blog
>> <http://blog.tedmiston.com>
>> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | AngelList
>> <https://angel.co/taylor>
>> 
>> 
>> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
>> [email protected]> wrote:
>> 
>>> Kyle,
>>> 
>>> We have a similar approach but on a much, much smaller scale. We now
>>> have <100 “things to process” but expect it to grow to under ~200.  Each
>>> “thing to process” has the same workflow, so we have a single DAG
>>> definition that does about 20 tasks per item, then we loop over the list
>>> of items and produce a dag object for each one, adding it to the global
>>> definition.
>>> 
>>> One of the things we quickly ran into was crushing the scheduler, as
>>> everything was running with the same start time.  To get around this we
>>> add noise to the start time minute and seconds. Simply index % 60.  This
>>> spreads out the load so that the scheduler isn’t trying to run
>>> everything at the exact same moment.  I would suggest, if you do go this
>>> route, also staggering your hours if you can because of how many you
>>> plan to run.  Perhaps your DAGs are smaller and aren’t as CPU intensive
>>> as ours.
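
A rough sketch of that index % 60 staggering when generating the DAGs (the
dates and timedelta interval below are made up; a timedelta schedule is used
so the offset actually shifts the run times):

from datetime import datetime, timedelta
from airflow import DAG

def create_staggered_dag(client_id, index):
    # shift each DAG's start_date by index % 60 minutes; with a timedelta
    # schedule_interval every run inherits the offset, so the scheduler
    # isn't kicking off all the DAGs at the exact same moment
    return DAG(dag_id='client_{}'.format(client_id),
               start_date=datetime(2018, 1, 1) + timedelta(minutes=index % 60),
               schedule_interval=timedelta(days=1))

client_ids = ['a', 'b', 'c']  # in practice, the cached IDs from the API
for index, client_id in enumerate(client_ids):
    dag = create_staggered_dag(client_id, index)
    globals()[dag.dag_id] = dag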
>>> 
>>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <[email protected]> wrote:
>>> 
>>>   Hello,
>>> 
>>>   I'm currently using Airflow for some ETL tasks where I submit a spark
>>>   job to a cluster and poll till it is complete. This workflow is nice
>>>   because it is typically a single Dag. I'm now starting to do more
>>>   machine learning tasks and need to build a model per client, which is
>>>   1000+ clients. My spark cluster is capable of handling this workload;
>>>   however, it doesn't seem scalable to write 1000+ dags to fit models
>>>   for each client. I want each client to have its own task instance so
>>>   it can be retried if it fails without having to run all 1000+ tasks
>>>   over again. How do I handle this type of workflow in Airflow?
>>> 
>>> 
>>> 
>> 
