This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2dcb88ff758 Move email notifications from scheduler to DAG processor
(#55238)
2dcb88ff758 is described below
commit 2dcb88ff758516572706d5b2ff0b5fca50b3a14e
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Sep 4 12:45:37 2025 +0100
Move email notifications from scheduler to DAG processor (#55238)
---
.../src/airflow/callbacks/callback_requests.py | 14 +-
.../src/airflow/dag_processing/processor.py | 69 +++++-
.../src/airflow/jobs/scheduler_job_runner.py | 54 ++++-
airflow-core/src/airflow/models/mappedoperator.py | 13 ++
airflow-core/src/airflow/models/taskinstance.py | 42 +---
.../tests/unit/callbacks/test_callback_requests.py | 112 ++++++++++
.../tests/unit/dag_processing/test_processor.py | 246 ++++++++++++++++++++-
.../tests/unit/openlineage/utils/test_utils.py | 2 +-
task-sdk/src/airflow/sdk/bases/operator.py | 2 +
.../src/airflow/sdk/definitions/mappedoperator.py | 8 +
.../src/airflow/sdk/execution_time/task_runner.py | 26 ++-
.../task_sdk/execution_time/test_task_runner.py | 2 -
12 files changed, 536 insertions(+), 54 deletions(-)
diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py
b/airflow-core/src/airflow/callbacks/callback_requests.py
index 611f9ac7b4d..e0666b397c2 100644
--- a/airflow-core/src/airflow/callbacks/callback_requests.py
+++ b/airflow-core/src/airflow/callbacks/callback_requests.py
@@ -77,6 +77,18 @@ class TaskCallbackRequest(BaseCallbackRequest):
}
+class EmailNotificationRequest(BaseCallbackRequest):
+ """Email notification request for task failures/retries."""
+
+ ti: ti_datamodel.TaskInstance
+ """Simplified Task Instance representation"""
+ email_type: Literal["failure", "retry"] = "failure"
+ """Whether this is for a failure or retry email"""
+ context_from_server: ti_datamodel.TIRunContext
+ """Task execution context from the Server"""
+ type: Literal["EmailNotificationRequest"] = "EmailNotificationRequest"
+
+
class DagRunContext(BaseModel):
"""Class to pass context info from the server to build a Execution context
object."""
@@ -96,6 +108,6 @@ class DagCallbackRequest(BaseCallbackRequest):
CallbackRequest = Annotated[
- DagCallbackRequest | TaskCallbackRequest,
+ DagCallbackRequest | TaskCallbackRequest | EmailNotificationRequest,
Field(discriminator="type"),
]
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index e5fbd9b436f..d4c73e61fea 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -31,6 +31,7 @@ from pydantic import BaseModel, Field, TypeAdapter
from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
+ EmailNotificationRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
@@ -51,7 +52,7 @@ from airflow.sdk.execution_time.comms import (
VariableResult,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
-from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
+from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance,
_send_task_error_email
from airflow.serialization.serialized_objects import LazyDeserializedDAG,
SerializedDAG
from airflow.stats import Stats
from airflow.utils.file import iter_airflow_imports
@@ -224,8 +225,10 @@ def _execute_callbacks(
log.debug("Processing Callback Request", request=request.to_json())
if isinstance(request, TaskCallbackRequest):
_execute_task_callbacks(dagbag, request, log)
- if isinstance(request, DagCallbackRequest):
+ elif isinstance(request, DagCallbackRequest):
_execute_dag_callbacks(dagbag, request, log)
+ elif isinstance(request, EmailNotificationRequest):
+ _execute_email_callbacks(dagbag, request, log)
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log:
FilteringBoundLogger) -> None:
@@ -335,6 +338,68 @@ def _execute_task_callbacks(dagbag: DagBag, request:
TaskCallbackRequest, log: F
log.exception("Error in callback at index %d: %s", idx,
callback_repr)
+def _execute_email_callbacks(
+ dagbag: DagBag, request: EmailNotificationRequest, log:
FilteringBoundLogger
+) -> None:
+ """Execute email notification for task failure/retry."""
+ dag = dagbag.dags[request.ti.dag_id]
+ task = dag.get_task(request.ti.task_id)
+
+ if not task.email:
+ log.warning(
+ "Email callback requested but no email configured",
+ dag_id=request.ti.dag_id,
+ task_id=request.ti.task_id,
+ run_id=request.ti.run_id,
+ )
+ return
+
+ # Check if email should be sent based on task configuration
+ should_send_email = False
+ if request.email_type == "failure" and task.email_on_failure:
+ should_send_email = True
+ elif request.email_type == "retry" and task.email_on_retry:
+ should_send_email = True
+
+ if not should_send_email:
+ log.info(
+ "Email not sent - task configured with email_on_%s=False",
+ request.email_type,
+ dag_id=request.ti.dag_id,
+ task_id=request.ti.task_id,
+ run_id=request.ti.run_id,
+ )
+ return
+
+ ctx_from_server = request.context_from_server
+
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **request.ti.model_dump(exclude_unset=True),
+ task=task,
+ _ti_context_from_server=ctx_from_server,
+ max_tries=ctx_from_server.max_tries,
+ )
+
+ log.info(
+ "Sending %s email for task %s",
+ request.email_type,
+ request.ti.task_id,
+ dag_id=request.ti.dag_id,
+ run_id=request.ti.run_id,
+ )
+
+ try:
+ _send_task_error_email(task.email, runtime_ti, request.msg, log)
+ except Exception:
+ log.exception(
+ "Failed to send %s email",
+ request.email_type,
+ dag_id=request.ti.dag_id,
+ task_id=request.ti.task_id,
+ run_id=request.ti.run_id,
+ )
+
+
def in_process_api_server() -> InProcessExecutionAPI:
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 217d8d663c3..e8778d17d18 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -40,7 +40,12 @@ from sqlalchemy.sql import expression
from airflow import settings
from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
as DRDataModel, TIRunContext
-from airflow.callbacks.callback_requests import DagCallbackRequest,
DagRunContext, TaskCallbackRequest
+from airflow.callbacks.callback_requests import (
+ DagCallbackRequest,
+ DagRunContext,
+ EmailNotificationRequest,
+ TaskCallbackRequest,
+)
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
from airflow.executors import workloads
@@ -750,7 +755,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
cls, executor: BaseExecutor, job_id: str | None, scheduler_dag_bag:
DBDagBag, session: Session
) -> int:
"""
- Respond to executor events.
+ Process task completion events from the executor and update task
instance states.
+
+ This method handles task state transitions reported by executors,
ensuring proper
+ state management, callback execution, and notification processing. It
maintains
+ scheduler architectural principles by delegating user code execution
to appropriate
+ isolated processes.
+
+ The method handles several key scenarios:
+ 1. **Normal task completion**: Updates task states for
successful/failed tasks
+ 2. **External termination**: Detects tasks killed outside Airflow and
marks them as failed
+ 3. **Task requeuing**: Handles tasks that were requeued by other
schedulers or executors
+ 4. **Callback processing**: Sends task callback requests to DAG
Processor for execution
+ 5. **Email notifications**: Sends email notification requests to DAG
Processor
+
+ :param executor: The executor reporting task completion events
+ :param job_id: The scheduler job ID, used to detect task requeuing by
other schedulers
+ :param scheduler_dag_bag: Serialized DAG bag for retrieving task
definitions
+ :param session: Database session for task instance updates
+
+ :return: Number of events processed from the executor event buffer
+
+ :raises Exception: If DAG retrieval or task processing fails, logs
error and continues
This is a classmethod because this is also used in `dag.test()`.
`dag.test` executes DAGs with no scheduler, therefore it needs to
handle the events pushed by the
@@ -929,6 +955,30 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti.set_state(None)
continue
+ # Send email notification request to DAG processor via DB
+ if task.email and (task.email_on_failure or
task.email_on_retry):
+ cls.logger().info(
+ "Sending email request for task %s to DAG Processor",
+ ti,
+ )
+ email_request = EmailNotificationRequest(
+ filepath=ti.dag_model.relative_fileloc,
+ bundle_name=ti.dag_version.bundle_name,
+ bundle_version=ti.dag_version.bundle_version,
+ ti=ti,
+ msg=msg,
+ email_type="retry" if ti.is_eligible_to_retry() else
"failure",
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel.model_validate(ti.dag_run,
from_attributes=True),
+ max_tries=ti.max_tries,
+ variables=[],
+ connections=[],
+ xcom_keys_to_clear=[],
+ ),
+ )
+ executor.send_callback(email_request)
+
+ # Update task state - emails are handled by DAG processor now
ti.handle_failure(error=msg, session=session)
return len(event_buffer)
diff --git a/airflow-core/src/airflow/models/mappedoperator.py
b/airflow-core/src/airflow/models/mappedoperator.py
index 310573985da..7c7988d3c9b 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import functools
import operator
+from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias, TypeGuard, overload
import attrs
@@ -298,6 +299,18 @@ class MappedOperator(DAGNode):
def outlets(self) -> list[Any]:
return self.partial_kwargs.get("outlets", [])
+ @property
+ def email(self) -> str | Iterable[str] | None:
+ return self.partial_kwargs.get("email")
+
+ @property
+ def email_on_failure(self) -> bool:
+ return self.partial_kwargs.get("email_on_failure", True)
+
+ @property
+ def email_on_retry(self) -> bool:
+ return self.partial_kwargs.get("email_on_retry", True)
+
@property
def on_failure_fail_dagrun(self) -> bool:
return bool(self.partial_kwargs.get("on_failure_fail_dagrun"))
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 86b4eaf992d..4d440bd38a0 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -22,7 +22,6 @@ import hashlib
import itertools
import logging
import math
-import operator
import uuid
from collections import defaultdict
from collections.abc import Collection, Iterable
@@ -1514,34 +1513,19 @@ class TaskInstance(Base, LoggingMixin):
ti.clear_next_method_args()
- context = None
- # In extreme cases (task instance heartbeat timeout in case of dag with
- # parse error) we might _not_ have a Task.
- if getattr(ti, "task", None):
- context = ti.get_template_context(session)
-
- if context is not None:
- context["exception"] = error
-
# Set state correctly and figure out how to log it and decide whether
# to email
- # Note, callback invocation needs to be handled by caller of
- # _run_raw_task to avoid race conditions which could lead to duplicate
- # invocations or miss invocation.
-
# Since this function is called only when the TaskInstance state is
running,
# try_number contains the current try_number (not the next). We
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries ... or if force_fail is truthy
- # Use the original task directly - scheduler only needs to check email
settings
# Actual callbacks are handled by the DAG processor, not the scheduler
task = getattr(ti, "task", None)
if not ti.is_eligible_to_retry():
ti.state = TaskInstanceState.FAILED
- email_for_state = operator.attrgetter("email_on_failure")
if task and fail_fast:
_stop_remaining_tasks(task_instance=ti, session=session)
@@ -1553,7 +1537,6 @@ class TaskInstance(Base, LoggingMixin):
ti.prepare_db_for_next_try(session)
ti.state = State.UP_FOR_RETRY
- email_for_state = operator.attrgetter("email_on_retry")
try:
get_listener_manager().hook.on_task_instance_failed(
@@ -1562,12 +1545,7 @@ class TaskInstance(Base, LoggingMixin):
except Exception:
log.exception("error calling listener")
- return {
- "ti": ti,
- "email_for_state": email_for_state,
- "task": task,
- "context": context,
- }
+ return ti
@staticmethod
@provide_session
@@ -1600,7 +1578,7 @@ class TaskInstance(Base, LoggingMixin):
fail_fast = False
if test_mode is None:
test_mode = self.test_mode
- failure_context = TaskInstance.fetch_handle_failure_context(
+ ti = TaskInstance.fetch_handle_failure_context(
ti=self,
error=error,
test_mode=test_mode,
@@ -1609,23 +1587,9 @@ class TaskInstance(Base, LoggingMixin):
)
_log_state(task_instance=self)
- if (
- (failure_task := failure_context["task"])
- and failure_context["email_for_state"](failure_task)
- and (failure_email := failure_task.email)
- ):
- try:
- import structlog
-
- from airflow.sdk.execution_time.task_runner import
_send_task_error_email
-
- log = structlog.get_logger(logger_name="task")
- _send_task_error_email(failure_email, self, error, log=log)
- except Exception:
- log.exception("Failed to send email to: %s", failure_email)
if not test_mode:
- TaskInstance.save_to_db(failure_context["ti"], session)
+ TaskInstance.save_to_db(ti, session)
def is_eligible_to_retry(self) -> bool:
"""Is task instance is eligible for retry."""
diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py
b/airflow-core/tests/unit/callbacks/test_callback_requests.py
index 7d6fa752a83..428ee34d166 100644
--- a/airflow-core/tests/unit/callbacks/test_callback_requests.py
+++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py
@@ -20,15 +20,19 @@ import uuid
from datetime import datetime
import pytest
+from pydantic import TypeAdapter
from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
DagRun as DRDataModel,
TaskInstance as TIDataModel,
+ TIRunContext,
)
from airflow.callbacks.callback_requests import (
+ CallbackRequest,
DagCallbackRequest,
DagRunContext,
+ EmailNotificationRequest,
TaskCallbackRequest,
)
from airflow.models.dag import DAG
@@ -308,3 +312,111 @@ class TestDagCallbackRequestWithContext:
assert result.context_from_server is not None
assert result.context_from_server.dag_run.dag_id == "test_dag"
assert result.context_from_server.last_ti.task_id == "test_task"
+
+
+class TestEmailNotificationRequest:
+ def test_email_notification_request_serialization(self):
+ """Test EmailNotificationRequest can be serialized and used in
CallbackRequest union."""
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=1,
+ attempt_number=1,
+ state="failed",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+
+ # Create EmailNotificationRequest
+ email_request = EmailNotificationRequest(
+ filepath="/path/to/dag.py",
+ bundle_name="test_bundle",
+ bundle_version="1.0.0",
+ ti=ti_data,
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ ),
+ email_type="failure",
+ msg="Task failed",
+ )
+
+ # Test serialization
+ json_str = email_request.to_json()
+ assert "EmailNotificationRequest" in json_str
+ assert "failure" in json_str
+
+ # Test deserialization
+ result = EmailNotificationRequest.from_json(json_str)
+ assert result == email_request
+ assert result.email_type == "failure"
+ assert result.ti.task_id == "test_task"
+
+ def test_callback_request_union_with_email_notification(self):
+ """Test EmailNotificationRequest works in CallbackRequest union
type."""
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=1,
+ attempt_number=1,
+ state="failed",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+
+ context_from_server = TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ )
+
+ email_data = {
+ "type": "EmailNotificationRequest",
+ "filepath": "/path/to/dag.py",
+ "bundle_name": "test_bundle",
+ "bundle_version": "1.0.0",
+ "ti": ti_data.model_dump(),
+ "context_from_server": context_from_server.model_dump(),
+ "email_type": "retry",
+ "msg": "Task retry",
+ }
+
+ # Validate as CallbackRequest union
+ adapter = TypeAdapter(CallbackRequest)
+ callback_request = adapter.validate_python(email_data)
+
+ # Verify it's correctly identified as EmailNotificationRequest
+ assert isinstance(callback_request, EmailNotificationRequest)
+ assert callback_request.email_type == "retry"
+ assert callback_request.ti.task_id == "test_task"
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 6d793e4a0b6..14427a6dee5 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -25,6 +25,7 @@ import uuid
from collections.abc import Callable
from socket import socketpair
from typing import TYPE_CHECKING, BinaryIO
+from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
@@ -43,6 +44,7 @@ from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
DagRunContext,
+ EmailNotificationRequest,
TaskCallbackRequest,
)
from airflow.dag_processing.processor import (
@@ -50,13 +52,13 @@ from airflow.dag_processing.processor import (
DagFileParsingResult,
DagFileProcessorProcess,
_execute_dag_callbacks,
+ _execute_email_callbacks,
_execute_task_callbacks,
_parse_file,
_pre_import_airflow_modules,
)
from airflow.models import DagBag, DagRun
-from airflow.models.baseoperator import BaseOperator
-from airflow.sdk import DAG
+from airflow.sdk import DAG, BaseOperator
from airflow.sdk.api.client import Client
from airflow.sdk.api.datamodels._generated import DagRunState
from airflow.sdk.execution_time import comms
@@ -1083,3 +1085,243 @@ class TestExecuteTaskCallbacks:
_execute_task_callbacks(dagbag, request, log)
assert call_count == 2
+
+
+class TestExecuteEmailCallbacks:
+ """Test the email callback execution functionality."""
+
+ @patch("airflow.dag_processing.processor._send_task_error_email")
+ def test_execute_email_callbacks_failure(self, mock_send_email):
+ """Test email callback execution for task failure."""
+ dagbag = MagicMock(spec=DagBag)
+ with DAG(dag_id="test_dag") as dag:
+ BaseOperator(task_id="test_task", email="[email protected]")
+ dagbag.dags = {"test_dag": dag}
+
+ # Create TI data
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=1,
+ attempt_number=1,
+ state="failed",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+ request = EmailNotificationRequest(
+ filepath="/path/to/dag.py",
+ bundle_name="test_bundle",
+ bundle_version="1.0.0",
+ ti=ti_data,
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ ),
+ email_type="failure",
+ msg="Task failed",
+ )
+
+ log = MagicMock(spec=FilteringBoundLogger)
+
+ # Execute email callbacks
+ _execute_email_callbacks(dagbag, request, log)
+
+ # Verify email was sent
+ mock_send_email.assert_called_once_with(
+ "[email protected]",
+ mock.ANY, # mocked Runtime TI
+ "Task failed",
+ log,
+ )
+
+ @patch("airflow.dag_processing.processor._send_task_error_email")
+ def test_execute_email_callbacks_retry(self, mock_send_email):
+ """Test email callback execution for task retry."""
+ dagbag = MagicMock(spec=DagBag)
+ with DAG(dag_id="test_dag") as dag:
+ BaseOperator(task_id="test_task", email=["[email protected]"])
+ dagbag.dags = {"test_dag": dag}
+
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=2,
+ attempt_number=2,
+ state="up_for_retry",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+
+ request = EmailNotificationRequest(
+ filepath="/path/to/dag.py",
+ bundle_name="test_bundle",
+ bundle_version="1.0.0",
+ ti=ti_data,
+ email_type="retry",
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ ),
+ msg="Task retry",
+ )
+
+ log = MagicMock(spec=FilteringBoundLogger)
+
+ # Execute email callbacks
+ _execute_email_callbacks(dagbag, request, log)
+
+ # Verify email was sent
+ mock_send_email.assert_called_once_with(
+ ["[email protected]"],
+ mock.ANY, # mocked Runtime TI
+ "Task retry",
+ log,
+ )
+
+ @patch("airflow.dag_processing.processor._send_task_error_email")
+ def test_execute_email_callbacks_no_email_configured(self,
mock_send_email):
+ """Test email callback when no email is configured."""
+ dagbag = MagicMock(spec=DagBag)
+ with DAG(dag_id="test_dag") as dag:
+ BaseOperator(task_id="test_task", email=None)
+ dagbag.dags = {"test_dag": dag}
+
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=1,
+ attempt_number=1,
+ state="failed",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+ request = EmailNotificationRequest(
+ filepath="/path/to/dag.py",
+ bundle_name="test_bundle",
+ bundle_version="1.0.0",
+ ti=ti_data,
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ ),
+ email_type="failure",
+ )
+
+ log = MagicMock(spec=FilteringBoundLogger)
+
+ # Execute email callbacks - should not raise exception
+ _execute_email_callbacks(dagbag, request, log)
+
+ # Verify warning was logged
+ log.warning.assert_called_once()
+ warning_call = log.warning.call_args[0][0]
+ assert "Email callback requested but no email configured" in
warning_call
+ mock_send_email.assert_not_called()
+
+ @patch("airflow.dag_processing.processor._send_task_error_email")
+ def test_execute_email_callbacks_email_disabled_for_type(self,
mock_send_email):
+ """Test email callback when email is disabled for the specific type."""
+ dagbag = MagicMock(spec=DagBag)
+ with DAG(dag_id="test_dag") as dag:
+ BaseOperator(task_id="test_task", email=["[email protected]"],
email_on_failure=False)
+ dagbag.dags = {"test_dag": dag}
+
+ ti_data = TIDataModel(
+ id=str(uuid.uuid4()),
+ task_id="test_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ try_number=1,
+ attempt_number=1,
+ state="failed",
+ dag_version_id=str(uuid.uuid4()),
+ )
+
+ current_time = timezone.utcnow()
+
+ # Create request for failure (but email_on_failure is False)
+ request = EmailNotificationRequest(
+ filepath="/path/to/dag.py",
+ bundle_name="test_bundle",
+ bundle_version="1.0.0",
+ ti=ti_data,
+ context_from_server=TIRunContext(
+ dag_run=DRDataModel(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2023-01-01T00:00:00Z",
+ data_interval_start=current_time,
+ data_interval_end=current_time,
+ run_after=current_time,
+ start_date=current_time,
+ end_date=None,
+ run_type="manual",
+ state="running",
+ consumed_asset_events=[],
+ ),
+ max_tries=2,
+ ),
+ email_type="failure",
+ )
+
+ log = MagicMock(spec=FilteringBoundLogger)
+
+ # Execute email callbacks
+ _execute_email_callbacks(dagbag, request, log)
+
+ # Verify no email was sent
+ mock_send_email.assert_not_called()
+
+ # Verify info log about email being disabled
+ log.info.assert_called_once()
+ info_call = log.info.call_args[0][0]
+ assert "Email not sent - task configured with email_on_" in info_call
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index d99889aedd5..d8ac2ade76a 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -1763,7 +1763,7 @@ def test_taskinstance_info_af3():
runtime_ti.bundle_instance = bundle_instance
assert dict(TaskInstanceInfo(runtime_ti)) == {
- "log_url": None,
+ "log_url": runtime_ti.log_url,
"map_index": 2,
"try_number": 1,
"dag_bundle_version": "bundle_version",
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py
b/task-sdk/src/airflow/sdk/bases/operator.py
index ca4d5701071..7a8e80ec4d9 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -213,6 +213,8 @@ class _PartialDescriptor:
OPERATOR_DEFAULTS: dict[str, Any] = {
"allow_nested_operators": True,
"depends_on_past": False,
+ "email_on_failure": True,
+ "email_on_retry": True,
"execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
# "executor": DEFAULT_EXECUTOR,
"executor_config": {},
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 09ae0420e57..0bffa59c0d3 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -402,6 +402,14 @@ class MappedOperator(AbstractOperator):
def email(self) -> None | str | Iterable[str]:
return self.partial_kwargs.get("email")
+ @property
+ def email_on_failure(self) -> bool:
+ return self.partial_kwargs.get("email_on_failure", True)
+
+ @property
+ def email_on_retry(self) -> bool:
+ return self.partial_kwargs.get("email_on_retry", True)
+
@property
def map_index_template(self) -> None | str:
return self.partial_kwargs.get("map_index_template")
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index d694d4f2b80..0ca66d05e93 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -31,6 +31,7 @@ from datetime import datetime, timezone
from itertools import product
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal
+from urllib.parse import quote
import attrs
import lazy_object_proxy
@@ -147,8 +148,6 @@ class RuntimeTaskInstance(TaskInstance):
rendered_map_index: str | None = None
- log_url: str | None = None
-
def __rich_repr__(self):
yield "id", self.id
yield "task_id", self.task_id
@@ -182,8 +181,6 @@ class RuntimeTaskInstance(TaskInstance):
"run_id": self.run_id,
"task": self.task,
"task_instance": self,
- # TODO: Ensure that ti.log_url and such are available to use in
context
- # especially after removal of `conf` from Context.
"ti": self,
"outlet_events": OutletEventAccessors(),
"inlet_events": InletEventsAccessors(self.task.inlets),
@@ -552,6 +549,26 @@ class RuntimeTaskInstance(TaskInstance):
return response.state
+ @property
+ def log_url(self) -> str:
+ run_id = quote(self.run_id)
+ base_url = conf.get("api", "base_url",
fallback="http://localhost:8080/")
+ map_index_value = self.map_index
+ map_index = (
+ f"/mapped/{map_index_value}" if map_index_value is not None and
map_index_value >= 0 else ""
+ )
+ try_number_value = self.try_number
+ try_number = (
+ f"?try_number={try_number_value}" if try_number_value is not None
and try_number_value > 0 else ""
+ )
+ _log_uri =
f"{base_url}dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
+ return _log_uri
+
+ @property
+ def mark_success_url(self) -> str:
+ """URL to mark TI success."""
+ return self.log_url
+
def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length:
int | None = None) -> None:
"""Push a XCom through XCom.set, which pushes to XCom Backend if
configured."""
@@ -728,7 +745,6 @@ def startup() -> tuple[RuntimeTaskInstance, Context,
Logger]:
with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id,
task_id=msg.ti.task_id):
ti = parse(msg, log)
- ti.log_url = get_log_url_from_ti(ti)
log.debug("Dag file parsed", file=msg.dag_rel_path)
run_as_user = getattr(ti.task, "run_as_user", None) or conf.get(
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 841137c07b3..6de891ad837 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -117,7 +117,6 @@ from airflow.sdk.execution_time.task_runner import (
_push_xcom_if_needed,
_xcom_push,
finalize,
- get_log_url_from_ti,
parse,
run,
startup,
@@ -2608,7 +2607,6 @@ class TestTaskRunnerCallsListeners:
runtime_ti, context, log = startup()
assert runtime_ti is not None
- assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti)
assert isinstance(listener.component, TaskRunnerMarker)
del listener.component