Sabeer Zaman created AIRFLOW-104:
------------------------------------

Summary: State of `ExternalTaskSensor` task when the external task is marked as "failed"
Key: AIRFLOW-104
URL: https://issues.apache.org/jira/browse/AIRFLOW-104
Project: Apache Airflow
Issue Type: Improvement
Reporter: Sabeer Zaman
Priority: Minor
Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

h3. Environment

* Version of Airflow: v1.6.2
* Airflow components and configuration: Running with CeleryExecutor (separate Docker containers running the webserver, worker, rabbitmq, and mysql db)
* Operating System: {{Darwin Kernel Version 15.3.0: Thu Dec 10 18:40:58 PST 2015; root:xnu-3248.30.4~1/RELEASE_X86_64 x86_64}}
* Python Version: 2.7.6

Now that you know a little about my setup, let me tell you about the issue I am having:

h3. Description of Issue

I created two DAGs - let's call them {{dag_a}} and {{dag_b}}. One of the tasks in {{dag_b}} is an {{ExternalTaskSensor}} referencing a task with {{task_id="external_task"}} in {{dag_a}}. The code looked as shown below:

{code}
# in DAG definition for "dag_a"
# ... imports, boilerplate setup - e.g., defining `default_args`

dag = DAG(
    dag_id="dag_a",
    default_args=default_args,
    schedule_interval="0 0 * * *",
)

external_task = DummyOperator(
    task_id="external_task",
    dag=dag,
)
{code}

{code}
# in DAG definition for "dag_b"
# ... imports, boilerplate setup - e.g., defining `default_args`

dag = DAG(
    dag_id="dag_b",
    default_args=default_args,
    schedule_interval="0 0 * * *",
)

task_sensor = ExternalTaskSensor(
    task_id="dag_a.external_task",
    external_dag_id="dag_a",
    external_task_id="external_task",
    dag=dag,
)
{code}

To test failure behavior, I marked the task with {{task_id="external_task"}} in {{dag_a}} as "failed" (for a particular execution date). I then ran the backfill for the _same execution date_ for {{dag_b}}.

* What did you expect to happen?
** I expected the task named {{"dag_a.external_task"}} in {{dag_b}} to be marked either as {{failed}} or {{upstream_failed}}, since the actual task it was referencing in {{dag_a}} had failed.
* What happened instead?
** The log for the task {{"dag_a.external_task"}} in {{dag_b}} showed that it kept poking {{external_task}} in {{dag_a}} every minute, indefinitely.

h3. Requested Change

Looking at the logic in the [{{poke}} function for the {{ExternalTaskSensor}}|https://github.com/airbnb/airflow/blob/1.7.0/airflow/operators/sensors.py#L178-L200], it's evident that it acts as a regular Airflow Sensor - it simply waits until a condition becomes true - and in no way couples the state of the current task with the state of the external task.

That being said, is it reasonable to request such behavior (i.e., that the {{ExternalTaskSensor}}'s state is set to {{failed}} if the task it's waiting on is marked as {{failed}})? I'd be willing to take a stab at adding the logic, but I'd like to make sure this is in line with this Sensor's intended behavior, or find out whether there's a suggested alternative way of achieving it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
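For what it's worth, the change I have in mind could look roughly like the following framework-free sketch. To keep it self-contained, a plain dict ({{state_db}}) stands in for Airflow's TaskInstance metadata table, and the function and exception names ({{poke}}, {{ExternalTaskFailedError}}) are purely illustrative stand-ins, not real Airflow APIs:

```python
# Illustrative sketch of the proposed poke logic, outside of Airflow.
# `state_db` is a stand-in for the TaskInstance metadata table, keyed by
# (dag_id, task_id, execution_date); all names here are hypothetical.

FAILED = "failed"
SUCCESS = "success"


class ExternalTaskFailedError(Exception):
    """Raised so the sensor can fail fast instead of poking forever."""


def poke(state_db, external_dag_id, external_task_id, execution_date):
    """Return True when the external task succeeded, False (keep poking)
    while it is still pending, and raise when it has failed."""
    state = state_db.get((external_dag_id, external_task_id, execution_date))
    if state == FAILED:
        # The proposed change: couple the sensor's fate to the external
        # task's state rather than waiting indefinitely.
        raise ExternalTaskFailedError(
            "External task %s.%s failed for %s"
            % (external_dag_id, external_task_id, execution_date)
        )
    return state == SUCCESS


# Example: the external task was marked "failed" for this execution date,
# so the sensor would fail immediately instead of poking every minute.
db = {("dag_a", "external_task", "2016-05-01"): FAILED}
try:
    poke(db, "dag_a", "external_task", "2016-05-01")
except ExternalTaskFailedError as exc:
    print("sensor would fail: %s" % exc)
```

The real implementation would presumably perform the equivalent lookup against the TaskInstance table inside the existing {{poke}} method, but I wanted to illustrate the intended semantics before proposing a patch.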