Thanks Alek, this is an interesting alternative approach that would
accomplish what I'm looking for. I've got 24 such data staging tasks in a
daily DAG, so going from 2 to 4 tasks per data source is only mildly more
work.

The sensor was an S3 prefix sensor from
https://airflow.incubator.apache.org/_modules/airflow/operators/sensors.html,
not a ShortCircuitOperator.

I'd have to reimplement it or wrap it somehow to act as the proposed branch
operator.
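
Something like this rough sketch is what I'd have in mind, reusing the
prefix check the S3 sensor does internally (untested; the bucket, prefix,
and connection id below are placeholders, and the hook's parameter names
may differ a bit between Airflow versions):

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import BranchPythonOperator

def branch_on_prefix_dataA():
    # Mirror what the S3 prefix sensor's poke() checks, but return the
    # task_id to follow instead of succeeding or skipping in place.
    hook = S3Hook('s3_default')  # placeholder connection id
    found = hook.check_for_prefix(
        bucket_name='my-bucket',    # placeholder bucket
        prefix='incoming/dataA/',   # placeholder prefix
        delimiter='/')
    return 'preprocess_and_stage_dataA' if found else 'skipA'

sensor_dataA = BranchPythonOperator(
    task_id='sensor_dataA',
    python_callable=branch_on_prefix_dataA)

One caveat with wrapping it this way is that it checks the prefix once
rather than poking on an interval the way the sensor does.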

I appreciate the suggestion and will likely use it, yet it seems to me that
the sensor operator is missing functionality if the skip status can't
propagate down/over dependencies and still trigger some clean-up tasks.
-Daniel

On Sat, Oct 14, 2017, 5:25 AM Alek Storm <[email protected]> wrote:

> Hi Daniel,
>
> If you don’t find it too unwieldy, the following should satisfy your use
> case. It basically converts what I assume was your use of
> ShortCircuitOperator into a BranchPythonOperator with a dummy task. Try
> running it with the environment variable FOO_SKIP=true to see the other
> possible path. Let me know if you have any questions.
>
> from datetime import datetime
> import os
>
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': datetime(2017, 1, 1),
>     'concurrency': 16,
> }
> with DAG('foo', default_args=default_args, schedule_interval='@once') as dag:
>     sensor_dataA = BranchPythonOperator(
>         task_id='sensor_dataA',
>         python_callable=lambda: 'skipA' if os.environ.get(
>             'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataA')
>     sensor_dataB = BranchPythonOperator(
>         task_id='sensor_dataB',
>         python_callable=lambda: 'skipB' if os.environ.get(
>             'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataB')
>     sensor_dataC = BranchPythonOperator(
>         task_id='sensor_dataC',
>         python_callable=lambda: 'skipC' if os.environ.get(
>             'FOO_SKIP', 'false') == 'true' else 'preprocess_and_stage_dataC')
>
>     preprocess_and_stage_dataA = BashOperator(
>         task_id='preprocess_and_stage_dataA',
>         bash_command='echo {{ti.task_id}}')
>     preprocess_and_stage_dataB = BashOperator(
>         task_id='preprocess_and_stage_dataB',
>         bash_command='echo {{ti.task_id}}')
>     preprocess_and_stage_dataC = BashOperator(
>         task_id='preprocess_and_stage_dataC',
>         bash_command='echo {{ti.task_id}}')
>
>     skipA = DummyOperator(
>         task_id='skipA')
>     skipB = DummyOperator(
>         task_id='skipB')
>     skipC = DummyOperator(
>         task_id='skipC')
>
>     joinA = DummyOperator(
>         task_id='joinA',
>         trigger_rule='one_success')
>     joinB = DummyOperator(
>         task_id='joinB',
>         trigger_rule='one_success')
>     joinC = DummyOperator(
>         task_id='joinC',
>         trigger_rule='one_success')
>
>     process_stagesABC = BashOperator(
>         task_id='process_stagesABC',
>         bash_command='echo {{ti.task_id}}')
>
>     cleanup = BashOperator(
>         task_id='cleanup',
>         bash_command='echo {{ti.task_id}}')
>
>     sensor_dataA >> preprocess_and_stage_dataA
>     sensor_dataA >> skipA
>     sensor_dataB >> preprocess_and_stage_dataB
>     sensor_dataB >> skipB
>     sensor_dataC >> preprocess_and_stage_dataC
>     sensor_dataC >> skipC
>
>     preprocess_and_stage_dataA >> joinA
>     skipA >> joinA
>     preprocess_and_stage_dataB >> joinB
>     skipB >> joinB
>     preprocess_and_stage_dataC >> joinC
>     skipC >> joinC
>
>     joinA >> process_stagesABC
>     joinB >> process_stagesABC
>     joinC >> process_stagesABC
>
>     process_stagesABC >> cleanup
>
> Best,
> Alek
>
> On Thu, Oct 12, 2017 at 1:43 AM, Daniel Lamblin [Data Science & Platform
> Center] <[email protected]> wrote:
>
> > I hope this is an alright place to ask the following:
> > In a case where some inputs will irregularly be missing, but where it's
> > okay, I was reading
> > https://airflow.incubator.apache.org/concepts.html#trigger-rules
> > and I thought I needed `all_done` for a final task, but a skip is not a
> > done state, nor does it (seem to) propagate.
> > Is there a way to trigger something after all upstreams are either
> > successful or skipped?
> >
> > My case looks a little like:
> > sensor_dataA >> preprocess_and_stage_dataA >> process_stagesABC >> clean_up
> > sensor_dataB >> preprocess_and_stage_dataB >> process_stagesABC
> > sensor_dataC >> preprocess_and_stage_dataC >> process_stagesABC
> >
> > I don't want the preprocess to fail because the data isn't there and there
> > will be side effects, but if the sensor skips, its associated
> > preprocess_and_stage is not queued. The task doesn't seem to have any state
> > (like `upstream_skipped`?), so process_stagesABC won't be triggered by
> > `all_done`. `one_success` seems like it would be perfect except that it
> > would start before all preprocess tasks have been either run or skipped.
> >
> > Am I missing a way that this can be done? Is there some general guide to
> > changing the DAG structure that would handle completing the process? Am I
> > supposed to be using XCOM here?
> >
> > If all these answers are "no/maybe" then is there some opportunity to
> > introduce an `upstream_skipped` state or a different `trigger_rule`... a
> > kludgy `SkipAheadOperator`, or something?
> >
> > Thanks,
> > -Daniel Lamblin
> >
>