I have a few hundred thousand files arriving from an external service
each day, and I would like to ETL their contents into my store with
Airflow. Since the files are large, numerous, and slow to process, I'd
also like to process them in parallel, so I tried something like this:
from airflow import DAG, hooks, operators

def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        "%s.%s" % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    # Fan-out/fan-in markers; the per-file tasks run in parallel between them.
    fan_out = operators.DummyOperator(task_id="fan_out", dag=dag)
    fan_in = operators.DummyOperator(task_id="fan_in", dag=dag)

    # Look up the files for this run's date and create one import task per file.
    cur = hooks.PostgresHook("MY_DB").get_cursor()
    cur.execute("""SELECT file_id
                   FROM some_table
                   WHERE something = '{foo}';""".format(foo=func(start_date)))
    for rec in cur:
        fid = rec[0]
        o = operators.PythonOperator(
            task_id="ImportThing__%s" % fid,
            provide_context=True,
            python_callable=import_func,  # does the actual ETL for one file
            params={"file_id": fid},
            dag=dag,
        )
        o.set_upstream(fan_out)
        o.set_downstream(fan_in)
    cur.close()

    return dag
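
(For context, a factory like this is meant to be plugged into the parent
DAG with a SubDagOperator, roughly as below; the parent DAG name, dates,
and exact import path are stand-ins and may differ by Airflow version:)

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator

main_dag = DAG(
    "my_parent_dag",                  # stand-in name
    start_date=datetime(2016, 1, 1),  # stand-in date
    schedule_interval="@daily",
)

import_files = SubDagOperator(
    # task_id must match the child_dag_name used to build the sub-DAG,
    # so the child DAG id becomes "my_parent_dag.import_files".
    task_id="import_files",
    subdag=sub_dag("my_parent_dag", "import_files",
                   main_dag.start_date, main_dag.schedule_interval),
    dag=main_dag,
)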
The idea is that the number and identity of the tasks in the sub-DAG
would vary dynamically depending on which day it is running for (i.e.
which rows come back from the query for that day). But this doesn't
seem to work.
Any recommendations for how to approach this?
-- JCL