trlopes1974 commented on issue #40119:
URL: https://github.com/apache/airflow/issues/40119#issuecomment-2163133621
@vatsrahul1001
Thanks for you input.
The problem with your example was that it was not calling the ssh_operator
task. I have no issues with python operators failing.
Yet , it gave-me the opportunity to further test.
I think that, for some reason, the way the dag is declared affects the way
the callbacks are also "attached" to the tasks/operators.
Using YOUR code and adding the ssh_operator task to the workflow ( with a
cmdline to return some error ) worked like a charm and the callback was invoked:
```
raise AirflowException(f"SSH operator error: exit status =
{exit_status}")
airflow.exceptions.AirflowException: SSH operator error: exit status = 127
[2024-06-12, 14:22:16 WEST] {taskinstance.py:562} DEBUG - Task Duration set
to 0.653776
[2024-06-12, 14:22:16 WEST] {taskinstance.py:584} DEBUG - Clearing
next_method and next_kwargs.
[2024-06-12, 14:22:16 WEST] {taskinstance.py:1206} INFO - Marking task as
FAILED. dag_id=on_failure_callback, task_id=ssh_operator,
run_id=scheduled__2018-10-31T00:00:00+00:00, execution_date=20181031T000000,
start_date=20240612T132215, end_date=20240612T132216
[2024-06-12, 14:22:16 WEST] {logging_mixin.py:188} INFO - I HAVE BEEN
CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED!
```
So. I have adapted the code to reflect the behavior on "my" dag and I found
that the DAG declaration ( this is the "only" difference I can see" ) is
probably affecting the callbacks.
So, instead of using "with DAG..." format, I'm using "DAGNAMEVAR = DAG(
...)" syntax. and this seems to be the only reason for the callback to fail (
not being called)
**Non working callback example dag:**
```
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from datetime import datetime
def pyoperator():
print("\nYOU SHALL NOT PASS!!!!" * 4)
def mycallback(context):
print("\nI HAVE FAILED" * 4 )
default_args = {
"owner": "airflow",
"start_date": datetime(2018, 10, 31),
'on_failure_callback': mycallback
}
ONFAILURECALLBACK = DAG( dag_id='on_failure_callback',
default_args=default_args,
catchup=False,
schedule_interval=None,
schedule=None,
start_date=datetime(2024, 2, 5),
description='failure callback test',
max_active_tasks=1,
max_active_runs=1,
render_template_as_native_obj=True,
tags=['core','SSH']
)
test1 = PythonOperator(
task_id="python_task",
python_callable=pyoperator,
)
ssh_hook = SSHHook(
ssh_conn_id='ssh_default',
cmd_timeout = 10,
keepalive_interval=5
)
ssh_operator = SSHOperator(
task_id='ssh_operator',
command='donothing' ,
ssh_hook=ssh_hook,
do_xcom_push = True
)
start_task = EmptyOperator(task_id='Start',dag=ONFAILURECALLBACK)
start_task >> test1 >> ssh_operator
```
Logs from this dag (no printing from the callback):
```
File
"/opt/tkapp/env_airflow/lib64/python3.9/site-packages/paramiko/client.py", line
819, in _auth
raise SSHException("No authentication methods available")
paramiko.ssh_exception.SSHException: No authentication methods available
[2024-06-12, 15:07:15 WEST] {taskinstance.py:562} DEBUG - Task Duration set
to 8.511477
[2024-06-12, 15:07:15 WEST] {taskinstance.py:584} DEBUG - Clearing
next_method and next_kwargs.
[2024-06-12, 15:07:15 WEST] {taskinstance.py:1206} INFO - Marking task as
FAILED. dag_id=on_failure_callback, task_id=ssh_operator,
run_id=manual__2024-06-12T14:06:51.203163+00:00,
execution_date=20240612T140651, start_date=20240612T140706,
end_date=20240612T140715
[2024-06-12, 15:07:15 WEST] {cli_action_loggers.py:88} DEBUG - Calling
callbacks: []
[2024-06-12, 15:07:15 WEST] {standard_task_runner.py:110} ERROR - Failed to
execute job 93276 for task ssh_operator (No authentication methods available;
299584)
[2024-06-12, 15:07:15 WEST] {local_task_job_runner.py:240} INFO - Task
exited with return code 1
[2024-06-12, 15:07:15 WEST] {dagrun.py:931} DEBUG - number of tis tasks for
<DagRun on_failure_callback @ 2024-06-12 14:06:51.203163+00:00:
manual__2024-06-12T14:06:51.203163+00:00, state:running, queued_at: 2024-06-12
14:06:51.237212+00:00. externally triggered: True>: 0 task(s)
[2024-06-12, 15:07:15 WEST] {taskinstance.py:3503} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]