topherinternational commented on issue #40119:
URL: https://github.com/apache/airflow/issues/40119#issuecomment-2282438922
I poked at this a bit and was able to repro using the DAGs in the zip file
above. The key difference seems to be between tasks created inside a DAG
context (or with a `dag` argument) and tasks whose DAG is assigned only when
the task is set as a dependency of another task. To wit, if a task is assigned
to a DAG after the task object is created, the DAG's `default_args` are not
applied to the task. Put another way, the DAG must be known at task object
creation time for the `default_args` to take effect.
I can get the desired behavior (the failure callback is called) if I change
the demonstration code like this:
```
 ssh_operator = SSHOperator(
     task_id='ssh_operator',
     command='donothing',
     ssh_hook=ssh_hook,
     do_xcom_push=True,
+    dag=ONFAILURECALLBACK,
 )
```
@trlopes1974 is there a particular reason you want to assign the DAG only to
the first task and use the dependency assignment to carry that DAG to the other
tasks? In my experience it's always good to create all tasks inside a `with
<my_dag>:` block so it's impossible to forget to link the tasks to the DAG, and
this brings the added benefit of applying the `default_args`, including
callback params, to all of the tasks.
The rest of this comment is detail for the maintainers.
#### Technical Discussion
Looking through the base_operator code, DAG default_args are referenced when
a task/operator is instantiated and a DAG is available; `apply_defaults` and
`_get_parent_defaults` are relevant functions. Setting the DAG when the
operator is created means the default_args are read from the DAG and set on the
task fields.
To discuss how a DAG is assigned to a task after task creation, here is a
simplified, annotated snippet of @trlopes1974's demonstration code:
```
test1 = PythonOperator(...) # no dag arg
ssh_operator = SSHOperator(...) # no dag arg
start_task = EmptyOperator(..., dag=ONFAILURECALLBACK)
# dependencies configured; test1 and ssh_operator get the DAG assigned here
start_task >> test1 >> ssh_operator
```
`start_task` is assigned the DAG on creation via the `dag` argument, and the
`test1` task has the DAG set when it is configured as a downstream dependency
of `start_task`. `ssh_operator` is then assigned the DAG when it is configured
as a downstream dependency of `test1`.
These `>>`/`<<` operators (the `__rshift__`/`__lshift__` dunder methods)
delegate to `DAGNode._set_relatives()`. If this method receives a task with a
parent DAG and a task with no parent DAG, it will assign the first task's DAG
to the second task. Critically, neither this function nor the
`DAG.add_task()` it calls recomputes the task parameters in light of the
newly-assigned DAG and its possible `default_args`.
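To make the mechanism concrete, here is a toy model of the behavior described above. It is only a sketch: `ToyDag`, `ToyTask` and `notify` are hypothetical stand-ins written for illustration, not Airflow classes, and the real `_set_relatives()` handles more cases than this. The only assumption carried over is the one from the discussion: defaults are read at construction time, and the later DAG hand-off does not re-read them.

```python
# Toy model only: ToyDag and ToyTask are hypothetical, not Airflow's classes.


class ToyDag:
    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = dict(default_args or {})
        self.tasks = []

    def add_task(self, task):
        # Registers the task; deliberately does not touch the task's fields.
        self.tasks.append(task)


class ToyTask:
    def __init__(self, task_id, on_failure_callback=None, dag=None):
        self.task_id = task_id
        self.dag = dag
        # The DAG defaults are consulted here, and only here.
        if on_failure_callback is None and dag is not None:
            on_failure_callback = dag.default_args.get("on_failure_callback")
        self.on_failure_callback = on_failure_callback
        if dag is not None:
            dag.add_task(self)

    def __rshift__(self, other):
        # Hand the DAG to a DAG-less downstream task, without recomputing
        # any of that task's parameters from the DAG defaults.
        if other.dag is None and self.dag is not None:
            other.dag = self.dag
            self.dag.add_task(other)
        return other


def notify(context):
    # Placeholder failure callback.
    return f"failed: {context}"


dag = ToyDag("ONFAILURECALLBACK", default_args={"on_failure_callback": notify})

test1 = ToyTask("test1")                     # no dag arg
ssh_operator = ToyTask("ssh_operator")       # no dag arg
start_task = ToyTask("start_task", dag=dag)  # dag known at creation

start_task >> test1 >> ssh_operator
```

In this sketch all three tasks end up attached to `dag`, but only `start_task` carries the callback; `test1` and `ssh_operator` keep `on_failure_callback = None` because nothing revisits the defaults after the hand-off.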
IOW, as said above, if a task is neither instantiated inside a DAG context
nor given a `dag` keyword arg, it looks like the task code never accesses the
parent DAG's `default_args`.