Sabeer Zaman created AIRFLOW-104:
------------------------------------

             Summary: State of `ExternalTaskSensor` task when the external task is marked as "failure"
                 Key: AIRFLOW-104
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-104
             Project: Apache Airflow
          Issue Type: Improvement
            Reporter: Sabeer Zaman
            Priority: Minor


Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

h3. Environment

* Version of Airflow: v1.6.2
* Airflow components and configuration: Running with CeleryExecutor (separate docker containers running webserver, worker, rabbitmq and mysql db)
* Operating System: {{Darwin Kernel Version 15.3.0: Thu Dec 10 18:40:58 PST 2015; root:xnu-3248.30.4~1/RELEASE_X86_64 x86_64}}
* Python Version: 2.7.6

Now that you know a little about me, let me tell you about the issue I am 
having:

h3. Description of Issue

I created two DAGs - let's call them {{dag_a}} and {{dag_b}}. One of the tasks 
in {{dag_b}} is an {{ExternalTaskSensor}} referencing a task with 
{{task_id="external_task"}} in {{dag_a}}. So the code looked as shown below:

{code}
# in DAG definition for "dag_a"
# ... imports, boilerplate setup - e.g., defining `default_args`
dag = DAG(dag_id="dag_a", default_args=default_args, schedule_interval="0 0 * * *")
external_task = DummyOperator(
  task_id="external_task",
  dag=dag,
)
{code}
{code}
# in DAG definition for "dag_b"
# ... imports, boilerplate setup - e.g., defining `default_args`
dag = DAG(dag_id="dag_b", default_args=default_args, schedule_interval="0 0 * * *")
task_sensor = ExternalTaskSensor(
  task_id="dag_a.external_task",
  external_dag_id="dag_a",
  external_task_id="external_task",
  dag=dag,
)
{code}

To test failure behavior, I marked the task with {{task_id="external_task"}} in 
{{dag_a}} as "failed" (for a particular execution date). I then ran the 
backfill for the _same execution date_ for {{dag_b}}.

* What did you expect to happen?
** I expected the task named {{"dag_a.external_task"}} in {{dag_b}} to be 
marked either as {{failed}} or {{upstream_failed}}, since the actual task it 
was referencing in {{dag_a}} failed.
* What happened instead?
** The log for the task {{"dag_a.external_task"}} in {{dag_b}} showed that it kept poking {{external_task}} in {{dag_a}} every minute.

h3. Requested Change 

Looking at the logic in the [{{poke}} function for the {{ExternalTaskSensor}}|https://github.com/airbnb/airflow/blob/1.7.0/airflow/operators/sensors.py#L178-L200], it's evident that it behaves like a regular Airflow Sensor: it simply waits for a condition to become true, and in no way couples the state of the current task to the state of the external task.

That being said, is it reasonable to request such behavior (i.e., the {{ExternalTaskSensor}}'s state is set to {{failed}} if the task it's waiting on is marked as {{failed}})? I'd be willing to take a stab at adding the logic, but I'd first like to make sure that this is in line with this Sensor's intended behavior, or find out whether there's a suggested alternative way of achieving it.
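
To make the request concrete, here is a rough, framework-free sketch of the check I have in mind (not the actual implementation: {{fetch_state}} is a hypothetical stand-in for the {{TaskInstance}} metadata-database query that the real {{poke}} performs):

{code}
def poke(fetch_state):
    """Return True once the external task has succeeded, False while it is
    still pending (so the sensor keeps poking), and raise if it has failed."""
    state = fetch_state()
    if state == "failed":
        # This is the coupling being requested: raising here would mark the
        # sensor's own task instance as failed, instead of letting it poke
        # indefinitely for a success that can never come.
        raise RuntimeError("external task is marked 'failed'")
    return state == "success"
{code}

In the real sensor, this check would sit alongside the existing success query, so a {{failed}} external task fails the sensor on its next poke rather than leaving it poking every minute.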




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
