This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1473c92c1 Revert "Check task attribute before use in
sentry.add_tagging() (#37143)" (#38519)
a1473c92c1 is described below
commit a1473c92c11d844998c77ad6520b37bd6827a87f
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Mar 27 02:55:34 2024 +0400
Revert "Check task attribute before use in sentry.add_tagging() (#37143)"
(#38519)
This reverts commit 77d2fc7d7591679aa99c1924daba678463a7b7bb.
---
airflow/sentry.py | 196 +++++++++++++++++++++++++++++++++++++++++++
airflow/sentry/__init__.py | 29 -------
airflow/sentry/blank.py | 40 ---------
airflow/sentry/configured.py | 176 --------------------------------------
tests/test_sentry.py | 65 --------------
5 files changed, 196 insertions(+), 310 deletions(-)
diff --git a/airflow/sentry.py b/airflow/sentry.py
new file mode 100644
index 0000000000..d5fbf3c04d
--- /dev/null
+++ b/airflow/sentry.py
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sentry Integration."""
+
+from __future__ import annotations
+
+import logging
+from functools import wraps
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.executors.executor_loader import ExecutorLoader
+from airflow.utils.session import find_session_idx, provide_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+ from airflow.models.taskinstance import TaskInstance
+
+log = logging.getLogger(__name__)
+
+
+class DummySentry:
+ """Blank class for Sentry."""
+
+ def add_tagging(self, task_instance):
+ """Blank function for tagging."""
+
+ def add_breadcrumbs(self, task_instance, session: Session | None = None):
+ """Blank function for breadcrumbs."""
+
+ def enrich_errors(self, run):
+ """Blank function for formatting a TaskInstance._run_raw_task."""
+ return run
+
+ def flush(self):
+ """Blank function for flushing errors."""
+
+
+Sentry: DummySentry = DummySentry()
+if conf.getboolean("sentry", "sentry_on", fallback=False):
+ import sentry_sdk
+ from sentry_sdk.integrations.flask import FlaskIntegration
+ from sentry_sdk.integrations.logging import ignore_logger
+
+ class ConfiguredSentry(DummySentry):
+ """Configure Sentry SDK."""
+
+ SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
+ SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
+ SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
+
+ UNSUPPORTED_SENTRY_OPTIONS = frozenset(
+ (
+ "integrations",
+ "in_app_include",
+ "in_app_exclude",
+ "ignore_errors",
+ "before_breadcrumb",
+ )
+ )
+
+ def __init__(self):
+ """Initialize the Sentry SDK."""
+ ignore_logger("airflow.task")
+
+ sentry_flask = FlaskIntegration()
+
+ # LoggingIntegration is set by default.
+ integrations = [sentry_flask]
+
+ executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+
+ if executor_class.supports_sentry:
+ from sentry_sdk.integrations.celery import CeleryIntegration
+
+ sentry_celery = CeleryIntegration()
+ integrations.append(sentry_celery)
+
+ dsn = None
+ sentry_config_opts = conf.getsection("sentry") or {}
+ if sentry_config_opts:
+ sentry_config_opts.pop("sentry_on")
+ old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
+ new_way_dsn = sentry_config_opts.pop("dsn", None)
+ # supported backward compatibility with old way dsn option
+ dsn = old_way_dsn or new_way_dsn
+
+ unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
+ if unsupported_options:
+ log.warning(
+ "There are unsupported options in [sentry] section: %s",
+ ", ".join(unsupported_options),
+ )
+
+ sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
+ sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
+
+ if dsn:
+ sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
+ else:
+ # Setting up Sentry using environment variables.
+ log.debug("Defaulting to SENTRY_DSN in environment.")
+ sentry_sdk.init(integrations=integrations, **sentry_config_opts)
+
+ def add_tagging(self, task_instance):
+ """Add tagging for a task_instance."""
+ dag_run = task_instance.dag_run
+ task = task_instance.task
+
+ with sentry_sdk.configure_scope() as scope:
+ for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
+ attribute = getattr(task_instance, tag_name)
+ scope.set_tag(tag_name, attribute)
+ for tag_name in self.SCOPE_DAG_RUN_TAGS:
+ attribute = getattr(dag_run, tag_name)
+ scope.set_tag(tag_name, attribute)
+ scope.set_tag("operator", task.__class__.__name__)
+
+ @provide_session
+ def add_breadcrumbs(
+ self,
+ task_instance: TaskInstance,
+ session: Session | None = None,
+ ) -> None:
+ """Add breadcrumbs inside of a task_instance."""
+ if session is None:
+ return
+ dr = task_instance.get_dagrun(session)
+ task_instances = dr.get_task_instances(
+ state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
+ session=session,
+ )
+
+ for ti in task_instances:
+ data = {}
+ for crumb_tag in self.SCOPE_CRUMBS:
+ data[crumb_tag] = getattr(ti, crumb_tag)
+
+ sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
+
+ def enrich_errors(self, func):
+ """
+ Decorate errors.
+
+ Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
+ """
+ session_args_idx = find_session_idx(func)
+
+ @wraps(func)
+ def wrapper(_self, *args, **kwargs):
+ # Wrapping the _run_raw_task function with push_scope to contain
+ # tags and breadcrumbs to a specific Task Instance
+
+ try:
+ session = kwargs.get("session", args[session_args_idx])
+ except IndexError:
+ session = None
+
+ with sentry_sdk.push_scope():
+ try:
+ # Is a LocalTaskJob get the task instance
+ if hasattr(_self, "task_instance"):
+ task_instance = _self.task_instance
+ else:
+ task_instance = _self
+
+ self.add_tagging(task_instance)
+ self.add_breadcrumbs(task_instance, session=session)
+ return func(_self, *args, **kwargs)
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ raise
+
+ return wrapper
+
+ def flush(self):
+ sentry_sdk.flush()
+
+ Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/__init__.py b/airflow/sentry/__init__.py
deleted file mode 100644
index 10178aabf0..0000000000
--- a/airflow/sentry/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Sentry Integration."""
-
-from __future__ import annotations
-
-from airflow.configuration import conf
-from airflow.sentry.blank import BlankSentry
-
-Sentry: BlankSentry = BlankSentry()
-if conf.getboolean("sentry", "sentry_on", fallback=False):
- from airflow.sentry.configured import ConfiguredSentry
-
- Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/blank.py b/airflow/sentry/blank.py
deleted file mode 100644
index 8cdb40b5a9..0000000000
--- a/airflow/sentry/blank.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from sqlalchemy.orm import Session
-
-
-class BlankSentry:
- """Blank class for Sentry."""
-
- def add_tagging(self, task_instance):
- """Blank function for tagging."""
-
- def add_breadcrumbs(self, task_instance, session: Session | None = None):
- """Blank function for breadcrumbs."""
-
- def enrich_errors(self, run):
- """Blank function for formatting a TaskInstance._run_raw_task."""
- return run
-
- def flush(self):
- """Blank function for flushing errors."""
diff --git a/airflow/sentry/configured.py b/airflow/sentry/configured.py
deleted file mode 100644
index af96839138..0000000000
--- a/airflow/sentry/configured.py
+++ /dev/null
@@ -1,176 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-from functools import wraps
-from typing import TYPE_CHECKING
-
-import sentry_sdk
-from sentry_sdk.integrations.flask import FlaskIntegration
-from sentry_sdk.integrations.logging import ignore_logger
-
-from airflow.configuration import conf
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.sentry.blank import BlankSentry
-from airflow.utils.session import find_session_idx, provide_session
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
- from sqlalchemy.orm import Session
-
- from airflow.models.taskinstance import TaskInstance
-
-log = logging.getLogger(__name__)
-
-
-class ConfiguredSentry(BlankSentry):
- """Configure Sentry SDK."""
-
- SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
- SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
- SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
-
- UNSUPPORTED_SENTRY_OPTIONS = frozenset(
- (
- "integrations",
- "in_app_include",
- "in_app_exclude",
- "ignore_errors",
- "before_breadcrumb",
- )
- )
-
- def __init__(self):
- """Initialize the Sentry SDK."""
- ignore_logger("airflow.task")
-
- sentry_flask = FlaskIntegration()
-
- # LoggingIntegration is set by default.
- integrations = [sentry_flask]
-
- executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
-
- if executor_class.supports_sentry:
- from sentry_sdk.integrations.celery import CeleryIntegration
-
- sentry_celery = CeleryIntegration()
- integrations.append(sentry_celery)
-
- dsn = None
- sentry_config_opts = conf.getsection("sentry") or {}
- if sentry_config_opts:
- sentry_config_opts.pop("sentry_on")
- old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
- new_way_dsn = sentry_config_opts.pop("dsn", None)
- # supported backward compatibility with old way dsn option
- dsn = old_way_dsn or new_way_dsn
-
- unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
- if unsupported_options:
- log.warning(
- "There are unsupported options in [sentry] section: %s",
- ", ".join(unsupported_options),
- )
-
- sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
- sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
-
- if dsn:
- sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
- else:
- # Setting up Sentry using environment variables.
- log.debug("Defaulting to SENTRY_DSN in environment.")
- sentry_sdk.init(integrations=integrations, **sentry_config_opts)
-
- def add_tagging(self, task_instance):
- """Add tagging for a task_instance."""
- dag_run = task_instance.dag_run
- # See TaskInstance definition, the "task" attribute may not be set
- task = getattr(task_instance, "task")
-
- with sentry_sdk.configure_scope() as scope:
- for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
- attribute = getattr(task_instance, tag_name)
- scope.set_tag(tag_name, attribute)
- for tag_name in self.SCOPE_DAG_RUN_TAGS:
- attribute = getattr(dag_run, tag_name)
- scope.set_tag(tag_name, attribute)
- if task is not None:
- scope.set_tag("operator", task.__class__.__name__)
-
- @provide_session
- def add_breadcrumbs(
- self,
- task_instance: TaskInstance,
- session: Session | None = None,
- ) -> None:
- """Add breadcrumbs inside of a task_instance."""
- if session is None:
- return
- dr = task_instance.get_dagrun(session)
- task_instances = dr.get_task_instances(
- state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
- session=session,
- )
-
- for ti in task_instances:
- data = {}
- for crumb_tag in self.SCOPE_CRUMBS:
- data[crumb_tag] = getattr(ti, crumb_tag)
-
- sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
-
- def enrich_errors(self, func):
- """
- Decorate errors.
-
- Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
- """
- session_args_idx = find_session_idx(func)
-
- @wraps(func)
- def wrapper(_self, *args, **kwargs):
- # Wrapping the _run_raw_task function with push_scope to contain
- # tags and breadcrumbs to a specific Task Instance
-
- try:
- session = kwargs.get("session", args[session_args_idx])
- except IndexError:
- session = None
-
- with sentry_sdk.push_scope():
- try:
- # Is a LocalTaskJob get the task instance
- if hasattr(_self, "task_instance"):
- task_instance = _self.task_instance
- else:
- task_instance = _self
-
- self.add_tagging(task_instance)
- self.add_breadcrumbs(task_instance, session=session)
- return func(_self, *args, **kwargs)
- except Exception as e:
- sentry_sdk.capture_exception(e)
- raise
-
- return wrapper
-
- def flush(self):
- sentry_sdk.flush()
diff --git a/tests/test_sentry.py b/tests/test_sentry.py
deleted file mode 100644
index 10968b5274..0000000000
--- a/tests/test_sentry.py
+++ /dev/null
@@ -1,65 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest.mock import MagicMock, create_autospec, patch
-
-from sentry_sdk.scope import Scope
-
-from airflow.models.taskinstance import TaskInstance
-
-
-@patch("sentry_sdk.configure_scope")
-def test_configured_sentry_add_tagging(mock_configure_scope):
- mock_scope = create_autospec(Scope)
- mock_configure_scope.return_value = mock_scope
-
- from airflow.sentry.configured import ConfiguredSentry
-
- sentry = ConfiguredSentry()
-
- dummy_tags = ConfiguredSentry.SCOPE_DAG_RUN_TAGS | ConfiguredSentry.SCOPE_TASK_INSTANCE_TAGS
-
- # It should not raise error with both "dag_run" and "task" attributes, available.
- task_instance_1 = create_autospec(TaskInstance)
- task_instance_1.dag_run = MagicMock()
- task_instance_1.task = "task_1"
- for tag in dummy_tags:
- setattr(task_instance_1.dag_run, tag, "dummy")
- sentry.add_tagging(task_instance_1)
-
- # Verify tags
- for tag in dummy_tags:
- mock_scope.set_tag.assert_called_with(tag, "dummy")
- mock_scope.set_tag.assert_called_with("operator", "str")
-
- # Reset the mock
- mock_scope.reset_mock()
-
- # It should not raise error if "task" attribute is not set.
- task_instance_2 = create_autospec(TaskInstance)
- task_instance_2.dag_run = MagicMock()
- for tag in dummy_tags:
- setattr(task_instance_2.dag_run, tag, "dummy")
- sentry.add_tagging(task_instance_2)
-
- # Verify tags
- for tag in dummy_tags:
- mock_scope.set_tag.assert_called_with(tag, "dummy")
- # Also verify the "operator" tag, which is related to the "task attribute.
- mock_scope.set_tag.assert_called_with("operator", "str")