Hi Daniel,
If you don't find it too unwieldy, the following should satisfy your use
case. It essentially converts what I assume was your use of
ShortCircuitOperator into a BranchPythonOperator paired with a
DummyOperator for the skip path, then rejoins both paths with a join task
whose trigger_rule is 'one_success'. Try running it with the environment
variable FOO_SKIP=true to see the other possible path. Let me know if you
have any questions.
from datetime import datetime
import os

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 1, 1),
    'concurrency': 16,
}
with DAG('foo', default_args=default_args, schedule_interval='@once') as dag:
    # Each "sensor" branches to either the real work or a dummy skip task.
    sensor_dataA = BranchPythonOperator(
        task_id='sensor_dataA',
        python_callable=lambda: 'skipA' if os.environ.get(
            'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataA')
    sensor_dataB = BranchPythonOperator(
        task_id='sensor_dataB',
        python_callable=lambda: 'skipB' if os.environ.get(
            'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataB')
    sensor_dataC = BranchPythonOperator(
        task_id='sensor_dataC',
        python_callable=lambda: 'skipC' if os.environ.get(
            'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataC')
    preprocess_and_stage_dataA = BashOperator(
        task_id='preprocess_and_stage_dataA',
        bash_command='echo {{ti.task_id}}')
    preprocess_and_stage_dataB = BashOperator(
        task_id='preprocess_and_stage_dataB',
        bash_command='echo {{ti.task_id}}')
    preprocess_and_stage_dataC = BashOperator(
        task_id='preprocess_and_stage_dataC',
        bash_command='echo {{ti.task_id}}')
    skipA = DummyOperator(
        task_id='skipA')
    skipB = DummyOperator(
        task_id='skipB')
    skipC = DummyOperator(
        task_id='skipC')
    # The joins fire as soon as one branch (the work or the skip) succeeds,
    # so downstream tasks don't need to care which path was taken.
    joinA = DummyOperator(
        task_id='joinA',
        trigger_rule='one_success')
    joinB = DummyOperator(
        task_id='joinB',
        trigger_rule='one_success')
    joinC = DummyOperator(
        task_id='joinC',
        trigger_rule='one_success')
    process_stagesABC = BashOperator(
        task_id='process_stagesABC',
        bash_command='echo {{ti.task_id}}')
    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='echo {{ti.task_id}}')

    sensor_dataA >> preprocess_and_stage_dataA
    sensor_dataA >> skipA
    sensor_dataB >> preprocess_and_stage_dataB
    sensor_dataB >> skipB
    sensor_dataC >> preprocess_and_stage_dataC
    sensor_dataC >> skipC
    preprocess_and_stage_dataA >> joinA
    skipA >> joinA
    preprocess_and_stage_dataB >> joinB
    skipB >> joinB
    preprocess_and_stage_dataC >> joinC
    skipC >> joinC
    joinA >> process_stagesABC
    joinB >> process_stagesABC
    joinC >> process_stagesABC
    process_stagesABC >> cleanup
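By the way, you can sanity-check the branch-selection logic outside Airflow by pulling the lambda out into a plain function and calling it directly (a minimal sketch; the choose_branch helper name is mine, not part of the DAG above):

```python
import os

def choose_branch(skip_task_id, process_task_id):
    """Return the task_id the BranchPythonOperator would follow."""
    if os.environ.get('FOO_SKIP', 'false') == 'true':
        return skip_task_id
    return process_task_id

# With FOO_SKIP=true the sensor routes to the dummy skip task...
os.environ['FOO_SKIP'] = 'true'
print(choose_branch('skipA', 'preprocess_and_stage_dataA'))  # skipA

# ...and otherwise to the real preprocessing task.
os.environ['FOO_SKIP'] = 'false'
print(choose_branch('skipA', 'preprocess_and_stage_dataA'))  # preprocess_and_stage_dataA
```

Either way, the join task with trigger_rule='one_success' runs once whichever branch was chosen has succeeded.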
Best,
Alek
On Thu, Oct 12, 2017 at 1:43 AM, Daniel Lamblin [Data Science & Platform
Center] <[email protected]> wrote:
> I hope this is an alright place to ask the following:
> In a case where some inputs will irregularly be missing, but where it's
> okay, I was reading
> https://airflow.incubator.apache.org/concepts.html#trigger-rules
> and I thought I needed `all_done` for a final task, but a skip is not a
> done state, nor does it (seem to) propagate.
> Is there a way to trigger something after all upstreams are either
> successful or skipped?
>
> My case looks a little like:
> sensor_dataA >> preprocess_and_stage_dataA >> process_stagesABC >> clean_up
> sensor_dataB >> preprocess_and_stage_dataB >> process_stagesABC
> sensor_dataC >> preprocess_and_stage_dataC >> process_stagesABC
>
> I don't want the preprocess to fail because the data isn't there and there
> will be side-effects, but if the sensor skips its associated
> preprocess_and_stage is not queued. The task doesn't seem to have any state
> (like `upstream_skipped`?) so process_stagesABC won't be triggered by
> `all_done`. `one_success` seems like it would be perfect except that it
> would start before all preprocess tasks have been either run or skipped.
>
> Am I missing a way that this can be done? Is there some general guide to
> changing the DAG structure that would handle completing the process? Am I
> supposed to be using XCOM here?
>
> If all these answers are "no/maybe" then is there some opportunity to
> introduce an `upstream_skipped` state or a different `trigger_rule`... a
> kludgy `SkipAheadOperator`, or something?
>
> Thanks,
> -Daniel Lamblin
>