This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 49c0fff4a49 [v3-1-test] Fix scheduler crash with email notifications
(#56429) (#56431)
49c0fff4a49 is described below
commit 49c0fff4a4941592399b0b11503c09c8f4dbcfb2
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Oct 6 07:11:12 2025 -0700
[v3-1-test] Fix scheduler crash with email notifications (#56429) (#56431)
The ``EmailNotificationRequest`` class name (25 characters) exceeded the
database constraint for ``DbCallbackRequest.callback_type column`` (20
characters), causing scheduler crashes when email notifications were
triggered for task failures or retries.
This fix renames the class to ``EmailRequest`` (12 characters) to fit within
the constraint. A backwards compatibility alias ensures existing DB
entries with `'EmailNotificationRequest'` can still be deserialized via
getattr lookup.
The 20-character limit is arbitrary and does not affect performance.
In a follow-up PR for 3.2, we should increase this to 50+ characters
to accommodate descriptive class names without requiring abbreviations.
Fixes #56426
(cherry picked from commit a18fc01dbda319d6670cfab9071b2760a7fc9fe3)
Co-authored-by: Kaxil Naik <[email protected]>
---
.../src/airflow/callbacks/callback_requests.py | 9 ++++++---
.../src/airflow/dag_processing/processor.py | 8 +++-----
.../src/airflow/jobs/scheduler_job_runner.py | 4 ++--
.../tests/unit/callbacks/test_callback_requests.py | 22 +++++++++++-----------
.../tests/unit/dag_processing/test_processor.py | 10 +++++-----
5 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py
b/airflow-core/src/airflow/callbacks/callback_requests.py
index e0666b397c2..d2bdf0968bc 100644
--- a/airflow-core/src/airflow/callbacks/callback_requests.py
+++ b/airflow-core/src/airflow/callbacks/callback_requests.py
@@ -77,7 +77,7 @@ class TaskCallbackRequest(BaseCallbackRequest):
}
-class EmailNotificationRequest(BaseCallbackRequest):
+class EmailRequest(BaseCallbackRequest):
"""Email notification request for task failures/retries."""
ti: ti_datamodel.TaskInstance
@@ -86,7 +86,7 @@ class EmailNotificationRequest(BaseCallbackRequest):
"""Whether this is for a failure or retry email"""
context_from_server: ti_datamodel.TIRunContext
"""Task execution context from the Server"""
- type: Literal["EmailNotificationRequest"] = "EmailNotificationRequest"
+ type: Literal["EmailRequest"] = "EmailRequest"
class DagRunContext(BaseModel):
@@ -108,6 +108,9 @@ class DagCallbackRequest(BaseCallbackRequest):
CallbackRequest = Annotated[
- DagCallbackRequest | TaskCallbackRequest | EmailNotificationRequest,
+ DagCallbackRequest | TaskCallbackRequest | EmailRequest,
Field(discriminator="type"),
]
+
+# Backwards compatibility alias
+EmailNotificationRequest = EmailRequest
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index a86d308c8c5..98228f2bee8 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -31,7 +31,7 @@ from pydantic import BaseModel, Field, TypeAdapter
from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
- EmailNotificationRequest,
+ EmailRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
@@ -243,7 +243,7 @@ def _execute_callbacks(
_execute_task_callbacks(dagbag, request, log)
elif isinstance(request, DagCallbackRequest):
_execute_dag_callbacks(dagbag, request, log)
- elif isinstance(request, EmailNotificationRequest):
+ elif isinstance(request, EmailRequest):
_execute_email_callbacks(dagbag, request, log)
@@ -354,9 +354,7 @@ def _execute_task_callbacks(dagbag: DagBag, request:
TaskCallbackRequest, log: F
log.exception("Error in callback at index %d: %s", idx,
callback_repr)
-def _execute_email_callbacks(
- dagbag: DagBag, request: EmailNotificationRequest, log:
FilteringBoundLogger
-) -> None:
+def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log:
FilteringBoundLogger) -> None:
"""Execute email notification for task failure/retry."""
dag = dagbag.dags[request.ti.dag_id]
task = dag.get_task(request.ti.task_id)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3ab84d6d5ed..fee2646ff74 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -43,7 +43,7 @@ from
airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
DagRunContext,
- EmailNotificationRequest,
+ EmailRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
@@ -959,7 +959,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"Sending email request for task %s to DAG Processor",
ti,
)
- email_request = EmailNotificationRequest(
+ email_request = EmailRequest(
filepath=ti.dag_model.relative_fileloc,
bundle_name=ti.dag_version.bundle_name,
bundle_version=ti.dag_version.bundle_version,
diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py
b/airflow-core/tests/unit/callbacks/test_callback_requests.py
index 428ee34d166..8f1d959e27e 100644
--- a/airflow-core/tests/unit/callbacks/test_callback_requests.py
+++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py
@@ -32,7 +32,7 @@ from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
DagRunContext,
- EmailNotificationRequest,
+ EmailRequest,
TaskCallbackRequest,
)
from airflow.models.dag import DAG
@@ -314,9 +314,9 @@ class TestDagCallbackRequestWithContext:
assert result.context_from_server.last_ti.task_id == "test_task"
-class TestEmailNotificationRequest:
+class TestEmailRequest:
def test_email_notification_request_serialization(self):
- """Test EmailNotificationRequest can be serialized and used in
CallbackRequest union."""
+ """Test EmailRequest can be serialized and used in CallbackRequest
union."""
ti_data = TIDataModel(
id=str(uuid.uuid4()),
task_id="test_task",
@@ -331,8 +331,8 @@ class TestEmailNotificationRequest:
current_time = timezone.utcnow()
- # Create EmailNotificationRequest
- email_request = EmailNotificationRequest(
+ # Create EmailRequest
+ email_request = EmailRequest(
filepath="/path/to/dag.py",
bundle_name="test_bundle",
bundle_version="1.0.0",
@@ -359,17 +359,17 @@ class TestEmailNotificationRequest:
# Test serialization
json_str = email_request.to_json()
- assert "EmailNotificationRequest" in json_str
+ assert "EmailRequest" in json_str
assert "failure" in json_str
# Test deserialization
- result = EmailNotificationRequest.from_json(json_str)
+ result = EmailRequest.from_json(json_str)
assert result == email_request
assert result.email_type == "failure"
assert result.ti.task_id == "test_task"
def test_callback_request_union_with_email_notification(self):
- """Test EmailNotificationRequest works in CallbackRequest union
type."""
+ """Test EmailRequest works in CallbackRequest union type."""
ti_data = TIDataModel(
id=str(uuid.uuid4()),
task_id="test_task",
@@ -402,7 +402,7 @@ class TestEmailNotificationRequest:
)
email_data = {
- "type": "EmailNotificationRequest",
+ "type": "EmailRequest",
"filepath": "/path/to/dag.py",
"bundle_name": "test_bundle",
"bundle_version": "1.0.0",
@@ -416,7 +416,7 @@ class TestEmailNotificationRequest:
adapter = TypeAdapter(CallbackRequest)
callback_request = adapter.validate_python(email_data)
- # Verify it's correctly identified as EmailNotificationRequest
- assert isinstance(callback_request, EmailNotificationRequest)
+ # Verify it's correctly identified as EmailRequest
+ assert isinstance(callback_request, EmailRequest)
assert callback_request.email_type == "retry"
assert callback_request.ti.task_id == "test_task"
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8acba729f05..bde6a631c31 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -44,7 +44,7 @@ from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
DagRunContext,
- EmailNotificationRequest,
+ EmailRequest,
TaskCallbackRequest,
)
from airflow.dag_processing.manager import process_parse_results
@@ -1297,7 +1297,7 @@ class TestExecuteEmailCallbacks:
)
current_time = timezone.utcnow()
- request = EmailNotificationRequest(
+ request = EmailRequest(
filepath="/path/to/dag.py",
bundle_name="test_bundle",
bundle_version="1.0.0",
@@ -1357,7 +1357,7 @@ class TestExecuteEmailCallbacks:
current_time = timezone.utcnow()
- request = EmailNotificationRequest(
+ request = EmailRequest(
filepath="/path/to/dag.py",
bundle_name="test_bundle",
bundle_version="1.0.0",
@@ -1416,7 +1416,7 @@ class TestExecuteEmailCallbacks:
)
current_time = timezone.utcnow()
- request = EmailNotificationRequest(
+ request = EmailRequest(
filepath="/path/to/dag.py",
bundle_name="test_bundle",
bundle_version="1.0.0",
@@ -1474,7 +1474,7 @@ class TestExecuteEmailCallbacks:
current_time = timezone.utcnow()
# Create request for failure (but email_on_failure is False)
- request = EmailNotificationRequest(
+ request = EmailRequest(
filepath="/path/to/dag.py",
bundle_name="test_bundle",
bundle_version="1.0.0",