Also, you might wish to add a Pool so that only N tasks run against your Postgres server at any one time. We have pools for all of our resources.
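Roughly like this, assuming a pool named "postgres_etl" (name made up) has already been created with, say, 5 slots via Admin -> Pools in the web UI (or the "airflow pool" CLI in newer releases). Any task that sets pool="postgres_etl" then queues until a slot is free, so at most 5 of them hit Postgres at once. Untested sketch:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def import_func(**context):
        pass  # stand-in for the real per-file import

    dag = DAG("pooled_import", start_date=datetime(2016, 9, 1),
              schedule_interval="@daily")

    for fid in ["f1", "f2", "f3"]:  # illustrative file ids
        PythonOperator(
            task_id="ImportThing__%s" % fid,
            python_callable=import_func,
            provide_context=True,
            params={"file_id": fid},
            pool="postgres_etl",  # capped at the pool's slot count (5 here)
            dag=dag,
        )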
On Thu, Sep 8, 2016 at 2:04 PM, Ben Tallman <[email protected]> wrote:

> We have done this a lot, and the one issue is that every time the DAG is
> evaluated (even during a run), the SQL is re-run, so the set of tasks can
> vary. In fact, we once had a select statement that marked items as
> in-process during the select itself, and THAT was bad.
>
> We have moved to a fixed number of tasks, where each one grabs a line from
> the DB, and 0 to n of them get skipped if they don't get a line from the
> DB.
>
> To be clear, we would really like the DAG's tasks to be frozen at time of
> schedule, but that has not been our experience, and I believe it would take
> a fairly major refactor. Furthermore, I believe it is part of the
> definition that the DAG is re-evaluated during runtime and that the path is
> not determined until runtime.
>
> Thanks,
> Ben
>
> *ben tallman* | *apigee <http://www.apigee.com/>*
> | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage
> @apigee
>
> On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <[email protected]> wrote:
>
> > I have a few hundred thousand files arriving from an external service
> > each day and would like to ETL their contents into my store with
> > Airflow. As the files are large and numerous and slow to process, I'd
> > also like to process them in parallel... so I thought something like
> > this:
> >
> > def sub_dag (
> >         parent_dag_name,
> >         child_dag_name,
> >         start_date,
> >         schedule_interval):
> >     dag = DAG(
> >         "%s.%s" % (parent_dag_name, child_dag_name),
> >         schedule_interval = schedule_interval,
> >         start_date = start_date,
> >     )
> >     fan_out = operators.DummyOperator(
> >         task_id = "fan_out",
> >         dag = dag,
> >     )
> >     fan_in = operators.DummyOperator(
> >         task_id = "fan_in",
> >         dag = dag,
> >     )
> >     cur = hooks.PostgresHook ("MY_DB").get_cursor ()
> >     cur.execute ("""SELECT file_id
> >                     FROM some_table
> >                     WHERE something;""".format (foo = func(start_date)))
> >     for rec in cur:
> >         fid = rec[0]
> >         o = operators.PythonOperator (
> >             task_id = "ImportThing__%s" % fid,
> >             provide_context = True,
> >             python_callable = import_func,
> >             params = {"file_id": fid,},
> >             dag = dag)
> >         o.set_upstream (fan_out)
> >         o.set_downstream (fan_in)
> >     cur.close ()
> >     return dag
> >
> > The idea being that the number and identity of the tasks in the sub-DAG
> > would vary dynamically depending on what day it was running for (i.e.
> > what rows come back from the query for that day). But... no, this
> > doesn't seem to work.
> >
> > Any recommendations for how to approach this?
> >
> > -- JCL

--
Lance Norskog
[email protected]
Redwood City, CA
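Ben's fixed-slot pattern above might look roughly like the sketch below: the DAG shape never changes (N identical tasks), and each task atomically claims one row at execution time, skipping itself when nothing is left. Untested; the table, columns, claim query, and import_file helper are made up for illustration, and it assumes AirflowSkipException is available in your Airflow version and FOR UPDATE SKIP LOCKED in your Postgres (9.5+).

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.hooks.postgres_hook import PostgresHook
    from airflow.exceptions import AirflowSkipException

    dag = DAG("fixed_slot_import", start_date=datetime(2016, 9, 1),
              schedule_interval="@daily")

    def import_file(file_id):
        pass  # stand-in for the real per-file ETL work

    def claim_and_import(**context):
        hook = PostgresHook(postgres_conn_id="MY_DB")
        conn = hook.get_conn()
        cur = conn.cursor()
        # Claim one unprocessed row atomically so concurrent tasks don't collide.
        cur.execute("""
            UPDATE some_table
               SET state = 'in_process'
             WHERE file_id = (SELECT file_id FROM some_table
                               WHERE state = 'new'
                               LIMIT 1
                               FOR UPDATE SKIP LOCKED)
            RETURNING file_id;
        """)
        row = cur.fetchone()
        conn.commit()
        cur.close()
        if row is None:
            # Nothing left to claim; mark this slot as skipped rather than failed.
            raise AirflowSkipException("no file to claim")
        import_file(row[0])

    for i in range(20):  # N = 20 slots; sized for the expected daily volume
        PythonOperator(
            task_id="import_slot_%02d" % i,
            python_callable=claim_and_import,
            provide_context=True,
            dag=dag,
        )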
