patgarz opened a new issue, #32158:
URL: https://github.com/apache/airflow/issues/32158
### Apache Airflow version
2.6.2
### What happened
Defining `on_*_callback`s in Airflow 2.6.0+ with `functools.partial` (for example, to bind extra arguments to a callable) causes unexpected behavior. Most notably, an `on_success_callback` that raises appears to crash the task process (I have not confirmed this). The heartbeat then fails, and the scheduler marks the task as a zombie job: `ERROR - Detected zombie job`.
For `on_success_callback`, this causes tasks to be marked as failed even though only the callback failed. The issue also shows up in other erratic ways (likely due to the zombie job):
1. Logging shows a misleading message, `'functools.partial' object has no attribute '__module__'`, instead of the actual callback exception
2. Configuring an `on_failure_callback` alongside a failing, partial-defined `on_success_callback` causes the `on_failure_callback` to fire when the `on_success_callback` fails
3. Configuring `on_failure_callback` as a `partial` together with a failing `email_on_failure` causes the failure callback to trigger in an endless loop, and the DAG run never finishes
4. Task logs cannot be retrieved over HTTP (the request for served logs times out)
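For context, a `functools.partial` object is not a function: the Python docs note that partials do not get `__name__` or `__doc__` automatically, and on the Python 3.9 runtime in the logs below reading `__module__` off one raised `AttributeError` as well. Any code that builds a callback's display name by reading those attributes directly will therefore fail on a partial. The sketch below shows one defensive way to derive a name by unwrapping the partial first; `callback_display_name` is a hypothetical helper for illustration, not Airflow's API.

```py
import functools
from typing import Any, Callable


def callback_display_name(callback: Callable[..., Any]) -> str:
    """Return a readable 'module.name' label for a callable, including partials.

    Hypothetical helper: unwraps functools.partial layers and falls back
    gracefully when __module__ / __qualname__ are missing.
    """
    target = callback
    # Peel off any functools.partial wrappers to reach the underlying callable.
    while isinstance(target, functools.partial):
        target = target.func
    name = (
        getattr(target, "__qualname__", None)
        or getattr(target, "__name__", None)
        or type(target).__qualname__
    )
    module = getattr(target, "__module__", None)
    return f"{module}.{name}" if module else name


def my_callback(context):
    raise Exception("exception text")


p = functools.partial(my_callback)
print(hasattr(p, "__name__"))    # False: partial objects have no __name__
print(callback_display_name(p))  # e.g. "__main__.my_callback"
```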
### What you think should happen instead
On Airflow <=2.5.3, a callback behaves identically whether or not it is wrapped in `partial`. Likewise, in 2.6.0+ a callback defined as a plain function (without `partial`) is processed as expected:
- If the callback fails, the task status matches the task execution, not the
callback status
- Callbacks do not initiate retries
- Exception text matches the callback exception
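The expected behavior above amounts to a simple contract: a callback's exception is logged with its own traceback and then swallowed, so it never changes the task's recorded state. A minimal, self-contained sketch of that contract follows. It does not reproduce Airflow's internals; `run_finished_callbacks` and the `task_state` string are hypothetical, and only the log message mirrors the `Error when executing on_success callback` line from the 2.5.3 log.

```py
import functools
import logging
from typing import Any, Callable, Iterable, Union

log = logging.getLogger(__name__)

Callback = Callable[[dict], Any]


def run_finished_callbacks(
    callbacks: Union[Callback, Iterable[Callback], None],
    context: dict,
    callback_type: str,
) -> int:
    """Invoke each callback; log and swallow failures. Returns the failure count."""
    if callbacks is None:
        return 0
    if callable(callbacks):
        callbacks = [callbacks]
    failures = 0
    for callback in callbacks:
        try:
            callback(context)
        except Exception:
            failures += 1
            # log.exception records the callback's own traceback. No attribute of
            # the callback object is read, so partials are handled like functions.
            log.exception("Error when executing %s callback", callback_type)
    return failures


def my_callback(context):
    logging.info("success callback")
    raise Exception("exception text")


task_state = "success"  # already decided by the task itself
failed = run_finished_callbacks(functools.partial(my_callback), {}, "on_success")
# The callback failed, but the task state is left untouched.
print(task_state, failed)  # success 1
```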
### How to reproduce
Sample DAG. Remove or comment out individual pieces (partials, callbacks, `email_on_failure`, etc.) to see how they interact.
```py
from datetime import timedelta
from functools import partial
import time
from airflow.decorators import dag, task
import pendulum
import logging

def my_callback(context):
    logging.info("success callback")
    raise Exception("exception text")


default_args = {
    "on_success_callback": partial(my_callback),
    "on_failure_callback": partial(my_callback),
    "email_on_failure": True,
    "email": "[email protected]",
}


@dag(
    dag_id="Start_and_Wait",
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 15, tz="UTC"),
    catchup=False,
    default_args=default_args,
)
def wait_flow():
    @task
    def wait():
        time.sleep(1)

    @task
    def log():
        logging.info("done!")

    wait() >> log()


wait_flow()
```
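Until this is fixed, one possible workaround is to avoid handing Airflow a bare `partial`. I have not verified it against 2.6, so treat it as an assumption. Either use a closure, which is an ordinary function, or copy the target function's metadata onto the partial with `functools.update_wrapper`. The factory name `make_callback` and the `note` parameter below are hypothetical.

```py
import functools
import logging


def my_callback(context, note="default"):
    logging.info("callback note=%s", note)


# Option 1 (hypothetical factory): a closure is a plain function,
# so it has __module__ and __name__ like any other def.
def make_callback(note):
    def _callback(context):
        my_callback(context, note=note)

    return _callback


# Option 2: keep partial, but copy __module__, __name__, __qualname__, __doc__
# (and set __wrapped__) from the target function onto the partial object.
wrapped_partial = functools.update_wrapper(
    functools.partial(my_callback, note="from partial"), my_callback
)

default_args = {
    "on_success_callback": make_callback("from closure"),
    "on_failure_callback": wrapped_partial,
}

for cb in default_args.values():
    print(cb.__module__, cb.__name__)
```

The closure is the simpler option; `update_wrapper` keeps the `partial` style used in the sample DAG while restoring the attributes a plain function would have.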
### Operating System
Ubuntu 20.04
### Versions of Apache Airflow Providers
Experiencing issue in apache-airflow 2.6.0+
Not experiencing issue in apache-airflow <=2.5.3
Recommended / upstream constraints file in all cases
### Deployment
Other Docker-based deployment
### Deployment details
Occurs with both LocalExecutor and CeleryExecutor
### Anything else
Expected execution in 2.5.3:
```txt
c336145d4ae9
*** Reading local file: /usr/local/airflow/shared/logs/dag_id=Start_and_Wait/run_id=manual__2023-06-26T20:07:31.314971+00:00/task_id=wait/attempt=1.log
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1090} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:07:31.314971+00:00 [queued]>
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1090} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:07:31.314971+00:00 [queued]>
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1288} INFO -
--------------------------------------------------------------------------------
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1289} INFO - Starting attempt 1 of 1
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1290} INFO -
--------------------------------------------------------------------------------
[2023-06-26, 20:07:31 UTC] {taskinstance.py:1309} INFO - Executing <Task(_PythonDecoratedOperator): wait> on 2023-06-26 20:07:31.314971+00:00
[2023-06-26, 20:07:31 UTC] {standard_task_runner.py:55} INFO - Started process 901 to run task
[2023-06-26, 20:07:31 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'Start_and_Wait', 'wait', 'manual__2023-06-26T20:07:31.314971+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/_local/wait.py', '--cfg-path', '/tmp/tmpmm0tra8e']
[2023-06-26, 20:07:31 UTC] {standard_task_runner.py:83} INFO - Job 12: Subtask wait
[2023-06-26, 20:07:32 UTC] {task_command.py:389} INFO - Running <TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:07:31.314971+00:00 [running]> on host c336145d4ae9
[2023-06-26, 20:07:32 UTC] {taskinstance.py:1516} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=***
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=Start_and_Wait
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2023-06-26T20:07:31.314971+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-06-26T20:07:31.314971+00:00
[2023-06-26, 20:07:33 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-06-26, 20:07:33 UTC] {taskinstance.py:1327} INFO - Marking task as SUCCESS. dag_id=Start_and_Wait, task_id=wait, execution_date=20230626T200731, start_date=20230626T200731, end_date=20230626T200733
[2023-06-26, 20:07:33 UTC] {wait.py:9} INFO - success callback
[2023-06-26, 20:07:33 UTC] {taskinstance.py:1544} ERROR - Error when executing on_success callback
Traceback (most recent call last):
  File "/app/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1542, in _run_finished_callback
    callback(context)
  File "/usr/local/airflow/dags/_local/wait.py", line 10, in my_callback
    raise Exception("exception text")
Exception: exception text
[2023-06-26, 20:07:33 UTC] {local_task_job.py:212} INFO - Task exited with return code 0
[2023-06-26, 20:07:33 UTC] {taskinstance.py:2596} INFO - 1 downstream tasks scheduled from follow-on schedule check
```
In 2.6.0+ (the task ends up marked as failed, apparently by the scheduler, even though the log below says it was marked SUCCESS):
```txt
5c688d484e65
*** Found local files:
***   * /usr/local/airflow/shared/logs/dag_id=Start_and_Wait/run_id=manual__2023-06-26T20:04:44.289200+00:00/task_id=wait/attempt=1.log
*** Could not read served logs: timed out
[2023-06-26, 20:04:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:04:44.289200+00:00 [queued]>
[2023-06-26, 20:04:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:04:44.289200+00:00 [queued]>
[2023-06-26, 20:04:45 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-26, 20:04:45 UTC] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): wait> on 2023-06-26 20:04:44.289200+00:00
[2023-06-26, 20:04:45 UTC] {standard_task_runner.py:57} INFO - Started process 786 to run task
[2023-06-26, 20:04:45 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'Start_and_Wait', 'wait', 'manual__2023-06-26T20:04:44.289200+00:00', '--job-id', '14', '--raw', '--subdir', 'DAGS_FOLDER/_local/wait.py', '--cfg-path', '/tmp/tmpx798o7bj']
[2023-06-26, 20:04:45 UTC] {standard_task_runner.py:85} INFO - Job 14: Subtask wait
[2023-06-26, 20:04:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:04:44.289200+00:00 [running]> on host 5c688d484e65
[2023-06-26, 20:04:45 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='Start_and_Wait' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_EXECUTION_DATE='2023-06-26T20:04:44.289200+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-26T20:04:44.289200+00:00'
[2023-06-26, 20:04:46 UTC] {python.py:183} INFO - Done. Returned value was: None
[2023-06-26, 20:04:46 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=Start_and_Wait, task_id=wait, execution_date=20230626T200444, start_date=20230626T200445, end_date=20230626T200446
[2023-06-26, 20:04:46 UTC] {wait.py:9} INFO - success callback
[2023-06-26, 20:04:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 14 for task wait ('functools.partial' object has no attribute '__module__'; 786)
[2023-06-26, 20:04:46 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-06-26, 20:04:46 UTC] {taskinstance.py:2651} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
Server logs:
```txt
***-local-webserver-local-1 | [2023-06-26T20:29:28.371+0000] {scheduler_job_runner.py:1688} WARNING - Failing (1) jobs without heartbeat after 2023-06-26 20:24:28.364681+00:00
***-local-webserver-local-1 | [2023-06-26T20:29:28.371+0000] {scheduler_job_runner.py:1698} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/_local/wait.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'Start_and_Wait', 'Task Id': 'wait', 'Run Id': 'manual__2023-06-26T20:29:06.100360+00:00', 'Hostname': 'ee1eef189157'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7fa1e59eaac0>, 'is_failure_callback': True}
```
Failure to fetch the log file via HTTP:
```txt
***-local-webserver-local-1 | 172.23.0.1 - - [26/Jun/2023:20:15:14 +0000] "GET /api/v1/dags/Start_and_Wait/dagRuns/manual__2023-06-26T20:14:45.191590+00:00/taskInstances/wait/logs/1?full_content=false HTTP/1.1" 200 2618 "http://localhost:8080/dags/Start_and_Wait/grid?tab=logs&dag_run_id=manual__2023-06-26T20%3A14%3A45.191590%2B00%3A00&task_id=wait" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
***-local-webserver-local-1 | [2023-06-26T20:15:14.800+0000] {file_task_handler.py:522} ERROR - Could not read served logs
***-local-webserver-local-1 | Traceback (most recent call last):
***-local-webserver-local-1 |   File "/app/.local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
***-local-webserver-local-1 |     yield
***-local-webserver-local-1 |   File "/app/.local/lib/python3.9/site-packages/httpcore/backends/sync.py", line 28, in read
***-local-webserver-local-1 |     return self._sock.recv(max_bytes)
***-local-webserver-local-1 | socket.timeout: timed out
```
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)