Thanks Alek, this is an interesting alternative approach that would accomplish what I'm looking for. I've got 24 such data staging tasks in a daily DAG, so going from 2 to 4 tasks per data source is only mildly more work.
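
Since the four tasks per source all follow the same pattern, I'd probably generate them in a loop anyway rather than write out 96 task declarations by hand (24 sources x 4 tasks). A rough sketch of what I mean, on the same shape as your example; the DAG id and source names are placeholders, and choose_path stands in for the real availability check, which I sketch further down:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def choose_path(source):
    # Placeholder: the real version would poll S3 for the source's prefix.
    data_arrived = True
    if data_arrived:
        return 'preprocess_and_stage_%s' % source
    return 'skip_%s' % source

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 1, 1),
}

with DAG('staging', default_args=default_args, schedule_interval='@daily') as dag:
    process_stages = BashOperator(
        task_id='process_all_stages',
        bash_command='echo {{ ti.task_id }}')
    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='echo {{ ti.task_id }}')
    process_stages >> cleanup

    for source in ('dataA', 'dataB', 'dataC'):  # really 24 of these
        sense = BranchPythonOperator(
            task_id='sensor_%s' % source,
            python_callable=choose_path,
            op_args=[source])
        stage = BashOperator(
            task_id='preprocess_and_stage_%s' % source,
            bash_command='echo {{ ti.task_id }}')
        skip = DummyOperator(task_id='skip_%s' % source)
        join = DummyOperator(
            task_id='join_%s' % source,
            trigger_rule='one_success')
        sense >> stage >> join
        sense >> skip >> join
        join >> process_stages

One loop body also means there's a single place to touch when the join/skip plumbing changes.
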
The sensor was an S3 prefix sensor from https://airflow.incubator.apache.org/_modules/airflow/operators/sensors.html, not a ShortCircuitOperator, so I'd have to reimplement or wrap it in some way as the proposed branch operator. I appreciate the suggestion and will likely use it, yet it seems to me that the sensor operator is missing functionality if its skip status can't propagate down/over dependencies and still trigger some cleanup tasks.
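
To be concrete, the wrapping I have in mind is something like the factory below: poll S3 the way the prefix sensor does, but branch instead of skipping. Rough and untested; check_for_prefix is the same hook call S3PrefixSensor.poke() makes, but the connection id, timeout, and task-naming convention are placeholders of mine:

import time

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import BranchPythonOperator

def make_branching_sensor(source, bucket, prefix,
                          timeout=60 * 60, poke_interval=60):
    # Returns a BranchPythonOperator that follows
    # 'preprocess_and_stage_<source>' once s3://<bucket>/<prefix> appears,
    # or 'skip_<source>' if the timeout elapses first.
    def branch():
        hook = S3Hook(s3_conn_id='s3_default')  # placeholder connection id
        deadline = time.time() + timeout
        while time.time() < deadline:
            if hook.check_for_prefix(bucket_name=bucket, prefix=prefix,
                                     delimiter='/'):
                return 'preprocess_and_stage_%s' % source
            time.sleep(poke_interval)
        return 'skip_%s' % source

    return BranchPythonOperator(
        task_id='sensor_%s' % source,
        python_callable=branch)

The obvious downside is that the branching task holds a worker slot while it polls, just like the sensor it replaces, so the timeout has to stay short relative to the schedule.
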
-Daniel

On Sat, Oct 14, 2017, 5:25 AM Alek Storm <[email protected]> wrote:

> Hi Daniel,
>
> If you don’t find it too unwieldy, the following should satisfy your use
> case. It basically converts what I assume was your use of
> ShortCircuitOperator into a BranchPythonOperator with a dummy task. Try
> running it with the environment variable FOO_SKIP=true to see the other
> possible path. Let me know if you have any questions.
>
> from datetime import datetime
> import os
>
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': datetime(2017, 1, 1),
>     'concurrency': 16,
> }
>
> with DAG('foo', default_args=default_args, schedule_interval='@once') as dag:
>     sensor_dataA = BranchPythonOperator(
>         task_id='sensor_dataA',
>         python_callable=lambda: 'skipA' if os.environ.get('FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataA')
>     sensor_dataB = BranchPythonOperator(
>         task_id='sensor_dataB',
>         python_callable=lambda: 'skipB' if os.environ.get('FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataB')
>     sensor_dataC = BranchPythonOperator(
>         task_id='sensor_dataC',
>         python_callable=lambda: 'skipC' if os.environ.get('FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataC')
>
>     preprocess_and_stage_dataA = BashOperator(
>         task_id='preprocess_and_stage_dataA',
>         bash_command='echo {{ti.task_id}}')
>     preprocess_and_stage_dataB = BashOperator(
>         task_id='preprocess_and_stage_dataB',
>         bash_command='echo {{ti.task_id}}')
>     preprocess_and_stage_dataC = BashOperator(
>         task_id='preprocess_and_stage_dataC',
>         bash_command='echo {{ti.task_id}}')
>
>     skipA = DummyOperator(task_id='skipA')
>     skipB = DummyOperator(task_id='skipB')
>     skipC = DummyOperator(task_id='skipC')
>
>     joinA = DummyOperator(task_id='joinA', trigger_rule='one_success')
>     joinB = DummyOperator(task_id='joinB', trigger_rule='one_success')
>     joinC = DummyOperator(task_id='joinC', trigger_rule='one_success')
>
>     process_stagesABC = BashOperator(
>         task_id='process_stagesABC',
>         bash_command='echo {{ti.task_id}}')
>
>     cleanup = BashOperator(
>         task_id='cleanup',
>         bash_command='echo {{ti.task_id}}')
>
>     sensor_dataA >> preprocess_and_stage_dataA
>     sensor_dataA >> skipA
>     sensor_dataB >> preprocess_and_stage_dataB
>     sensor_dataB >> skipB
>     sensor_dataC >> preprocess_and_stage_dataC
>     sensor_dataC >> skipC
>
>     preprocess_and_stage_dataA >> joinA
>     skipA >> joinA
>     preprocess_and_stage_dataB >> joinB
>     skipB >> joinB
>     preprocess_and_stage_dataC >> joinC
>     skipC >> joinC
>
>     joinA >> process_stagesABC
>     joinB >> process_stagesABC
>     joinC >> process_stagesABC
>
>     process_stagesABC >> cleanup
>
> Best,
> Alek
>
> On Thu, Oct 12, 2017 at 1:43 AM, Daniel Lamblin [Data Science & Platform
> Center] <[email protected]> wrote:
>
> > I hope this is an alright place to ask the following:
> > In a case where some inputs will irregularly be missing, but where it's
> > okay, I was reading
> > https://airflow.incubator.apache.org/concepts.html#trigger-rules
> > and I thought I needed `all_done` for a final task, but a skip is not a
> > done state, nor does it (seem to) propagate.
> > Is there a way to trigger something after all upstreams are either
> > successful or skipped?
> >
> > My case looks a little like:
> > sensor_dataA >> preprocess_and_stage_dataA >> process_stagesABC >> clean_up
> > sensor_dataB >> preprocess_and_stage_dataB >> process_stagesABC
> > sensor_dataC >> preprocess_and_stage_dataC >> process_stagesABC
> >
> > I don't want the preprocess to fail because the data isn't there and
> > there will be side effects, but if the sensor skips, its associated
> > preprocess_and_stage is not queued. The task doesn't seem to have any
> > state (like `upstream_skipped`?), so process_stagesABC won't be
> > triggered by `all_done`. `one_success` seems like it would be perfect,
> > except that it would start before all preprocess tasks have been either
> > run or skipped.
> >
> > Am I missing a way that this can be done? Is there some general guide to
> > changing the DAG structure that would handle completing the process? Am
> > I supposed to be using XCOM here?
> >
> > If all these answers are "no/maybe" then is there some opportunity to
> > introduce an `upstream_skipped` state or a different `trigger_rule`... a
> > kludgy `SkipAheadOperator`, or something?
> >
> > Thanks,
> > -Daniel Lamblin
