We have done this a lot, and the one issue is that every time the DAG file is evaluated (even during a run), the SQL will be re-run, so the set of tasks can vary between parses. In fact, we had a SELECT statement that actually marked items as in-progress as a side effect of the select, and THAT was bad.

We have since moved to a fixed number of tasks, N; each one grabs a row from the DB at execution time, and zero to N of them get skipped if they don't get a row. A sketch of that pattern is below.
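Roughly, it looks like this. This is illustrative, not our production code: the pending_files table, the MY_DB connection id, the pool size, and process_file() are all placeholder assumptions, and the row-claiming query relies on FOR UPDATE SKIP LOCKED, which needs Postgres 9.5+.

    from datetime import datetime

    from airflow import DAG
    from airflow.exceptions import AirflowSkipException
    from airflow.hooks.postgres_hook import PostgresHook
    from airflow.operators.python_operator import PythonOperator

    POOL_SIZE = 20  # fixed task count, chosen up front

    def process_file(file_id):
        pass  # placeholder for the real ETL work

    def claim_and_process(**context):
        conn = PostgresHook("MY_DB").get_conn()
        cur = conn.cursor()
        # Atomically claim one pending row so concurrent tasks never
        # grab the same one (FOR UPDATE SKIP LOCKED, Postgres 9.5+).
        cur.execute("""
            UPDATE pending_files
               SET state = 'in_progress'
             WHERE file_id = (SELECT file_id
                                FROM pending_files
                               WHERE state = 'pending'
                               LIMIT 1
                                 FOR UPDATE SKIP LOCKED)
            RETURNING file_id;""")
        row = cur.fetchone()
        conn.commit()
        if row is None:
            # No work left for this slot: mark this task instance skipped.
            raise AirflowSkipException("no pending row to claim")
        process_file(row[0])

    dag = DAG("claim_pool",
              start_date=datetime(2016, 9, 1),
              schedule_interval="@daily")

    workers = [PythonOperator(task_id="worker_%02d" % i,
                              python_callable=claim_and_process,
                              provide_context=True,
                              dag=dag)
               for i in range(POOL_SIZE)]

The task list is now identical on every parse; only the rows claimed at execution time vary.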
To be clear, we would really like the DAG's task list to be frozen at the time a run is scheduled, but that has not been our experience, and I believe it would take a fairly major refactor. Furthermore, my understanding is that in Airflow a DAG (directed acyclic graph) definition file is re-evaluated during runtime, so the task set is effectively non-deterministic at runtime; a minimal demonstration of that re-parsing behavior follows the quoted message below.

Thanks,
Ben

--
ben tallman | apigee <http://www.apigee.com/> | m: +1.503.680.5709 | o: +1.503.608.7552
twitter: @anonymousmanage <http://twitter.com/anonymousmanage> | @apigee <https://twitter.com/apigee>

On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <[email protected]> wrote:

> I have a few hundred thousand files arriving from an external service
> each day and would like to ETL their contents into my store with
> Airflow. As the files are large and numerous and slow to process, I'd
> also like to process them in parallel... so I thought something like
> this:
>
>     def sub_dag(
>             parent_dag_name,
>             child_dag_name,
>             start_date,
>             schedule_interval):
>         dag = DAG(
>             "%s.%s" % (parent_dag_name, child_dag_name),
>             schedule_interval=schedule_interval,
>             start_date=start_date,
>         )
>         fan_out = operators.DummyOperator(
>             task_id="fan_out",
>             dag=dag,
>         )
>         fan_in = operators.DummyOperator(
>             task_id="fan_in",
>             dag=dag,
>         )
>         cur = hooks.PostgresHook("MY_DB").get_cursor()
>         cur.execute("""SELECT file_id
>                        FROM some_table
>                        WHERE something;""".format(foo=func(start_date)))
>         for rec in cur:
>             fid = rec[0]
>             o = operators.PythonOperator(
>                 task_id="ImportThing__%s" % fid,
>                 provide_context=True,
>                 python_callable=import_func,
>                 params={"file_id": fid},
>                 dag=dag)
>             o.set_upstream(fan_out)
>             o.set_downstream(fan_in)
>         cur.close()
>         return dag
>
> The idea being that the number and identity of the tasks in the sub-DAG
> would vary dynamically depending on which day it was running for (i.e.
> what rows come back from the query for that day). But... no, this
> doesn't seem to work.
>
> Any recommendations for how to approach this?
>
> -- JCL
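(To make the re-parsing point above concrete, here is a minimal, illustrative DAG file, not production code: everything at module level runs on every scheduler parse, not once per run, so a query in the file body can return different rows, and therefore build different tasks, while a run is in flight.)

    import logging
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG("parse_demo", start_date=datetime(2016, 9, 1))

    # This fires on every scheduler pass that parses the file -- watch
    # the scheduler log to see how often the module body re-runs.
    logging.info("parse_demo DAG file evaluated")

    # If this range came from a SQL query instead, the task list could
    # change from one parse to the next, even mid-run.
    for i in range(3):
        DummyOperator(task_id="task_%d" % i, dag=dag)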
