This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c0032fdb9f Send executor integration info in workload (#57800)
6c0032fdb9f is described below
commit 6c0032fdb9f8faf1eedfe2f2edcf7ff4253821c3
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Nov 24 16:38:05 2025 +0800
Send executor integration info in workload (#57800)
---
airflow-core/docs/core-concepts/executor/index.rst | 4 +-
.../src/airflow/executors/base_executor.py | 2 +-
airflow-core/src/airflow/executors/workloads.py | 3 ++
.../src/airflow/jobs/scheduler_job_runner.py | 24 ++++++++++-
.../tests/unit/executors/test_base_executor.py | 6 +--
.../tests/unit/executors/test_local_executor.py | 4 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 2 +
devel-common/src/tests_common/pytest_plugin.py | 7 +++-
.../providers/celery/executors/celery_executor.py | 3 ++
.../unit/celery/executors/test_celery_executor.py | 7 +++-
.../executors/test_kubernetes_executor.py | 13 ++++--
.../providers/edge3/openapi/v2-edge-generated.yaml | 4 ++
.../www/openapi-gen/requests/schemas.gen.ts | 5 +++
.../plugins/www/openapi-gen/requests/types.gen.ts | 1 +
providers/edge3/www-hash.txt | 2 +-
task-sdk/src/airflow/sdk/definitions/dag.py | 1 +
task-sdk/src/airflow/sdk/execution_time/comms.py | 1 +
.../airflow/sdk/execution_time/execute_workload.py | 1 +
.../sdk/execution_time/sentry/configured.py | 31 ++++++++-------
.../src/airflow/sdk/execution_time/supervisor.py | 22 ++++++++++-
.../src/airflow/sdk/execution_time/task_runner.py | 3 ++
.../tests/task_sdk/execution_time/test_comms.py | 1 +
.../tests/task_sdk/execution_time/test_sentry.py | 46 ++++++++++++++++++----
.../task_sdk/execution_time/test_task_runner.py | 11 ++++++
24 files changed, 163 insertions(+), 41 deletions(-)
diff --git a/airflow-core/docs/core-concepts/executor/index.rst
b/airflow-core/docs/core-concepts/executor/index.rst
index 7a4b5e88fe3..d27b306c3dc 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -308,12 +308,10 @@ Compatibility Attributes
The ``BaseExecutor`` class interface contains a set of attributes that Airflow
core code uses to check the features that your executor is compatible with.
When writing your own Airflow executor be sure to set these correctly for your
use case. Each attribute is simply a boolean to enable/disable a feature or
indicate that a feature is supported/unsupported by the executor:
* ``supports_pickling``: Whether or not the executor supports reading pickled
Dags from the Database before execution (rather than reading the Dag definition
from the file system).
-* ``supports_sentry``: Whether or not the executor supports `Sentry
<https://sentry.io>`_.
-
+* ``sentry_integration``: If the executor supports `Sentry
<https://sentry.io>`_, this should be an import path to a callable that creates
the integration. For example, ``CeleryExecutor`` sets this to
``"sentry_sdk.integrations.celery.CeleryIntegration"``. Unlike the other
attributes, this is a string; leave it empty (the default) if the executor does
not support Sentry.
* ``is_local``: Whether or not the executor is remote or local. See the
`Executor Types`_ section above.
* ``is_single_threaded``: Whether or not the executor is single threaded. This
is particularly relevant to what database backends are supported. Single
threaded executors can run with any backend, including SQLite.
* ``is_production``: Whether or not the executor should be used for production
purposes. A UI message is displayed to users when they are using a
non-production ready executor.
-
* ``serve_logs``: Whether or not the executor supports serving logs, see
:doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.
CLI
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index 70b9ba9ef08..985a6bcab8b 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -129,7 +129,7 @@ class BaseExecutor(LoggingMixin):
active_spans = ThreadSafeDict()
supports_ad_hoc_ti_run: bool = False
- supports_sentry: bool = False
+ sentry_integration: str = ""
is_local: bool = False
is_production: bool = True
diff --git a/airflow-core/src/airflow/executors/workloads.py
b/airflow-core/src/airflow/executors/workloads.py
index c453f153d21..7cf1aae60ff 100644
--- a/airflow-core/src/airflow/executors/workloads.py
+++ b/airflow-core/src/airflow/executors/workloads.py
@@ -112,6 +112,7 @@ class ExecuteTask(BaseDagBundleWorkload):
"""Execute the given Task."""
ti: TaskInstance
+ sentry_integration: str = ""
type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
@@ -122,6 +123,7 @@ class ExecuteTask(BaseDagBundleWorkload):
dag_rel_path: Path | None = None,
generator: JWTGenerator | None = None,
bundle_info: BundleInfo | None = None,
+ sentry_integration: str = "",
) -> ExecuteTask:
from airflow.utils.helpers import log_filename_template_renderer
@@ -140,6 +142,7 @@ class ExecuteTask(BaseDagBundleWorkload):
token=cls.generate_token(str(ti.id), generator),
log_path=fname,
bundle_info=bundle_info,
+ sentry_integration=sentry_integration,
)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b1d007c15e0..057b3b06e76 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -791,13 +791,35 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
:param executor: The executor to enqueue tasks for
:param session: The session object
"""
+
+ def _get_sentry_integration(executor: BaseExecutor) -> str:
+ try:
+ sentry_integration = executor.sentry_integration
+ except AttributeError:
+ # Old executor interface hard-codes the supports_sentry flag.
+ if getattr(executor, "supports_sentry", False):
+ return "sentry_sdk.integrations.celery.CeleryIntegration"
+ return ""
+ if not isinstance(sentry_integration, str):
+ self.log.warning(
+ "Ignoring invalid sentry_integration on executor",
+ executor=executor,
+ sentry_integration=sentry_integration,
+ )
+ return ""
+ return sentry_integration
+
# actually enqueue them
for ti in task_instances:
if ti.dag_run.state in State.finished_dr_states:
ti.set_state(None, session=session)
continue
- workload = workloads.ExecuteTask.make(ti,
generator=executor.jwt_generator)
+ workload = workloads.ExecuteTask.make(
+ ti,
+ generator=executor.jwt_generator,
+ sentry_integration=_get_sentry_integration(executor),
+ )
executor.queue_workload(workload, session=session)
def _critical_section_enqueue_task_instances(self, session: Session) ->
int:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py
b/airflow-core/tests/unit/executors/test_base_executor.py
index 13ab21d3a41..ba540bbbddb 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -33,15 +33,15 @@ from airflow.cli.cli_parser import AirflowHelpFormatter
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor,
RunningRetryAttemptType
from airflow.executors.local_executor import LocalExecutor
-from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.sdk import BaseOperator
from airflow.utils.state import State, TaskInstanceState
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
-def test_supports_sentry():
- assert not BaseExecutor.supports_sentry
+def test_sentry_integration():
+ assert not BaseExecutor.sentry_integration
def test_is_local_default_value():
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index 845f43a8634..b0261baf04e 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -46,8 +46,8 @@ skip_spawn_mp_start = pytest.mark.skipif(
class TestLocalExecutor:
TEST_SUCCESS_COMMANDS = 5
- def test_supports_sentry(self):
- assert not LocalExecutor.supports_sentry
+ def test_sentry_integration(self):
+ assert not LocalExecutor.sentry_integration
def test_is_local_default_value(self):
assert LocalExecutor.is_local
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 2794b100519..3b7f15e2ee6 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -226,10 +226,12 @@ class TestSchedulerJob:
default_executor.name = ExecutorName(alias="default_exec",
module_path="default.exec.module.path")
default_executor.jwt_generator = mock_jwt_generator
default_executor.team_name = None # Global executor
+ default_executor.sentry_integration = ""
second_executor = mock.MagicMock(name="SeconadaryExecutor",
slots_available=8, slots_occupied=0)
second_executor.name = ExecutorName(alias="secondary_exec",
module_path="secondary.exec.module.path")
second_executor.jwt_generator = mock_jwt_generator
second_executor.team_name = None # Global executor
+ second_executor.sentry_integration = ""
# TODO: Task-SDK Make it look like a bound method. Needed until we
remove the old queue_workload
# interface from executors
diff --git a/devel-common/src/tests_common/pytest_plugin.py
b/devel-common/src/tests_common/pytest_plugin.py
index e70ebd0e693..6c32b001295 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -2490,6 +2490,11 @@ def create_runtime_ti(mocked_parse):
if upstream_map_indexes is not None:
ti_context.upstream_map_indexes = upstream_map_indexes
+ compat_fields = {
+ "requests_fd": 0,
+ "sentry_integration": "",
+ }
+
startup_details = StartupDetails(
ti=TaskInstance(
id=ti_id,
@@ -2505,7 +2510,7 @@ def create_runtime_ti(mocked_parse):
ti_context=ti_context,
start_date=start_date, # type: ignore
# Back-compat of task-sdk. Only affects us when we manually create
these objects in tests.
- **({"requests_fd": 0} if "requests_fd" in
StartupDetails.model_fields else {}), # type: ignore
+ **{k: v for k, v in compat_fields.items() if k in
StartupDetails.model_fields}, # type: ignore
)
ti = mocked_parse(startup_details, dag_id, task)
diff --git
a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
index c8107b9160e..58e83ff66d2 100644
--- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
+++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
@@ -289,6 +289,9 @@ class CeleryExecutor(BaseExecutor):
"""
supports_ad_hoc_ti_run: bool = True
+ sentry_integration: str =
"sentry_sdk.integrations.celery.CeleryIntegration"
+
+ # TODO: Remove this flag once providers depend on Airflow 3.2.
supports_sentry: bool = True
if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
diff --git
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index c45090a0a44..0a1a49345ed 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -44,7 +44,7 @@ from airflow.utils.state import State
from tests_common.test_utils import db
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.dag import sync_dag_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS
if AIRFLOW_V_3_0_PLUS:
from airflow.models.dag_version import DagVersion
@@ -121,6 +121,11 @@ class TestCeleryExecutor:
db.clear_db_runs()
db.clear_db_jobs()
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers
new configuration")
+ def test_sentry_integration(self):
+ assert CeleryExecutor.sentry_integration ==
"sentry_sdk.integrations.celery.CeleryIntegration"
+
+ @pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow <
3.2")
def test_supports_sentry(self):
assert CeleryExecutor.supports_sentry
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 24b8f4aff1b..a80a031f66d 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -28,7 +28,6 @@ from kubernetes.client import models as k8s
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse
-from airflow import __version__
from airflow.exceptions import AirflowException
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.cncf.kubernetes import pod_generator
@@ -64,11 +63,12 @@ except ImportError:
from airflow.utils.state import State, TaskInstanceState
from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
-if __version__.startswith("2."):
- LOGICAL_DATE_KEY = "execution_date"
-else:
+if AIRFLOW_V_3_0_PLUS:
LOGICAL_DATE_KEY = "logical_date"
+else:
+ LOGICAL_DATE_KEY = "execution_date"
class TestAirflowKubernetesScheduler:
@@ -1438,6 +1438,11 @@ class TestKubernetesExecutor:
"Reading from k8s pod logs failed: error_fetching_pod_log",
]
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers
new configuration")
+ def test_sentry_integration(self):
+ assert not KubernetesExecutor.sentry_integration
+
+ @pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow <
3.2")
def test_supports_sentry(self):
assert not KubernetesExecutor.supports_sentry
diff --git
a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
b/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
index 015902e16ed..36e8a526411 100644
--- a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+++ b/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
@@ -950,6 +950,10 @@ components:
title: Log Path
ti:
$ref: '#/components/schemas/TaskInstance'
+ sentry_integration:
+ type: string
+ title: Sentry Integration
+ default: ''
type:
type: string
const: ExecuteTask
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
index fe7c26596d2..11e48528f69 100644
---
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
@@ -103,6 +103,11 @@ export const $ExecuteTask = {
ti: {
'$ref': '#/components/schemas/TaskInstance'
},
+ sentry_integration: {
+ type: 'string',
+ title: 'Sentry Integration',
+ default: ''
+ },
type: {
type: 'string',
const: 'ExecuteTask',
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
index 6768f986c20..12e2a71fd64 100644
---
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
@@ -56,6 +56,7 @@ export type ExecuteTask = {
bundle_info: BundleInfo;
log_path: string | null;
ti: TaskInstance;
+ sentry_integration?: string;
type?: "ExecuteTask";
};
diff --git a/providers/edge3/www-hash.txt b/providers/edge3/www-hash.txt
index 399af71085a..f328d185da2 100644
--- a/providers/edge3/www-hash.txt
+++ b/providers/edge3/www-hash.txt
@@ -1 +1 @@
-f1bc9efa109cfcd7587f48af7dd45c4a1f50c15cef2f00fd54bc7bdd61857177
+cb1cf4eb7022cf4ec04aa222cb39090796117014cb2dd8c9353955d2148e28a3
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index 862fedfa5e0..e0ce764d016 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1339,6 +1339,7 @@ class DAG:
ti,
dag_rel_path=Path(self.fileloc),
generator=executor.jwt_generator,
+ sentry_integration=executor.sentry_integration,
# For the system test/debug purpose, we use the
default bundle which uses
# local file system. If it turns out to be a
feature people want, we could
# plumb the Bundle to use as a parameter to
dag.test
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index df24085ea66..934667c5e66 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -280,6 +280,7 @@ class StartupDetails(BaseModel):
bundle_info: BundleInfo
start_date: datetime
ti_context: TIRunContext
+ sentry_integration: str
type: Literal["StartupDetails"] = "StartupDetails"
diff --git a/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
b/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
index 477e66e88c6..410c676eeb9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
+++ b/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
@@ -71,6 +71,7 @@ def execute_workload(workload: ExecuteTask) -> None:
token=workload.token,
server=server,
log_path=workload.log_path,
+ sentry_integration=workload.sentry_integration,
# Include the output of the task to stdout too, so that in process
logs can be read from via the
# kubeapi as pod logs.
subprocess_logs_to_stdout=True,
diff --git a/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
b/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
index fef3662b682..d8339646216 100644
--- a/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
+++ b/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
@@ -27,7 +27,8 @@ where things in this module are re-exported.
from __future__ import annotations
import functools
-from typing import TYPE_CHECKING
+import importlib
+from typing import TYPE_CHECKING, Any
import sentry_sdk
import sentry_sdk.integrations.logging
@@ -62,7 +63,7 @@ class ConfiguredSentry(NoopSentry):
)
)
- def __init__(self):
+ def prepare_to_enrich_errors(self, executor_integration: str) -> None:
"""Initialize the Sentry SDK."""
from airflow.sdk.configuration import conf
@@ -71,16 +72,14 @@ class ConfiguredSentry(NoopSentry):
# LoggingIntegration is set by default.
integrations = []
- # TODO: How can we get executor info in the runner to support this?
- # executor_class, _ = ExecutorLoader.import_default_executor_cls()
- # if executor_class.supports_sentry:
- # from sentry_sdk.integrations.celery import CeleryIntegration
+ if executor_integration:
+ try:
+ mod_p, cls_n = executor_integration.rsplit(".", 1)
+ integrations.append(getattr(importlib.import_module(mod_p),
cls_n)())
+ except Exception:
+ log.exception("Invalid executor Sentry integration",
import_path=executor_integration)
- # sentry_celery = CeleryIntegration()
- # integrations.append(sentry_celery)
-
- dsn = None
- sentry_config_opts = conf.getsection("sentry") or {}
+ sentry_config_opts: dict[str, Any] = conf.getsection("sentry") or {}
if sentry_config_opts:
sentry_config_opts.pop("sentry_on")
old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
@@ -93,9 +92,12 @@ class ConfiguredSentry(NoopSentry):
"There are unsupported options in [sentry] section",
options=unsupported_options,
)
-
- sentry_config_opts["before_send"] = conf.getimport("sentry",
"before_send", fallback=None)
- sentry_config_opts["transport"] = conf.getimport("sentry",
"transport", fallback=None)
+ else:
+ dsn = None
+ if before_send := conf.getimport("sentry", "before_send",
fallback=None):
+ sentry_config_opts["before_send"] = before_send
+ if transport := conf.getimport("sentry", "transport",
fallback=None):
+ sentry_config_opts["transport"] = transport
if dsn:
sentry_sdk.init(dsn=dsn, integrations=integrations,
**sentry_config_opts)
@@ -137,6 +139,7 @@ class ConfiguredSentry(NoopSentry):
@functools.wraps(run)
def wrapped_run(ti: RuntimeTaskInstance, context: Context, log:
Logger) -> RunReturn:
+ self.prepare_to_enrich_errors(ti.sentry_integration)
with sentry_sdk.push_scope():
try:
self.add_tagging(context["dag_run"], ti)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 51466535318..3fd3e23b154 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -957,15 +957,28 @@ class ActivitySubprocess(WatchedSubprocess):
client: Client,
target: Callable[[], None] = _subprocess_main,
logger: FilteringBoundLogger | None = None,
+ sentry_integration: str = "",
**kwargs,
) -> Self:
"""Fork and start a new subprocess to execute the given task."""
proc: Self = super().start(id=what.id, client=client, target=target,
logger=logger, **kwargs)
# Tell the task process what it needs to do!
- proc._on_child_started(ti=what, dag_rel_path=dag_rel_path,
bundle_info=bundle_info)
+ proc._on_child_started(
+ ti=what,
+ dag_rel_path=dag_rel_path,
+ bundle_info=bundle_info,
+ sentry_integration=sentry_integration,
+ )
return proc
- def _on_child_started(self, ti: TaskInstance, dag_rel_path: str |
os.PathLike[str], bundle_info):
+ def _on_child_started(
+ self,
+ *,
+ ti: TaskInstance,
+ dag_rel_path: str | os.PathLike[str],
+ bundle_info,
+ sentry_integration: str,
+ ) -> None:
"""Send startup message to the subprocess."""
self.ti = ti # type: ignore[assignment]
start_date = datetime.now(tz=timezone.utc)
@@ -987,6 +1000,7 @@ class ActivitySubprocess(WatchedSubprocess):
bundle_info=bundle_info,
ti_context=ti_context,
start_date=start_date,
+ sentry_integration=sentry_integration,
)
# Send the message to tell the process what it needs to execute
@@ -1943,6 +1957,7 @@ def supervise(
log_path: str | None = None,
subprocess_logs_to_stdout: bool = False,
client: Client | None = None,
+ sentry_integration: str = "",
) -> int:
"""
Run a single task execution to completion.
@@ -1956,6 +1971,8 @@ def supervise(
:param log_path: Path to write logs, if required.
:param subprocess_logs_to_stdout: Should task logs also be sent to stdout
via the main logger.
:param client: Optional preconfigured client for communication with the
server (Mostly for tests).
+ :param sentry_integration: If the executor has a Sentry integration, import
+ path to a callable to initialize it (empty means no integration).
:return: Exit code of the process.
:raises ValueError: If server URL is empty or invalid.
"""
@@ -2029,6 +2046,7 @@ def supervise(
logger=logger,
bundle_info=bundle_info,
subprocess_logs_to_stdout=subprocess_logs_to_stdout,
+ sentry_integration=sentry_integration,
)
exit_code = process.wait()
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index dc3c9dba294..82807bba902 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -156,6 +156,8 @@ class RuntimeTaskInstance(TaskInstance):
rendered_map_index: str | None = None
+ sentry_integration: str = ""
+
def __rich_repr__(self):
yield "id", self.id
yield "task_id", self.task_id
@@ -679,6 +681,7 @@ def parse(what: StartupDetails, log: Logger) ->
RuntimeTaskInstance:
max_tries=what.ti_context.max_tries,
start_date=what.start_date,
state=TaskInstanceState.RUNNING,
+ sentry_integration=what.sentry_integration,
)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_comms.py
b/task-sdk/tests/task_sdk/execution_time/test_comms.py
index fd5d352e14d..861b1d51c8a 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_comms.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_comms.py
@@ -104,6 +104,7 @@ class TestCommsDecoder:
"start_date": "2024-12-01T01:00:00Z",
"dag_rel_path": "/dev/null",
"bundle_info": {"name": "any-name", "version": "any-version"},
+ "sentry_integration": "",
}
bytes = msgspec.msgpack.encode(_ResponseFrame(0, msg, None))
w.sendall(len(bytes).to_bytes(4, byteorder="big") + bytes)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_sentry.py
b/task-sdk/tests/task_sdk/execution_time/test_sentry.py
index 5aaaada2c5f..c4efb929052 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_sentry.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_sentry.py
@@ -57,6 +57,20 @@ def before_send(_):
pass
+class CustomIntegration:
+ """
+ Integration object to use in tests.
+
+ All instances of this class are equal to each other.
+ """
+
+ def __hash__(self): # Implemented to satisfy Ruff.
+ return 0
+
+ def __eq__(self, other):
+ return type(self) is type(other)
+
+
class CustomTransport:
pass
@@ -170,15 +184,30 @@ class TestSentryHook:
importlib.reload(sentry)
- def test_init(self, mock_sentry_sdk, sentry):
+ def test_prepare_to_enrich_errors(self, mock_sentry_sdk, sentry):
assert is_configured(sentry)
+
+ sentry.prepare_to_enrich_errors(executor_integration="")
assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls
== [mock.call("airflow.task")]
assert mock_sentry_sdk.init.mock_calls == [
mock.call(
integrations=[],
default_integrations=False,
-
before_send=import_string("task_sdk.execution_time.test_sentry.before_send"),
- transport=None,
+ before_send="task_sdk.execution_time.test_sentry.before_send",
+ ),
+ ]
+
+ def test_prepare_to_enrich_errors_with_executor_integration(self,
mock_sentry_sdk, sentry):
+ assert is_configured(sentry)
+
+ executor_integration =
"task_sdk.execution_time.test_sentry.CustomIntegration"
+ sentry.prepare_to_enrich_errors(executor_integration)
+ assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls
== [mock.call("airflow.task")]
+ assert mock_sentry_sdk.init.mock_calls == [
+ mock.call(
+
integrations=[import_string("task_sdk.execution_time.test_sentry.CustomIntegration")()],
+ default_integrations=False,
+ before_send="task_sdk.execution_time.test_sentry.before_send",
),
]
@@ -222,13 +251,14 @@ class TestSentryHook:
Test transport gets passed to the sentry SDK
"""
assert is_configured(sentry_custom_transport)
+
+
sentry_custom_transport.prepare_to_enrich_errors(executor_integration="")
assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls
== [mock.call("airflow.task")]
assert mock_sentry_sdk.init.mock_calls == [
mock.call(
integrations=[],
default_integrations=False,
- before_send=None,
-
transport=import_string("task_sdk.execution_time.test_sentry.CustomTransport"),
+
transport="task_sdk.execution_time.test_sentry.CustomTransport",
),
]
@@ -237,7 +267,7 @@ class TestSentryHook:
Test before_send doesn't raise an exception when not set
"""
assert is_configured(sentry_minimum)
+
+ sentry_minimum.prepare_to_enrich_errors(executor_integration="")
assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls
== [mock.call("airflow.task")]
- assert mock_sentry_sdk.init.mock_calls == [
- mock.call(integrations=[], before_send=None, transport=None),
- ]
+ assert mock_sentry_sdk.init.mock_calls == [mock.call(integrations=[])]
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 1f01de15b47..5d6a2dfbb42 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -164,6 +164,7 @@ def test_parse(test_dags_dir: Path, make_ti_context):
bundle_info=BundleInfo(name="my-bundle", version=None),
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
with patch.dict(
@@ -210,6 +211,7 @@ def test_parse_dag_bag(mock_dagbag, test_dags_dir: Path,
make_ti_context):
bundle_info=BundleInfo(name="my-bundle", version=None),
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
with patch.dict(
@@ -269,6 +271,7 @@ def test_parse_not_found(test_dags_dir: Path,
make_ti_context, dag_id, task_id,
bundle_info=BundleInfo(name="my-bundle", version=None),
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
log = mock.Mock()
@@ -323,6 +326,7 @@ def test_parse_module_in_bundle_root(tmp_path: Path,
make_ti_context):
bundle_info=BundleInfo(name="my-bundle", version=None),
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
with patch.dict(
@@ -593,6 +597,7 @@ def test_basic_templated_dag(mocked_parse, make_ti_context,
mock_supervisor_comm
dag_rel_path="",
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
ti = mocked_parse(what, "basic_templated_dag", task)
@@ -708,6 +713,7 @@ def test_startup_and_run_dag_with_rtif(
bundle_info=FAKE_BUNDLE,
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mocked_parse(what, "basic_dag", task)
@@ -755,6 +761,7 @@ def test_task_run_with_user_impersonation(
bundle_info=FAKE_BUNDLE,
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mocked_parse(what, "basic_dag", task)
@@ -802,6 +809,7 @@ def test_task_run_with_user_impersonation_default_user(
bundle_info=FAKE_BUNDLE,
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mocked_parse(what, "basic_dag", task)
@@ -841,6 +849,7 @@ def
test_task_run_with_user_impersonation_remove_krb5ccname_on_reexecuted_proces
bundle_info=FAKE_BUNDLE,
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mocked_parse(what, "basic_dag", task)
@@ -980,6 +989,7 @@ def test_dag_parsing_context(make_ti_context,
mock_supervisor_comms, monkeypatch
bundle_info=BundleInfo(name="my-bundle", version=None),
ti_context=make_ti_context(dag_id=dag_id, run_id="c"),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mock_supervisor_comms._get_response.return_value = what
@@ -2891,6 +2901,7 @@ class TestTaskRunnerCallsListeners:
bundle_info=FAKE_BUNDLE,
ti_context=make_ti_context(),
start_date=timezone.utcnow(),
+ sentry_integration="",
)
mock_supervisor_comms._get_response.return_value = what