This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 67f20ed267c [v3-1-test] Fix operator extra links not appearing on
failed tasks (#58227) (#58508)
67f20ed267c is described below
commit 67f20ed267c4459c1f35dd2004c26d4363ff408d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 20 10:43:10 2025 +0800
[v3-1-test] Fix operator extra links not appearing on failed tasks (#58227)
(#58508)
Co-authored-by: Ankit Chaurasia <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 14 +++-
.../task_sdk/execution_time/test_task_runner.py | 90 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 822ed9bb50a..461e770c425 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1395,9 +1395,17 @@ def finalize(
task = ti.task
# Pushing xcom for each operator extra links defined on the operator only.
for oe in task.operator_extra_links:
- link, xcom_key = oe.get_link(operator=task, ti_key=ti), oe.xcom_key #
type: ignore[arg-type]
- log.debug("Setting xcom for operator extra link", link=link,
xcom_key=xcom_key)
- _xcom_push_to_db(ti, key=xcom_key, value=link)
+ try:
+ link, xcom_key = oe.get_link(operator=task, ti_key=ti),
oe.xcom_key # type: ignore[arg-type]
+ log.debug("Setting xcom for operator extra link", link=link,
xcom_key=xcom_key)
+ _xcom_push_to_db(ti, key=xcom_key, value=link)
+ except Exception:
+ log.exception(
+ "Failed to push an xcom for task operator extra link",
+ link_name=oe.name,
+ xcom_key=oe.xcom_key,
+ ti=ti,
+ )
if getattr(ti.task, "overwrite_rtif_after_execution", False):
log.debug("Overwriting Rendered template fields.")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 58dda3371ef..4adf3c23723 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -27,7 +27,7 @@ from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock
-from unittest.mock import patch
+from unittest.mock import call, patch
import pandas as pd
import pytest
@@ -48,6 +48,7 @@ from airflow.providers.standard.operators.python import
PythonOperator
from airflow.sdk import (
DAG,
BaseOperator,
+ BaseOperatorLink,
Connection,
dag as dag_decorator,
get_current_context,
@@ -1723,6 +1724,93 @@ class TestRuntimeTaskInstance:
map_index=runtime_ti.map_index,
)
+ def test_task_failed_with_operator_extra_links(
+ self, create_runtime_ti, mock_supervisor_comms, time_machine
+ ):
+ """Test that operator extra links are pushed to xcoms even when task
fails."""
+ instant = timezone.datetime(2024, 12, 3, 10, 0)
+ time_machine.move_to(instant, tick=False)
+
+ class DummyTestOperator(BaseOperator):
+ operator_extra_links = (AirflowLink(),)
+
+ def execute(self, context):
+ raise ValueError("Task failed intentionally")
+
+ task = DummyTestOperator(task_id="task_with_operator_extra_links")
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ runtime_ti.start_date = instant
+ runtime_ti.end_date = instant
+
+ state, _, error = run(runtime_ti, context=context,
log=mock.MagicMock())
+ assert state == TaskInstanceState.FAILED
+ assert error is not None
+
+ with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
+ finalize(
+ runtime_ti,
+ log=mock.MagicMock(),
+ state=TaskInstanceState.FAILED,
+ context=context,
+ error=error,
+ )
+ assert mock_xcom_set.mock_calls == [
+ call(
+ key="_link_AirflowLink",
+ value="https://airflow.apache.org",
+ dag_id=runtime_ti.dag_id,
+ task_id=runtime_ti.task_id,
+ run_id=runtime_ti.run_id,
+ map_index=runtime_ti.map_index,
+ )
+ ]
+
+ def test_operator_extra_links_exception_handling(
+ self, create_runtime_ti, mock_supervisor_comms, time_machine
+ ):
+ """Test that exceptions in get_link() don't prevent other links from
being pushed."""
+ instant = timezone.datetime(2024, 12, 3, 10, 0)
+ time_machine.move_to(instant, tick=False)
+
+ class FailingLink(BaseOperatorLink):
+ """A link that raises an exception when get_link is called."""
+
+ name = "failing_link"
+
+ def get_link(self, operator, *, ti_key):
+ raise ValueError("Link generation failed")
+
+ class DummyTestOperator(BaseOperator):
+ operator_extra_links = (FailingLink(), AirflowLink())
+
+ def execute(self, context):
+ pass
+
+ task = DummyTestOperator(task_id="task_with_multiple_links")
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ runtime_ti.start_date = instant
+ runtime_ti.end_date = instant
+
+ with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
+ finalize(
+ runtime_ti,
+ log=mock.MagicMock(),
+ state=TaskInstanceState.SUCCESS,
+ context=context,
+ )
+ assert mock_xcom_set.mock_calls == [
+ call(
+ key="_link_AirflowLink",
+ value="https://airflow.apache.org",
+ dag_id=runtime_ti.dag_id,
+ task_id=runtime_ti.task_id,
+ run_id=runtime_ti.run_id,
+ map_index=runtime_ti.map_index,
+ )
+ ]
+
@pytest.mark.parametrize(
["cmd", "rendered_cmd"],
[