AlejandroIzuel opened a new issue, #40550:
URL: https://github.com/apache/airflow/issues/40550
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.8.1
### What happened?
I have a DAG with a task that exits with return code -9:
```
[2024-07-01, 16:06:05 CEST] {{xxxxClient.py:198}} INFO - Execute URL: https://......
[2024-07-01, 16:07:34 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code -9
[2024-07-01, 16:07:34 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
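For context, a negative return code follows the POSIX convention that Python also uses: a child process terminated by signal N is reported as -N. Signal 9 is SIGKILL, which is also what the Linux out-of-memory (OOM) killer sends, so it is consistent with the memory issue suspected in this report. The minimal, POSIX-only sketch below is plain Python, not Airflow code, and reproduces the same code with the standard `subprocess` module:

```python
import signal
import subprocess
import sys

# Start a child Python interpreter that immediately sends SIGKILL to itself.
# POSIX only: SIGKILL does not exist on Windows.
child = subprocess.run(
    [sys.executable, "-c", "import os, signal; os.kill(os.getpid(), signal.SIGKILL)"]
)

# subprocess reports "terminated by signal N" as a return code of -N.
print(child.returncode)                        # -9 on Linux/macOS
print(signal.Signals(-child.returncode).name)  # SIGKILL
```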
I've defined an on_failure_callback function in the DAG's default_args (and also pass it to each task), but it is not triggered. It only fires when a task fails by raising an exception, for example from a function that deliberately throws one. So the SMTP settings are fine and working; the on_failure_callback is simply not triggered when the task exits with -9.
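One relevant constraint: SIGKILL cannot be caught, blocked, or ignored, so a process that receives it gets no chance to run any further Python code, including an exception handler or a callback defined in that process. The sketch below (plain Python on Linux/macOS, not Airflow code; `handler` is a hypothetical name) shows that installing a handler succeeds for SIGTERM but is rejected for SIGKILL. It does not show whether or how Airflow runs `on_failure_callback` from a different process after the task has been killed, which is what this issue is about.

```python
import signal


def handler(signum, frame):
    # Cleanup or notification logic would go here if the signal could be handled.
    print(f"caught {signal.Signals(signum).name}")


# SIGTERM can be intercepted, so a graceful shutdown path is possible.
previous = signal.signal(signal.SIGTERM, handler)

# SIGKILL cannot: the operating system refuses to install a handler for it.
try:
    signal.signal(signal.SIGKILL, handler)
    sigkill_catchable = True
except OSError as exc:  # e.g. "[Errno 22] Invalid argument" on Linux
    sigkill_catchable = False
    print(f"cannot handle SIGKILL: {exc}")

# Restore the previous SIGTERM disposition.
signal.signal(signal.SIGTERM, previous)
```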
Here is part of my DAG as an example:
```python
import ast
import logging
import os
import traceback
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

# xxxx (redacted) is the module that provides update_xxx1_table; its import is not shown.

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

error_email_recipients = ["[email protected]"]

# SMTP settings are read from an Airflow Variable and exported as Airflow config env vars.
credentials = ast.literal_eval(
    str(Variable.get("ses-smtp-ca", default_var="undefined"))
)
os.environ["AIRFLOW__SMTP__SMTP_USER"] = credentials["user"]
os.environ["AIRFLOW__SMTP__SMTP_PASSWORD"] = credentials["password"]
os.environ["AIRFLOW__EMAIL__EMAIL_BACKEND"] = credentials["email_backend"]
os.environ["AIRFLOW__SMTP__SMTP_HOST"] = credentials["smtp_host"]
os.environ["AIRFLOW__SMTP__SMTP_PORT"] = credentials["smtp_port"]
os.environ["AIRFLOW__SMTP__SMTP_MAIL_FROM"] = credentials["mail_from"]
os.environ["AIRFLOW__SMTP__SMTP_SSL"] = credentials["smtp_ssl"]
os.environ["AIRFLOW__SMTP__SMTP_STARTTLS"] = credentials["smtp_starttls"]


# Failure callback function
def failure_callback(context):
    print("#### failure_callback is being called")
    exception = context.get("exception")
    print(f"Exception: {exception}")
    # Only format a traceback when an exception object is present;
    # exception.__traceback__ would raise AttributeError on None.
    if isinstance(exception, BaseException):
        exception_str = "".join(
            traceback.format_exception(None, exception, exception.__traceback__)
        )
    else:
        exception_str = str(exception)
    print(f"Exception String: {exception_str}")
    email_operator = EmailOperator(
        task_id="send_email_on_failure",
        to=error_email_recipients,
        subject=f"DAG {context['task_instance'].dag_id} - Task {context['task_instance'].task_id} Failed",
        html_content=f"""
        <h3>DAG: {context['task_instance'].dag_id}</h3>
        <p>Task: {context['task_instance'].task_id} failed.</p>
        <p>Execution Time: {context['execution_date']}</p>
        <p>Exception: {exception_str}</p>
        <p>Log URL: <a href='{context['task_instance'].log_url}'>Click here to view log</a></p>
        """,
    )
    try:
        print("#### Sending email about the exception")
        email_operator.execute(context=context)
    except Exception as e:
        print(f"Failed to send email: {e}")


# Defined after failure_callback so that on_failure_callback holds the function itself, not a string.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=5),
    "mwaa_secret_name": "xxxxxxxxxxxxxxxxxx",
    "redshift_secret_name": "xxxxxxxxxxxxx",
    "s3_bucket": "xxxxxxxxxxxxxxx",
    "region_name": "xxxxxxxxxxxxxxxxxxxxx",
    "on_failure_callback": failure_callback,
}

with DAG(
    "TestDAG_NOEMAIL",
    default_args=default_args,
    schedule_interval="0 14 * * *",
    start_date=datetime(2024, 5, 3),
    catchup=False,
) as dag:
    update_xxx1_table_task = PythonOperator(
        task_id="update_xxx1_table",
        python_callable=xxxx.update_xxx1_table,
        on_failure_callback=failure_callback,
    )
    update_xxx2_table_task = PythonOperator(
        task_id="update_hana_SMC_Interactions_table",
        python_callable=xxxx.update_xxx1_table,
        on_failure_callback=failure_callback,
    )
    update_xxx1_table_task >> update_xxx2_table_task
```
The task update_xxx1_table_task is the one exiting with return code -9 in the log above, and no email is sent.
If instead I define something like this:
```python
def throw_error(*args, **kwargs):
    raise ValueError("Simulated task failure for testing.")

...

with DAG(
    "Test-Email",
    default_args=default_args,
    description="Test-Email",
    schedule_interval="0 14 * * *",
    start_date=datetime(2024, 5, 3),
    catchup=False,
) as dag:
    throw_error_task = PythonOperator(
        task_id="throw_error_task",
        python_callable=throw_error,
    )
```
This works fine: the email is sent.
### What you think should happen instead?
_No response_
### How to reproduce
Define a DAG like the example above, add a task that exits with return code -9 (for example because it runs out of memory), and observe that the failure email is not sent.
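Instead of exhausting memory, a more deterministic way to get the same -9 exit might be a callable that sends SIGKILL to its own process, the same signal the OOM killer uses. The helper `kill_self` below is hypothetical (not part of the original DAG). As a standalone check, it is run in a forked `multiprocessing` child, whose `exitcode` is likewise -N when the process is killed by signal N (POSIX only):

```python
import multiprocessing
import os
import signal


def kill_self() -> None:
    """Hypothetical python_callable: SIGKILL the current process, as the OOM killer would."""
    os.kill(os.getpid(), signal.SIGKILL)


# Run the callable in a forked child so the interpreter running this snippet survives.
ctx = multiprocessing.get_context("fork")
proc = ctx.Process(target=kill_self)
proc.start()
proc.join()

# multiprocessing reports "killed by signal N" as exitcode -N.
print(proc.exitcode)  # -9 on Linux/macOS
```

Assuming the task callable runs in its own OS process (as the "Task exited with return code -9" log line suggests), it could be wired into the example DAG as, e.g., `PythonOperator(task_id="kill_self", python_callable=kill_self, on_failure_callback=failure_callback)`; this has not been verified on MWAA.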
### Operating System
AWS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Amazon (AWS) MWAA
### Deployment details
_No response_
### Anything else?
Let me know which specific logs I can check to provide more information. I've gone through all the Airflow logs I know of and could not find any error or anything related to sending the email or to the on_failure_callback.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.