Hi, Airflow is built with the assumption that tasks are idempotent and atomic. This means that a computation needs to either take place in a remote system, or, if it takes place in an Airflow task, that same task should fetch the data, compute, and send the result somewhere persistent off of the worker before declaring success.
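To make that concrete, here's a minimal, untested sketch of what I mean: a single task that pulls its input from S3, computes, and pushes the result back to S3 before it is allowed to succeed, so nothing depends on what is left behind on the worker. The bucket names, keys, and the transform are made up, and the module paths assume a ~1.7-era release with boto3 available on the workers:

import boto3
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def transform(text):
    # Stand-in for the real calculation.
    return text.upper()


def fetch_compute_push(ds, **kwargs):
    """Fetch input, compute, and persist the result off the worker,
    all inside one task, keyed by execution date so reruns are idempotent."""
    s3 = boto3.client('s3')

    # Hypothetical buckets/keys.
    s3.download_file('my-input-bucket', 'input/%s.csv' % ds, '/tmp/input.csv')

    with open('/tmp/input.csv') as f:
        result = transform(f.read())

    with open('/tmp/result.csv', 'w') as f:
        f.write(result)

    # Persist the output somewhere durable before the task succeeds.
    s3.upload_file('/tmp/result.csv', 'my-output-bucket', 'output/%s.csv' % ds)


dag = DAG('atomic_example', start_date=datetime(2016, 5, 1),
          schedule_interval='@daily')

PythonOperator(
    task_id='fetch_compute_push',
    python_callable=fetch_compute_push,
    provide_context=True,
    dag=dag,
)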
Knowing this, I'd advise against leaving things on a specific worker for another task to pick up. It's important for distributed systems to have this sort of assumption.

A trick or workaround here would be to use a SequentialExecutor in your SubDagOperator; this ensures that the tasks run in-process within your SubDag task. If your SubDag task fails in step 2 or 3, you'll have to treat the SubDag as atomic and clear the SubDag task to ensure all 3 tasks re-run. You'll understand that worker 1 might have trash to clean up at that point.

The clean way is to use a PythonOperator and make your 3 steps run within that one task. You could use `models.Log` to leave markers within it if it helps. We could expose a more built-in way of leaving a trail or metadata around checkpoints / progress eventually. A rough sketch of the SequentialExecutor approach is included below the quoted thread.

Max

On Thu, May 19, 2016 at 9:19 AM, Chris Riccomini <[email protected]> wrote:

> Hey Paul,
>
> I believe what you're asking for is subdag-worker affinity. As far as I
> know, this hasn't been implemented.
>
> You might be able to use pools and assign only a single worker to the
> pool, but I haven't tried this. This also runs the risk of limiting your
> throughput, since the pool-worker mapping is always just to a single
> worker (to prevent subdags from getting load balanced on to other
> workers).
>
> Cheers,
> Chris
>
> On Thu, May 19, 2016 at 8:07 AM, Ryabchuk, Pavlo <[email protected]> wrote:
>
> > Hello,
> >
> > I am trying to make full use of Airflow analytics and to make my tasks
> > as granular as possible while still having the benefit of the
> > CeleryExecutor. In general I want my DAG to consist of 100+ SubDAGs
> > which are distributed by Celery, but I want the tasks of each SubDAG
> > to all execute on the same worker instance.
> > A SubDAG is in general this: copy data from S3 to the instance -> run
> > the calculation -> copy the result to S3. The reason I want to split
> > it into 3 tasks is to be able to measure pure calculation time and
> > apply an SLA to it, and also to get better statistics on the copy
> > operations.
> > So the main question is: how can I execute the tasks of a SubDAG one
> > after another on the same instance?
> >
> > I've come across this issue/workaround here
> > https://issues.apache.org/jira/browse/AIRFLOW-74, but I believe it
> > won't solve my issue.
> > If it is not supported and I am not missing some magic configuration :)
> > but it could still be implemented with relatively small effort - I am in :)
> >
> > Best,
> > Paul
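Here's the sketch mentioned above: Paul's three-step SubDAG (copy from S3 -> calculate -> copy to S3) wrapped in a SubDagOperator that is handed a SequentialExecutor, so the three tasks run in-process on whichever worker picks up the SubDag task. It's untested; the callables, ids, and the one-hour SLA are placeholders, and the imports assume a ~1.7-era release:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'airflow', 'start_date': datetime(2016, 5, 1)}


def copy_from_s3():
    pass  # placeholder: pull the input data onto the local disk


def calculate():
    pass  # placeholder: the pure calculation step the SLA is measured on


def copy_to_s3():
    pass  # placeholder: push the result back to S3


def make_subdag(parent_dag_id, child_id):
    """Build one 3-step SubDAG: copy in -> calculate -> copy out."""
    subdag = DAG(
        '%s.%s' % (parent_dag_id, child_id),
        default_args=DEFAULT_ARGS,
        schedule_interval='@daily',
    )
    copy_in = PythonOperator(task_id='copy_from_s3',
                             python_callable=copy_from_s3, dag=subdag)
    calc = PythonOperator(task_id='calculate', python_callable=calculate,
                          sla=timedelta(hours=1), dag=subdag)
    copy_out = PythonOperator(task_id='copy_to_s3',
                              python_callable=copy_to_s3, dag=subdag)
    copy_in.set_downstream(calc)
    calc.set_downstream(copy_out)
    return subdag


main_dag = DAG('main_dag', default_args=DEFAULT_ARGS, schedule_interval='@daily')

SubDagOperator(
    task_id='unit_001',
    subdag=make_subdag('main_dag', 'unit_001'),
    # The important bit: run the SubDAG's tasks sequentially, in-process,
    # on the same worker that runs this SubDag task.
    executor=SequentialExecutor(),
    dag=main_dag,
)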
