trlopes1974 commented on issue #40119:
URL: https://github.com/apache/airflow/issues/40119#issuecomment-2163133621

   @vatsrahul1001
   Thanks for your input.
   The problem with your example was that it never called the `ssh_operator` task; I have no issues with Python operators failing.
   Still, it gave me the opportunity to test further.
   I think that, for some reason, the way the DAG is declared affects how the callbacks are "attached" to the tasks/operators.
   When I took YOUR code and added the `ssh_operator` task to the workflow (with a command line that returns an error), it worked like a charm and the callback was invoked:
   
   ```
       raise AirflowException(f"SSH operator error: exit status = {exit_status}")
   airflow.exceptions.AirflowException: SSH operator error: exit status = 127
   [2024-06-12, 14:22:16 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 0.653776
   [2024-06-12, 14:22:16 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
   [2024-06-12, 14:22:16 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=on_failure_callback, task_id=ssh_operator, run_id=scheduled__2018-10-31T00:00:00+00:00, execution_date=20181031T000000, start_date=20240612T132215, end_date=20240612T132216
   [2024-06-12, 14:22:16 WEST] {logging_mixin.py:188} INFO - I HAVE BEEN CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED!
   ```
   
   So I adapted the code to reproduce the behavior of "my" DAG, and found that the DAG declaration (the "only" difference I can see) is probably what affects the callbacks.
   
   Instead of the `with DAG(...)` form, I'm using the `DAGNAMEVAR = DAG(...)` syntax, and this seems to be the only reason the callback fails (is not called).
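One plausible explanation, offered as an assumption rather than a confirmed diagnosis: in Airflow 2.x, an operator reads `default_args` (including `on_failure_callback`) only when it is constructed, taking the DAG from an explicit `dag=` argument or from the enclosing `with` block. In the example below, only `start_task` gets `dag=ONFAILURECALLBACK`; `test1` and `ssh_operator` are created with no DAG and are attached afterwards by `>>`. This dependency-free sketch models that behavior with hypothetical `ToyDAG`/`ToyOperator` classes, which are stand-ins and not Airflow APIs.

```python
# Toy model (NOT Airflow code): default_args are resolved once, when an
# operator is constructed. ToyDAG and ToyOperator are hypothetical stand-ins.


class ToyDAG:
    """Holds default_args and a task list; usable as a context manager."""

    _active = []  # DAGs currently entered with "with", innermost last

    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = dict(default_args or {})
        self.tasks = []

    def __enter__(self):
        ToyDAG._active.append(self)
        return self

    def __exit__(self, *exc_info):
        ToyDAG._active.pop()
        return False


class ToyOperator:
    def __init__(self, task_id, dag=None, on_failure_callback=None):
        # Explicit dag= wins; otherwise use the innermost "with" block, if any.
        if dag is None and ToyDAG._active:
            dag = ToyDAG._active[-1]
        defaults = dag.default_args if dag is not None else {}
        # The only place default_args are consulted.
        self.on_failure_callback = on_failure_callback or defaults.get("on_failure_callback")
        self.task_id = task_id
        self.dag = dag
        if dag is not None:
            dag.tasks.append(self)

    def __rshift__(self, other):
        # ">>" adopts a DAG-less downstream task into this task's DAG,
        # but does not go back and apply default_args to it.
        if other.dag is None and self.dag is not None:
            other.dag = self.dag
            self.dag.tasks.append(other)
        return other


def mycallback(context):
    print("I HAVE FAILED")


default_args = {"on_failure_callback": mycallback}

# "with DAG(...)" style: the task is built inside the block.
with ToyDAG("with_style", default_args=default_args) as dag1:
    ssh_in_block = ToyOperator("ssh_operator")

# "VAR = DAG(...)" style: only "Start" gets dag= at construction time.
dag2 = ToyDAG("assign_style", default_args=default_args)
start = ToyOperator("Start", dag=dag2)
ssh_detached = ToyOperator("ssh_operator")  # no DAG yet
start >> ssh_detached  # attached to dag2 here, after construction

print(ssh_in_block.on_failure_callback is mycallback)  # True
print(ssh_detached.dag is dag2, ssh_detached.on_failure_callback)  # True None
```

If this model holds, passing `dag=ONFAILURECALLBACK` to `test1` and `ssh_operator` (or creating them inside `with ONFAILURECALLBACK:`) should make the callback fire, consistent with the working `with DAG(...)` run above.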
   
   
   
   **Non-working callback example DAG:**
   ```
   
   from airflow import DAG
   from airflow.providers.ssh.operators.ssh import SSHOperator
   from airflow.operators.python import PythonOperator
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.ssh.hooks.ssh import SSHHook
   from datetime import datetime
   
   def pyoperator():
       print("\nYOU SHALL NOT PASS!!!!" * 4)
   
   def mycallback(context):
       print("\nI HAVE FAILED" * 4)
   
   default_args = {
       "owner": "airflow",
       "start_date": datetime(2018, 10, 31),
       "on_failure_callback": mycallback,
   }
   
   # DAG object assigned to a variable (no "with" block)
   ONFAILURECALLBACK = DAG(
       dag_id='on_failure_callback',
       default_args=default_args,
       catchup=False,
       schedule_interval=None,
       schedule=None,
       start_date=datetime(2024, 2, 5),
       description='failure callback test',
       max_active_tasks=1,
       max_active_runs=1,
       render_template_as_native_obj=True,
       tags=['core', 'SSH'],
   )
   
   # Created without dag=; attached to the DAG only later, via ">>"
   test1 = PythonOperator(
       task_id="python_task",
       python_callable=pyoperator,
   )
   
   ssh_hook = SSHHook(
       ssh_conn_id='ssh_default',
       cmd_timeout=10,
       keepalive_interval=5,
   )
   
   # Also created without dag=; this is the task that fails
   ssh_operator = SSHOperator(
       task_id='ssh_operator',
       command='donothing',
       ssh_hook=ssh_hook,
       do_xcom_push=True,
   )
   
   # The only task given dag= explicitly
   start_task = EmptyOperator(task_id='Start', dag=ONFAILURECALLBACK)
   
   
   start_task >> test1 >> ssh_operator
   
   
   ```
   
   
   Logs from this DAG (nothing is printed by the callback):
   ```
   File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/paramiko/client.py", line 819, in _auth
       raise SSHException("No authentication methods available")
   paramiko.ssh_exception.SSHException: No authentication methods available
   [2024-06-12, 15:07:15 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 8.511477
   [2024-06-12, 15:07:15 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
   [2024-06-12, 15:07:15 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=on_failure_callback, task_id=ssh_operator, run_id=manual__2024-06-12T14:06:51.203163+00:00, execution_date=20240612T140651, start_date=20240612T140706, end_date=20240612T140715
   [2024-06-12, 15:07:15 WEST] {cli_action_loggers.py:88} DEBUG - Calling callbacks: []
   [2024-06-12, 15:07:15 WEST] {standard_task_runner.py:110} ERROR - Failed to execute job 93276 for task ssh_operator (No authentication methods available; 299584)
   [2024-06-12, 15:07:15 WEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   [2024-06-12, 15:07:15 WEST] {dagrun.py:931} DEBUG - number of tis tasks for <DagRun on_failure_callback @ 2024-06-12 14:06:51.203163+00:00: manual__2024-06-12T14:06:51.203163+00:00, state:running, queued_at: 2024-06-12 14:06:51.237212+00:00. externally triggered: True>: 0 task(s)
   [2024-06-12, 15:07:15 WEST] {taskinstance.py:3503} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

