This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dcb88ff758 Move email notifications from scheduler to DAG processor 
(#55238)
2dcb88ff758 is described below

commit 2dcb88ff758516572706d5b2ff0b5fca50b3a14e
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Sep 4 12:45:37 2025 +0100

    Move email notifications from scheduler to DAG processor (#55238)
---
 .../src/airflow/callbacks/callback_requests.py     |  14 +-
 .../src/airflow/dag_processing/processor.py        |  69 +++++-
 .../src/airflow/jobs/scheduler_job_runner.py       |  54 ++++-
 airflow-core/src/airflow/models/mappedoperator.py  |  13 ++
 airflow-core/src/airflow/models/taskinstance.py    |  42 +---
 .../tests/unit/callbacks/test_callback_requests.py | 112 ++++++++++
 .../tests/unit/dag_processing/test_processor.py    | 246 ++++++++++++++++++++-
 .../tests/unit/openlineage/utils/test_utils.py     |   2 +-
 task-sdk/src/airflow/sdk/bases/operator.py         |   2 +
 .../src/airflow/sdk/definitions/mappedoperator.py  |   8 +
 .../src/airflow/sdk/execution_time/task_runner.py  |  26 ++-
 .../task_sdk/execution_time/test_task_runner.py    |   2 -
 12 files changed, 536 insertions(+), 54 deletions(-)

diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py 
b/airflow-core/src/airflow/callbacks/callback_requests.py
index 611f9ac7b4d..e0666b397c2 100644
--- a/airflow-core/src/airflow/callbacks/callback_requests.py
+++ b/airflow-core/src/airflow/callbacks/callback_requests.py
@@ -77,6 +77,18 @@ class TaskCallbackRequest(BaseCallbackRequest):
         }
 
 
+class EmailNotificationRequest(BaseCallbackRequest):
+    """Email notification request for task failures/retries."""
+
+    ti: ti_datamodel.TaskInstance
+    """Simplified Task Instance representation"""
+    email_type: Literal["failure", "retry"] = "failure"
+    """Whether this is for a failure or retry email"""
+    context_from_server: ti_datamodel.TIRunContext
+    """Task execution context from the Server"""
+    type: Literal["EmailNotificationRequest"] = "EmailNotificationRequest"
+
+
 class DagRunContext(BaseModel):
     """Class to pass context info from the server to build a Execution context 
object."""
 
@@ -96,6 +108,6 @@ class DagCallbackRequest(BaseCallbackRequest):
 
 
 CallbackRequest = Annotated[
-    DagCallbackRequest | TaskCallbackRequest,
+    DagCallbackRequest | TaskCallbackRequest | EmailNotificationRequest,
     Field(discriminator="type"),
 ]
diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py
index e5fbd9b436f..d4c73e61fea 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -31,6 +31,7 @@ from pydantic import BaseModel, Field, TypeAdapter
 from airflow.callbacks.callback_requests import (
     CallbackRequest,
     DagCallbackRequest,
+    EmailNotificationRequest,
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
@@ -51,7 +52,7 @@ from airflow.sdk.execution_time.comms import (
     VariableResult,
 )
 from airflow.sdk.execution_time.supervisor import WatchedSubprocess
-from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
+from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, 
_send_task_error_email
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 from airflow.stats import Stats
 from airflow.utils.file import iter_airflow_imports
@@ -224,8 +225,10 @@ def _execute_callbacks(
         log.debug("Processing Callback Request", request=request.to_json())
         if isinstance(request, TaskCallbackRequest):
             _execute_task_callbacks(dagbag, request, log)
-        if isinstance(request, DagCallbackRequest):
+        elif isinstance(request, DagCallbackRequest):
             _execute_dag_callbacks(dagbag, request, log)
+        elif isinstance(request, EmailNotificationRequest):
+            _execute_email_callbacks(dagbag, request, log)
 
 
 def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: 
FilteringBoundLogger) -> None:
@@ -335,6 +338,68 @@ def _execute_task_callbacks(dagbag: DagBag, request: 
TaskCallbackRequest, log: F
             log.exception("Error in callback at index %d: %s", idx, 
callback_repr)
 
 
+def _execute_email_callbacks(
+    dagbag: DagBag, request: EmailNotificationRequest, log: 
FilteringBoundLogger
+) -> None:
+    """Execute email notification for task failure/retry."""
+    dag = dagbag.dags[request.ti.dag_id]
+    task = dag.get_task(request.ti.task_id)
+
+    if not task.email:
+        log.warning(
+            "Email callback requested but no email configured",
+            dag_id=request.ti.dag_id,
+            task_id=request.ti.task_id,
+            run_id=request.ti.run_id,
+        )
+        return
+
+    # Check if email should be sent based on task configuration
+    should_send_email = False
+    if request.email_type == "failure" and task.email_on_failure:
+        should_send_email = True
+    elif request.email_type == "retry" and task.email_on_retry:
+        should_send_email = True
+
+    if not should_send_email:
+        log.info(
+            "Email not sent - task configured with email_on_%s=False",
+            request.email_type,
+            dag_id=request.ti.dag_id,
+            task_id=request.ti.task_id,
+            run_id=request.ti.run_id,
+        )
+        return
+
+    ctx_from_server = request.context_from_server
+
+    runtime_ti = RuntimeTaskInstance.model_construct(
+        **request.ti.model_dump(exclude_unset=True),
+        task=task,
+        _ti_context_from_server=ctx_from_server,
+        max_tries=ctx_from_server.max_tries,
+    )
+
+    log.info(
+        "Sending %s email for task %s",
+        request.email_type,
+        request.ti.task_id,
+        dag_id=request.ti.dag_id,
+        run_id=request.ti.run_id,
+    )
+
+    try:
+        _send_task_error_email(task.email, runtime_ti, request.msg, log)
+    except Exception:
+        log.exception(
+            "Failed to send %s email",
+            request.email_type,
+            dag_id=request.ti.dag_id,
+            task_id=request.ti.task_id,
+            run_id=request.ti.run_id,
+        )
+
+
 def in_process_api_server() -> InProcessExecutionAPI:
     from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
 
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 217d8d663c3..e8778d17d18 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -40,7 +40,12 @@ from sqlalchemy.sql import expression
 from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun 
as DRDataModel, TIRunContext
-from airflow.callbacks.callback_requests import DagCallbackRequest, 
DagRunContext, TaskCallbackRequest
+from airflow.callbacks.callback_requests import (
+    DagCallbackRequest,
+    DagRunContext,
+    EmailNotificationRequest,
+    TaskCallbackRequest,
+)
 from airflow.configuration import conf
 from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
 from airflow.executors import workloads
@@ -750,7 +755,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         cls, executor: BaseExecutor, job_id: str | None, scheduler_dag_bag: 
DBDagBag, session: Session
     ) -> int:
         """
-        Respond to executor events.
+        Process task completion events from the executor and update task 
instance states.
+
+        This method handles task state transitions reported by executors, 
ensuring proper
+        state management, callback execution, and notification processing. It 
maintains
+        scheduler architectural principles by delegating user code execution 
to appropriate
+        isolated processes.
+
+        The method handles several key scenarios:
+        1. **Normal task completion**: Updates task states for 
successful/failed tasks
+        2. **External termination**: Detects tasks killed outside Airflow and 
marks them as failed
+        3. **Task requeuing**: Handles tasks that were requeued by other 
schedulers or executors
+        4. **Callback processing**: Sends task callback requests to DAG 
Processor for execution
+        5. **Email notifications**: Sends email notification requests to DAG 
Processor
+
+        :param executor: The executor reporting task completion events
+        :param job_id: The scheduler job ID, used to detect task requeuing by 
other schedulers
+        :param scheduler_dag_bag: Serialized DAG bag for retrieving task 
definitions
+        :param session: Database session for task instance updates
+
+        :return: Number of events processed from the executor event buffer
+
+        :raises Exception: If DAG retrieval or task processing fails, logs 
error and continues
 
         This is a classmethod because this is also used in `dag.test()`.
         `dag.test` execute DAGs with no scheduler, therefore it needs to 
handle the events pushed by the
@@ -929,6 +955,30 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     ti.set_state(None)
                     continue
 
+                # Send email notification request to DAG processor via DB
+                if task.email and (task.email_on_failure or 
task.email_on_retry):
+                    cls.logger().info(
+                        "Sending email request for task %s to DAG Processor",
+                        ti,
+                    )
+                    email_request = EmailNotificationRequest(
+                        filepath=ti.dag_model.relative_fileloc,
+                        bundle_name=ti.dag_version.bundle_name,
+                        bundle_version=ti.dag_version.bundle_version,
+                        ti=ti,
+                        msg=msg,
+                        email_type="retry" if ti.is_eligible_to_retry() else 
"failure",
+                        context_from_server=TIRunContext(
+                            dag_run=DRDataModel.model_validate(ti.dag_run, 
from_attributes=True),
+                            max_tries=ti.max_tries,
+                            variables=[],
+                            connections=[],
+                            xcom_keys_to_clear=[],
+                        ),
+                    )
+                    executor.send_callback(email_request)
+
+                # Update task state - emails are handled by DAG processor now
                 ti.handle_failure(error=msg, session=session)
 
         return len(event_buffer)
diff --git a/airflow-core/src/airflow/models/mappedoperator.py 
b/airflow-core/src/airflow/models/mappedoperator.py
index 310573985da..7c7988d3c9b 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 
 import functools
 import operator
+from collections.abc import Iterable
 from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias, TypeGuard, overload
 
 import attrs
@@ -298,6 +299,18 @@ class MappedOperator(DAGNode):
     def outlets(self) -> list[Any]:
         return self.partial_kwargs.get("outlets", [])
 
+    @property
+    def email(self) -> str | Iterable[str] | None:
+        return self.partial_kwargs.get("email")
+
+    @property
+    def email_on_failure(self) -> bool:
+        return self.partial_kwargs.get("email_on_failure", True)
+
+    @property
+    def email_on_retry(self) -> bool:
+        return self.partial_kwargs.get("email_on_retry", True)
+
     @property
     def on_failure_fail_dagrun(self) -> bool:
         return bool(self.partial_kwargs.get("on_failure_fail_dagrun"))
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 86b4eaf992d..4d440bd38a0 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -22,7 +22,6 @@ import hashlib
 import itertools
 import logging
 import math
-import operator
 import uuid
 from collections import defaultdict
 from collections.abc import Collection, Iterable
@@ -1514,34 +1513,19 @@ class TaskInstance(Base, LoggingMixin):
 
         ti.clear_next_method_args()
 
-        context = None
-        # In extreme cases (task instance heartbeat timeout in case of dag with
-        # parse error) we might _not_ have a Task.
-        if getattr(ti, "task", None):
-            context = ti.get_template_context(session)
-
-        if context is not None:
-            context["exception"] = error
-
         # Set state correctly and figure out how to log it and decide whether
         # to email
 
-        # Note, callback invocation needs to be handled by caller of
-        # _run_raw_task to avoid race conditions which could lead to duplicate
-        # invocations or miss invocation.
-
         # Since this function is called only when the TaskInstance state is 
running,
         # try_number contains the current try_number (not the next). We
         # only mark task instance as FAILED if the next task instance
         # try_number exceeds the max_tries ... or if force_fail is truthy
 
-        # Use the original task directly - scheduler only needs to check email 
settings
         # Actual callbacks are handled by the DAG processor, not the scheduler
         task = getattr(ti, "task", None)
 
         if not ti.is_eligible_to_retry():
             ti.state = TaskInstanceState.FAILED
-            email_for_state = operator.attrgetter("email_on_failure")
 
             if task and fail_fast:
                 _stop_remaining_tasks(task_instance=ti, session=session)
@@ -1553,7 +1537,6 @@ class TaskInstance(Base, LoggingMixin):
                 ti.prepare_db_for_next_try(session)
 
             ti.state = State.UP_FOR_RETRY
-            email_for_state = operator.attrgetter("email_on_retry")
 
         try:
             get_listener_manager().hook.on_task_instance_failed(
@@ -1562,12 +1545,7 @@ class TaskInstance(Base, LoggingMixin):
         except Exception:
             log.exception("error calling listener")
 
-        return {
-            "ti": ti,
-            "email_for_state": email_for_state,
-            "task": task,
-            "context": context,
-        }
+        return ti
 
     @staticmethod
     @provide_session
@@ -1600,7 +1578,7 @@ class TaskInstance(Base, LoggingMixin):
             fail_fast = False
         if test_mode is None:
             test_mode = self.test_mode
-        failure_context = TaskInstance.fetch_handle_failure_context(
+        ti = TaskInstance.fetch_handle_failure_context(
             ti=self,
             error=error,
             test_mode=test_mode,
@@ -1609,23 +1587,9 @@ class TaskInstance(Base, LoggingMixin):
         )
 
         _log_state(task_instance=self)
-        if (
-            (failure_task := failure_context["task"])
-            and failure_context["email_for_state"](failure_task)
-            and (failure_email := failure_task.email)
-        ):
-            try:
-                import structlog
-
-                from airflow.sdk.execution_time.task_runner import 
_send_task_error_email
-
-                log = structlog.get_logger(logger_name="task")
-                _send_task_error_email(failure_email, self, error, log=log)
-            except Exception:
-                log.exception("Failed to send email to: %s", failure_email)
 
         if not test_mode:
-            TaskInstance.save_to_db(failure_context["ti"], session)
+            TaskInstance.save_to_db(ti, session)
 
     def is_eligible_to_retry(self) -> bool:
         """Is task instance is eligible for retry."""
diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py 
b/airflow-core/tests/unit/callbacks/test_callback_requests.py
index 7d6fa752a83..428ee34d166 100644
--- a/airflow-core/tests/unit/callbacks/test_callback_requests.py
+++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py
@@ -20,15 +20,19 @@ import uuid
 from datetime import datetime
 
 import pytest
+from pydantic import TypeAdapter
 
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
     DagRun as DRDataModel,
     TaskInstance as TIDataModel,
+    TIRunContext,
 )
 from airflow.callbacks.callback_requests import (
+    CallbackRequest,
     DagCallbackRequest,
     DagRunContext,
+    EmailNotificationRequest,
     TaskCallbackRequest,
 )
 from airflow.models.dag import DAG
@@ -308,3 +312,111 @@ class TestDagCallbackRequestWithContext:
         assert result.context_from_server is not None
         assert result.context_from_server.dag_run.dag_id == "test_dag"
         assert result.context_from_server.last_ti.task_id == "test_task"
+
+
+class TestEmailNotificationRequest:
+    def test_email_notification_request_serialization(self):
+        """Test EmailNotificationRequest can be serialized and used in 
CallbackRequest union."""
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=1,
+            attempt_number=1,
+            state="failed",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+
+        # Create EmailNotificationRequest
+        email_request = EmailNotificationRequest(
+            filepath="/path/to/dag.py",
+            bundle_name="test_bundle",
+            bundle_version="1.0.0",
+            ti=ti_data,
+            context_from_server=TIRunContext(
+                dag_run=DRDataModel(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    logical_date="2023-01-01T00:00:00Z",
+                    data_interval_start=current_time,
+                    data_interval_end=current_time,
+                    run_after=current_time,
+                    start_date=current_time,
+                    end_date=None,
+                    run_type="manual",
+                    state="running",
+                    consumed_asset_events=[],
+                ),
+                max_tries=2,
+            ),
+            email_type="failure",
+            msg="Task failed",
+        )
+
+        # Test serialization
+        json_str = email_request.to_json()
+        assert "EmailNotificationRequest" in json_str
+        assert "failure" in json_str
+
+        # Test deserialization
+        result = EmailNotificationRequest.from_json(json_str)
+        assert result == email_request
+        assert result.email_type == "failure"
+        assert result.ti.task_id == "test_task"
+
+    def test_callback_request_union_with_email_notification(self):
+        """Test EmailNotificationRequest works in CallbackRequest union 
type."""
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=1,
+            attempt_number=1,
+            state="failed",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+
+        context_from_server = TIRunContext(
+            dag_run=DRDataModel(
+                dag_id="test_dag",
+                run_id="test_run",
+                logical_date="2023-01-01T00:00:00Z",
+                data_interval_start=current_time,
+                data_interval_end=current_time,
+                run_after=current_time,
+                start_date=current_time,
+                end_date=None,
+                run_type="manual",
+                state="running",
+                consumed_asset_events=[],
+            ),
+            max_tries=2,
+        )
+
+        email_data = {
+            "type": "EmailNotificationRequest",
+            "filepath": "/path/to/dag.py",
+            "bundle_name": "test_bundle",
+            "bundle_version": "1.0.0",
+            "ti": ti_data.model_dump(),
+            "context_from_server": context_from_server.model_dump(),
+            "email_type": "retry",
+            "msg": "Task retry",
+        }
+
+        # Validate as CallbackRequest union
+        adapter = TypeAdapter(CallbackRequest)
+        callback_request = adapter.validate_python(email_data)
+
+        # Verify it's correctly identified as EmailNotificationRequest
+        assert isinstance(callback_request, EmailNotificationRequest)
+        assert callback_request.email_type == "retry"
+        assert callback_request.ti.task_id == "test_task"
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 6d793e4a0b6..14427a6dee5 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -25,6 +25,7 @@ import uuid
 from collections.abc import Callable
 from socket import socketpair
 from typing import TYPE_CHECKING, BinaryIO
+from unittest import mock
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -43,6 +44,7 @@ from airflow.callbacks.callback_requests import (
     CallbackRequest,
     DagCallbackRequest,
     DagRunContext,
+    EmailNotificationRequest,
     TaskCallbackRequest,
 )
 from airflow.dag_processing.processor import (
@@ -50,13 +52,13 @@ from airflow.dag_processing.processor import (
     DagFileParsingResult,
     DagFileProcessorProcess,
     _execute_dag_callbacks,
+    _execute_email_callbacks,
     _execute_task_callbacks,
     _parse_file,
     _pre_import_airflow_modules,
 )
 from airflow.models import DagBag, DagRun
-from airflow.models.baseoperator import BaseOperator
-from airflow.sdk import DAG
+from airflow.sdk import DAG, BaseOperator
 from airflow.sdk.api.client import Client
 from airflow.sdk.api.datamodels._generated import DagRunState
 from airflow.sdk.execution_time import comms
@@ -1083,3 +1085,243 @@ class TestExecuteTaskCallbacks:
         _execute_task_callbacks(dagbag, request, log)
 
         assert call_count == 2
+
+
+class TestExecuteEmailCallbacks:
+    """Test the email callback execution functionality."""
+
+    @patch("airflow.dag_processing.processor._send_task_error_email")
+    def test_execute_email_callbacks_failure(self, mock_send_email):
+        """Test email callback execution for task failure."""
+        dagbag = MagicMock(spec=DagBag)
+        with DAG(dag_id="test_dag") as dag:
+            BaseOperator(task_id="test_task", email="[email protected]")
+        dagbag.dags = {"test_dag": dag}
+
+        # Create TI data
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=1,
+            attempt_number=1,
+            state="failed",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+        request = EmailNotificationRequest(
+            filepath="/path/to/dag.py",
+            bundle_name="test_bundle",
+            bundle_version="1.0.0",
+            ti=ti_data,
+            context_from_server=TIRunContext(
+                dag_run=DRDataModel(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    logical_date="2023-01-01T00:00:00Z",
+                    data_interval_start=current_time,
+                    data_interval_end=current_time,
+                    run_after=current_time,
+                    start_date=current_time,
+                    end_date=None,
+                    run_type="manual",
+                    state="running",
+                    consumed_asset_events=[],
+                ),
+                max_tries=2,
+            ),
+            email_type="failure",
+            msg="Task failed",
+        )
+
+        log = MagicMock(spec=FilteringBoundLogger)
+
+        # Execute email callbacks
+        _execute_email_callbacks(dagbag, request, log)
+
+        # Verify email was sent
+        mock_send_email.assert_called_once_with(
+            "[email protected]",
+            mock.ANY,  # mocked Runtime TI
+            "Task failed",
+            log,
+        )
+
+    @patch("airflow.dag_processing.processor._send_task_error_email")
+    def test_execute_email_callbacks_retry(self, mock_send_email):
+        """Test email callback execution for task retry."""
+        dagbag = MagicMock(spec=DagBag)
+        with DAG(dag_id="test_dag") as dag:
+            BaseOperator(task_id="test_task", email=["[email protected]"])
+        dagbag.dags = {"test_dag": dag}
+
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=2,
+            attempt_number=2,
+            state="up_for_retry",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+
+        request = EmailNotificationRequest(
+            filepath="/path/to/dag.py",
+            bundle_name="test_bundle",
+            bundle_version="1.0.0",
+            ti=ti_data,
+            email_type="retry",
+            context_from_server=TIRunContext(
+                dag_run=DRDataModel(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    logical_date="2023-01-01T00:00:00Z",
+                    data_interval_start=current_time,
+                    data_interval_end=current_time,
+                    run_after=current_time,
+                    start_date=current_time,
+                    end_date=None,
+                    run_type="manual",
+                    state="running",
+                    consumed_asset_events=[],
+                ),
+                max_tries=2,
+            ),
+            msg="Task retry",
+        )
+
+        log = MagicMock(spec=FilteringBoundLogger)
+
+        # Execute email callbacks
+        _execute_email_callbacks(dagbag, request, log)
+
+        # Verify email was sent
+        mock_send_email.assert_called_once_with(
+            ["[email protected]"],
+            mock.ANY,  # mocked Runtime TI
+            "Task retry",
+            log,
+        )
+
+    @patch("airflow.dag_processing.processor._send_task_error_email")
+    def test_execute_email_callbacks_no_email_configured(self, 
mock_send_email):
+        """Test email callback when no email is configured."""
+        dagbag = MagicMock(spec=DagBag)
+        with DAG(dag_id="test_dag") as dag:
+            BaseOperator(task_id="test_task", email=None)
+        dagbag.dags = {"test_dag": dag}
+
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=1,
+            attempt_number=1,
+            state="failed",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+        request = EmailNotificationRequest(
+            filepath="/path/to/dag.py",
+            bundle_name="test_bundle",
+            bundle_version="1.0.0",
+            ti=ti_data,
+            context_from_server=TIRunContext(
+                dag_run=DRDataModel(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    logical_date="2023-01-01T00:00:00Z",
+                    data_interval_start=current_time,
+                    data_interval_end=current_time,
+                    run_after=current_time,
+                    start_date=current_time,
+                    end_date=None,
+                    run_type="manual",
+                    state="running",
+                    consumed_asset_events=[],
+                ),
+                max_tries=2,
+            ),
+            email_type="failure",
+        )
+
+        log = MagicMock(spec=FilteringBoundLogger)
+
+        # Execute email callbacks - should not raise exception
+        _execute_email_callbacks(dagbag, request, log)
+
+        # Verify warning was logged
+        log.warning.assert_called_once()
+        warning_call = log.warning.call_args[0][0]
+        assert "Email callback requested but no email configured" in 
warning_call
+        mock_send_email.assert_not_called()
+
+    @patch("airflow.dag_processing.processor._send_task_error_email")
+    def test_execute_email_callbacks_email_disabled_for_type(self, 
mock_send_email):
+        """Test email callback when email is disabled for the specific type."""
+        dagbag = MagicMock(spec=DagBag)
+        with DAG(dag_id="test_dag") as dag:
+            BaseOperator(task_id="test_task", email=["[email protected]"], 
email_on_failure=False)
+        dagbag.dags = {"test_dag": dag}
+
+        ti_data = TIDataModel(
+            id=str(uuid.uuid4()),
+            task_id="test_task",
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2023-01-01T00:00:00Z",
+            try_number=1,
+            attempt_number=1,
+            state="failed",
+            dag_version_id=str(uuid.uuid4()),
+        )
+
+        current_time = timezone.utcnow()
+
+        # Create request for failure (but email_on_failure is False)
+        request = EmailNotificationRequest(
+            filepath="/path/to/dag.py",
+            bundle_name="test_bundle",
+            bundle_version="1.0.0",
+            ti=ti_data,
+            context_from_server=TIRunContext(
+                dag_run=DRDataModel(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    logical_date="2023-01-01T00:00:00Z",
+                    data_interval_start=current_time,
+                    data_interval_end=current_time,
+                    run_after=current_time,
+                    start_date=current_time,
+                    end_date=None,
+                    run_type="manual",
+                    state="running",
+                    consumed_asset_events=[],
+                ),
+                max_tries=2,
+            ),
+            email_type="failure",
+        )
+
+        log = MagicMock(spec=FilteringBoundLogger)
+
+        # Execute email callbacks
+        _execute_email_callbacks(dagbag, request, log)
+
+        # Verify no email was sent
+        mock_send_email.assert_not_called()
+
+        # Verify info log about email being disabled
+        log.info.assert_called_once()
+        info_call = log.info.call_args[0][0]
+        assert "Email not sent - task configured with email_on_" in info_call
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index d99889aedd5..d8ac2ade76a 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -1763,7 +1763,7 @@ def test_taskinstance_info_af3():
     runtime_ti.bundle_instance = bundle_instance
 
     assert dict(TaskInstanceInfo(runtime_ti)) == {
-        "log_url": None,
+        "log_url": runtime_ti.log_url,
         "map_index": 2,
         "try_number": 1,
         "dag_bundle_version": "bundle_version",
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py 
b/task-sdk/src/airflow/sdk/bases/operator.py
index ca4d5701071..7a8e80ec4d9 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -213,6 +213,8 @@ class _PartialDescriptor:
 OPERATOR_DEFAULTS: dict[str, Any] = {
     "allow_nested_operators": True,
     "depends_on_past": False,
+    "email_on_failure": True,
+    "email_on_retry": True,
     "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
     # "executor": DEFAULT_EXECUTOR,
     "executor_config": {},
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 09ae0420e57..0bffa59c0d3 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -402,6 +402,14 @@ class MappedOperator(AbstractOperator):
     def email(self) -> None | str | Iterable[str]:
         return self.partial_kwargs.get("email")
 
+    @property
+    def email_on_failure(self) -> bool:
+        return self.partial_kwargs.get("email_on_failure", True)
+
+    @property
+    def email_on_retry(self) -> bool:
+        return self.partial_kwargs.get("email_on_retry", True)
+
     @property
     def map_index_template(self) -> None | str:
         return self.partial_kwargs.get("map_index_template")
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index d694d4f2b80..0ca66d05e93 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -31,6 +31,7 @@ from datetime import datetime, timezone
 from itertools import product
 from pathlib import Path
 from typing import TYPE_CHECKING, Annotated, Any, Literal
+from urllib.parse import quote
 
 import attrs
 import lazy_object_proxy
@@ -147,8 +148,6 @@ class RuntimeTaskInstance(TaskInstance):
 
     rendered_map_index: str | None = None
 
-    log_url: str | None = None
-
     def __rich_repr__(self):
         yield "id", self.id
         yield "task_id", self.task_id
@@ -182,8 +181,6 @@ class RuntimeTaskInstance(TaskInstance):
             "run_id": self.run_id,
             "task": self.task,
             "task_instance": self,
-            # TODO: Ensure that ti.log_url and such are available to use in context
-            #   especially after removal of `conf` from Context.
             "ti": self,
             "outlet_events": OutletEventAccessors(),
             "inlet_events": InletEventsAccessors(self.task.inlets),
@@ -552,6 +549,26 @@ class RuntimeTaskInstance(TaskInstance):
 
         return response.state
 
+    @property
+    def log_url(self) -> str:
+        run_id = quote(self.run_id)
+        base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
+        map_index_value = self.map_index
+        map_index = (
+            f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
+        )
+        try_number_value = self.try_number
+        try_number = (
+            f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
+        )
+        _log_uri = f"{base_url}dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
+        return _log_uri
+
+    @property
+    def mark_success_url(self) -> str:
+        """URL to mark TI success."""
+        return self.log_url
+
 
 def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length: int | None = None) -> None:
     """Push a XCom through XCom.set, which pushes to XCom Backend if configured."""
@@ -728,7 +745,6 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
 
     with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
         ti = parse(msg, log)
-        ti.log_url = get_log_url_from_ti(ti)
     log.debug("Dag file parsed", file=msg.dag_rel_path)
 
     run_as_user = getattr(ti.task, "run_as_user", None) or conf.get(
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 841137c07b3..6de891ad837 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -117,7 +117,6 @@ from airflow.sdk.execution_time.task_runner import (
     _push_xcom_if_needed,
     _xcom_push,
     finalize,
-    get_log_url_from_ti,
     parse,
     run,
     startup,
@@ -2608,7 +2607,6 @@ class TestTaskRunnerCallsListeners:
 
         runtime_ti, context, log = startup()
         assert runtime_ti is not None
-        assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti)
         assert isinstance(listener.component, TaskRunnerMarker)
         del listener.component
 


Reply via email to