This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 49c0fff4a49 [v3-1-test] Fix scheduler crash with email notifications 
(#56429) (#56431)
49c0fff4a49 is described below

commit 49c0fff4a4941592399b0b11503c09c8f4dbcfb2
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Oct 6 07:11:12 2025 -0700

    [v3-1-test] Fix scheduler crash with email notifications (#56429) (#56431)
    
    The ``EmailNotificationRequest`` class name (24 characters) exceeded the
    database constraint for the ``DbCallbackRequest.callback_type`` column (20
    characters), causing scheduler crashes when email notifications were
    triggered for task failures or retries.
    
    This fix renames the class to ``EmailRequest`` (12 characters) to fit within
    the constraint. A backwards-compatibility alias ensures existing DB
    entries with ``EmailNotificationRequest`` can still be deserialized via
    ``getattr`` lookup.
    
    The 20-character limit is arbitrary and does not affect performance.
    In a follow-up PR for 3.2, we should increase this to 50+ characters
    to accommodate descriptive class names without requiring abbreviations.
    
    Fixes #56426
    (cherry picked from commit a18fc01dbda319d6670cfab9071b2760a7fc9fe3)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 .../src/airflow/callbacks/callback_requests.py     |  9 ++++++---
 .../src/airflow/dag_processing/processor.py        |  8 +++-----
 .../src/airflow/jobs/scheduler_job_runner.py       |  4 ++--
 .../tests/unit/callbacks/test_callback_requests.py | 22 +++++++++++-----------
 .../tests/unit/dag_processing/test_processor.py    | 10 +++++-----
 5 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py 
b/airflow-core/src/airflow/callbacks/callback_requests.py
index e0666b397c2..d2bdf0968bc 100644
--- a/airflow-core/src/airflow/callbacks/callback_requests.py
+++ b/airflow-core/src/airflow/callbacks/callback_requests.py
@@ -77,7 +77,7 @@ class TaskCallbackRequest(BaseCallbackRequest):
         }
 
 
-class EmailNotificationRequest(BaseCallbackRequest):
+class EmailRequest(BaseCallbackRequest):
     """Email notification request for task failures/retries."""
 
     ti: ti_datamodel.TaskInstance
@@ -86,7 +86,7 @@ class EmailNotificationRequest(BaseCallbackRequest):
     """Whether this is for a failure or retry email"""
     context_from_server: ti_datamodel.TIRunContext
     """Task execution context from the Server"""
-    type: Literal["EmailNotificationRequest"] = "EmailNotificationRequest"
+    type: Literal["EmailRequest"] = "EmailRequest"
 
 
 class DagRunContext(BaseModel):
@@ -108,6 +108,9 @@ class DagCallbackRequest(BaseCallbackRequest):
 
 
 CallbackRequest = Annotated[
-    DagCallbackRequest | TaskCallbackRequest | EmailNotificationRequest,
+    DagCallbackRequest | TaskCallbackRequest | EmailRequest,
     Field(discriminator="type"),
 ]
+
+# Backwards compatibility alias
+EmailNotificationRequest = EmailRequest
diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py
index a86d308c8c5..98228f2bee8 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -31,7 +31,7 @@ from pydantic import BaseModel, Field, TypeAdapter
 from airflow.callbacks.callback_requests import (
     CallbackRequest,
     DagCallbackRequest,
-    EmailNotificationRequest,
+    EmailRequest,
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
@@ -243,7 +243,7 @@ def _execute_callbacks(
             _execute_task_callbacks(dagbag, request, log)
         elif isinstance(request, DagCallbackRequest):
             _execute_dag_callbacks(dagbag, request, log)
-        elif isinstance(request, EmailNotificationRequest):
+        elif isinstance(request, EmailRequest):
             _execute_email_callbacks(dagbag, request, log)
 
 
@@ -354,9 +354,7 @@ def _execute_task_callbacks(dagbag: DagBag, request: 
TaskCallbackRequest, log: F
             log.exception("Error in callback at index %d: %s", idx, 
callback_repr)
 
 
-def _execute_email_callbacks(
-    dagbag: DagBag, request: EmailNotificationRequest, log: 
FilteringBoundLogger
-) -> None:
+def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log: 
FilteringBoundLogger) -> None:
     """Execute email notification for task failure/retry."""
     dag = dagbag.dags[request.ti.dag_id]
     task = dag.get_task(request.ti.task_id)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3ab84d6d5ed..fee2646ff74 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -43,7 +43,7 @@ from 
airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as
 from airflow.callbacks.callback_requests import (
     DagCallbackRequest,
     DagRunContext,
-    EmailNotificationRequest,
+    EmailRequest,
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
@@ -959,7 +959,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         "Sending email request for task %s to DAG Processor",
                         ti,
                     )
-                    email_request = EmailNotificationRequest(
+                    email_request = EmailRequest(
                         filepath=ti.dag_model.relative_fileloc,
                         bundle_name=ti.dag_version.bundle_name,
                         bundle_version=ti.dag_version.bundle_version,
diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py 
b/airflow-core/tests/unit/callbacks/test_callback_requests.py
index 428ee34d166..8f1d959e27e 100644
--- a/airflow-core/tests/unit/callbacks/test_callback_requests.py
+++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py
@@ -32,7 +32,7 @@ from airflow.callbacks.callback_requests import (
     CallbackRequest,
     DagCallbackRequest,
     DagRunContext,
-    EmailNotificationRequest,
+    EmailRequest,
     TaskCallbackRequest,
 )
 from airflow.models.dag import DAG
@@ -314,9 +314,9 @@ class TestDagCallbackRequestWithContext:
         assert result.context_from_server.last_ti.task_id == "test_task"
 
 
-class TestEmailNotificationRequest:
+class TestEmailRequest:
     def test_email_notification_request_serialization(self):
-        """Test EmailNotificationRequest can be serialized and used in 
CallbackRequest union."""
+        """Test EmailRequest can be serialized and used in CallbackRequest 
union."""
         ti_data = TIDataModel(
             id=str(uuid.uuid4()),
             task_id="test_task",
@@ -331,8 +331,8 @@ class TestEmailNotificationRequest:
 
         current_time = timezone.utcnow()
 
-        # Create EmailNotificationRequest
-        email_request = EmailNotificationRequest(
+        # Create EmailRequest
+        email_request = EmailRequest(
             filepath="/path/to/dag.py",
             bundle_name="test_bundle",
             bundle_version="1.0.0",
@@ -359,17 +359,17 @@ class TestEmailNotificationRequest:
 
         # Test serialization
         json_str = email_request.to_json()
-        assert "EmailNotificationRequest" in json_str
+        assert "EmailRequest" in json_str
         assert "failure" in json_str
 
         # Test deserialization
-        result = EmailNotificationRequest.from_json(json_str)
+        result = EmailRequest.from_json(json_str)
         assert result == email_request
         assert result.email_type == "failure"
         assert result.ti.task_id == "test_task"
 
     def test_callback_request_union_with_email_notification(self):
-        """Test EmailNotificationRequest works in CallbackRequest union 
type."""
+        """Test EmailRequest works in CallbackRequest union type."""
         ti_data = TIDataModel(
             id=str(uuid.uuid4()),
             task_id="test_task",
@@ -402,7 +402,7 @@ class TestEmailNotificationRequest:
         )
 
         email_data = {
-            "type": "EmailNotificationRequest",
+            "type": "EmailRequest",
             "filepath": "/path/to/dag.py",
             "bundle_name": "test_bundle",
             "bundle_version": "1.0.0",
@@ -416,7 +416,7 @@ class TestEmailNotificationRequest:
         adapter = TypeAdapter(CallbackRequest)
         callback_request = adapter.validate_python(email_data)
 
-        # Verify it's correctly identified as EmailNotificationRequest
-        assert isinstance(callback_request, EmailNotificationRequest)
+        # Verify it's correctly identified as EmailRequest
+        assert isinstance(callback_request, EmailRequest)
         assert callback_request.email_type == "retry"
         assert callback_request.ti.task_id == "test_task"
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8acba729f05..bde6a631c31 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -44,7 +44,7 @@ from airflow.callbacks.callback_requests import (
     CallbackRequest,
     DagCallbackRequest,
     DagRunContext,
-    EmailNotificationRequest,
+    EmailRequest,
     TaskCallbackRequest,
 )
 from airflow.dag_processing.manager import process_parse_results
@@ -1297,7 +1297,7 @@ class TestExecuteEmailCallbacks:
         )
 
         current_time = timezone.utcnow()
-        request = EmailNotificationRequest(
+        request = EmailRequest(
             filepath="/path/to/dag.py",
             bundle_name="test_bundle",
             bundle_version="1.0.0",
@@ -1357,7 +1357,7 @@ class TestExecuteEmailCallbacks:
 
         current_time = timezone.utcnow()
 
-        request = EmailNotificationRequest(
+        request = EmailRequest(
             filepath="/path/to/dag.py",
             bundle_name="test_bundle",
             bundle_version="1.0.0",
@@ -1416,7 +1416,7 @@ class TestExecuteEmailCallbacks:
         )
 
         current_time = timezone.utcnow()
-        request = EmailNotificationRequest(
+        request = EmailRequest(
             filepath="/path/to/dag.py",
             bundle_name="test_bundle",
             bundle_version="1.0.0",
@@ -1474,7 +1474,7 @@ class TestExecuteEmailCallbacks:
         current_time = timezone.utcnow()
 
         # Create request for failure (but email_on_failure is False)
-        request = EmailNotificationRequest(
+        request = EmailRequest(
             filepath="/path/to/dag.py",
             bundle_name="test_bundle",
             bundle_version="1.0.0",

Reply via email to