topherinternational commented on issue #40119:
URL: https://github.com/apache/airflow/issues/40119#issuecomment-2282438922

   I poked at this a bit and was able to reproduce it with the DAGs in the zip
file above. The key discrepancy seems to be between tasks created inside a DAG
context or with a `dag` argument, and tasks whose DAG is assigned only when the
task is set as a dependency of another task. In other words, if a task is
assigned to a DAG after the task object is created, the DAG's `default_args`
are not applied to that task; the DAG must be known when the task object is
created for the `default_args` to take effect.
   
   I can get the desired behavior (the failure callback is called) if I change 
the demonstration code like this:
   
   ```
   ssh_operator = SSHOperator(
       task_id='ssh_operator',
       command='donothing',
       ssh_hook=ssh_hook,
       do_xcom_push=True,
       dag=ONFAILURECALLBACK,  # added: attach the DAG when the task is created
   )
   ```
   
   @trlopes1974, is there a particular reason you assign the DAG only to the
first task and rely on the dependency assignment to carry that DAG to the other
tasks? In my experience it's always good to create all tasks inside a
`with <my_dag>:` block, so it's impossible to forget to link a task to the DAG.
This brings the added benefit of applying the `default_args`, including
callback params, to all of the tasks.
   
   The rest of this comment is detail for the maintainers.
   
   #### Technical Discussion
   
   Looking through the `BaseOperator` code, I see that the DAG's `default_args`
are referenced when a task/operator is instantiated and a DAG is available;
`apply_defaults` and `_get_parent_defaults` are the relevant functions. Setting
the DAG when the operator is created means the `default_args` are read from the
DAG and set on the task's fields.
   
   To discuss how a DAG is assigned to a task after task creation, here is a 
simplified, annotated snippet of @trlopes1974's demonstration code:
   ```
   test1 = PythonOperator(...)  # no dag arg
   ssh_operator = SSHOperator(...)  # no dag arg
   start_task = EmptyOperator(..., dag=ONFAILURECALLBACK)
   
   # Dependencies configured; test1 and ssh_operator get the DAG assigned here.
   start_task >> test1 >> ssh_operator
   ```
   
   `start_task` is assigned the DAG on creation via the `dag` argument, and the 
`test1` task has the DAG set when it is configured as a downstream dependency 
of `start_task`. `ssh_operator` is then assigned the DAG when it is configured 
as a downstream dependency of `test1`.
   
   These `>>`/`<<` operators (the `__rshift__`/`__lshift__` dunder methods)
delegate to `DAGNode._set_relatives()`. If this method receives a task with a
parent DAG and a task with no parent DAG, it assigns the first task's DAG to the
second task. Critically, neither this method nor the `DAG.add_task()` it calls
recomputes the task's parameters in light of the newly assigned DAG and its
possible `default_args`.
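
The propagation path can be sketched with the same kind of toy model. This is a hypothetical simplification, not Airflow's code: `ToyDAG`, `ToyTask`, `ToyTask._set_relatives()` and `alert` are illustrative names, and the sketch only mimics the two behaviors described above (copying a DAG onto the DAG-less side of a `>>` and registering the task with `add_task()`). It ignores `<<`, lists of tasks, task groups, and every argument other than `on_failure_callback`.

```python
# Toy model (hypothetical classes, NOT Airflow's API) of how `>>` hands a DAG
# to a DAG-less task without re-reading the DAG's default_args.
from typing import Callable, Optional


class ToyDAG:
    def __init__(self, dag_id: str, default_args: Optional[dict] = None):
        self.dag_id = dag_id
        self.default_args = default_args or {}
        self.tasks: list = []

    def add_task(self, task: "ToyTask") -> None:
        # Registers the task only; its attributes are left untouched.
        self.tasks.append(task)


class ToyTask:
    def __init__(self, task_id: str, dag: Optional[ToyDAG] = None):
        self.task_id = task_id
        self.dag = dag
        self.downstream: list = []
        # default_args are read exactly once, here, and only if a DAG is known.
        defaults = dag.default_args if dag is not None else {}
        self.on_failure_callback: Optional[Callable] = defaults.get(
            "on_failure_callback"
        )
        if dag is not None:
            dag.add_task(self)

    def _set_relatives(self, other: "ToyTask") -> None:
        # If only one side has a DAG, copy it to the other side.
        if self.dag is not None and other.dag is None:
            other.dag = self.dag
            self.dag.add_task(other)
        elif other.dag is not None and self.dag is None:
            self.dag = other.dag
            other.dag.add_task(self)
        # Nothing here re-applies dag.default_args to the newly linked task.
        self.downstream.append(other)

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        self._set_relatives(other)
        return other  # returning `other` lets `a >> b >> c` chain


def alert(context):
    print("failure callback called")


ONFAILURECALLBACK = ToyDAG(
    "on_failure_callback_demo",
    default_args={"on_failure_callback": alert},
)
test1 = ToyTask("test1")                # no dag arg
ssh_operator = ToyTask("ssh_operator")  # no dag arg
start_task = ToyTask("start_task", dag=ONFAILURECALLBACK)

start_task >> test1 >> ssh_operator
```

After the chain runs, all three tasks belong to the DAG, but only `start_task` carries the callback, which is the same discrepancy seen in the reproduction.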
   
   In other words, as said above, if a task is neither instantiated inside a DAG
context nor given a `dag` keyword argument, the task code never appears to read
the parent DAG's `default_args`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
