This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1473c92c1 Revert "Check task attribute before use in sentry.add_tagging() (#37143)" (#38519)
a1473c92c1 is described below

commit a1473c92c11d844998c77ad6520b37bd6827a87f
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Mar 27 02:55:34 2024 +0400

    Revert "Check task attribute before use in sentry.add_tagging() (#37143)" (#38519)
    
    This reverts commit 77d2fc7d7591679aa99c1924daba678463a7b7bb.
---
 airflow/sentry.py            | 196 +++++++++++++++++++++++++++++++++++++++++++
 airflow/sentry/__init__.py   |  29 -------
 airflow/sentry/blank.py      |  40 ---------
 airflow/sentry/configured.py | 176 --------------------------------------
 tests/test_sentry.py         |  65 --------------
 5 files changed, 196 insertions(+), 310 deletions(-)

diff --git a/airflow/sentry.py b/airflow/sentry.py
new file mode 100644
index 0000000000..d5fbf3c04d
--- /dev/null
+++ b/airflow/sentry.py
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sentry Integration."""
+
+from __future__ import annotations
+
+import logging
+from functools import wraps
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.executors.executor_loader import ExecutorLoader
+from airflow.utils.session import find_session_idx, provide_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.models.taskinstance import TaskInstance
+
+log = logging.getLogger(__name__)
+
+
+class DummySentry:
+    """Blank class for Sentry."""
+
+    def add_tagging(self, task_instance):
+        """Blank function for tagging."""
+
+    def add_breadcrumbs(self, task_instance, session: Session | None = None):
+        """Blank function for breadcrumbs."""
+
+    def enrich_errors(self, run):
+        """Blank function for formatting a TaskInstance._run_raw_task."""
+        return run
+
+    def flush(self):
+        """Blank function for flushing errors."""
+
+
+Sentry: DummySentry = DummySentry()
+if conf.getboolean("sentry", "sentry_on", fallback=False):
+    import sentry_sdk
+    from sentry_sdk.integrations.flask import FlaskIntegration
+    from sentry_sdk.integrations.logging import ignore_logger
+
+    class ConfiguredSentry(DummySentry):
+        """Configure Sentry SDK."""
+
+        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
+        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
+        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
+
+        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
+            (
+                "integrations",
+                "in_app_include",
+                "in_app_exclude",
+                "ignore_errors",
+                "before_breadcrumb",
+            )
+        )
+
+        def __init__(self):
+            """Initialize the Sentry SDK."""
+            ignore_logger("airflow.task")
+
+            sentry_flask = FlaskIntegration()
+
+            # LoggingIntegration is set by default.
+            integrations = [sentry_flask]
+
+            executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+
+            if executor_class.supports_sentry:
+                from sentry_sdk.integrations.celery import CeleryIntegration
+
+                sentry_celery = CeleryIntegration()
+                integrations.append(sentry_celery)
+
+            dsn = None
+            sentry_config_opts = conf.getsection("sentry") or {}
+            if sentry_config_opts:
+                sentry_config_opts.pop("sentry_on")
+                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
+                new_way_dsn = sentry_config_opts.pop("dsn", None)
+                # supported backward compatibility with old way dsn option
+                dsn = old_way_dsn or new_way_dsn
+
+                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
+                if unsupported_options:
+                    log.warning(
+                        "There are unsupported options in [sentry] section: %s",
+                        ", ".join(unsupported_options),
+                    )
+
+                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
+                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
+
+            if dsn:
+                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
+            else:
+                # Setting up Sentry using environment variables.
+                log.debug("Defaulting to SENTRY_DSN in environment.")
+                sentry_sdk.init(integrations=integrations, **sentry_config_opts)
+
+        def add_tagging(self, task_instance):
+            """Add tagging for a task_instance."""
+            dag_run = task_instance.dag_run
+            task = task_instance.task
+
+            with sentry_sdk.configure_scope() as scope:
+                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
+                    attribute = getattr(task_instance, tag_name)
+                    scope.set_tag(tag_name, attribute)
+                for tag_name in self.SCOPE_DAG_RUN_TAGS:
+                    attribute = getattr(dag_run, tag_name)
+                    scope.set_tag(tag_name, attribute)
+                scope.set_tag("operator", task.__class__.__name__)
+
+        @provide_session
+        def add_breadcrumbs(
+            self,
+            task_instance: TaskInstance,
+            session: Session | None = None,
+        ) -> None:
+            """Add breadcrumbs inside of a task_instance."""
+            if session is None:
+                return
+            dr = task_instance.get_dagrun(session)
+            task_instances = dr.get_task_instances(
+                state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
+                session=session,
+            )
+
+            for ti in task_instances:
+                data = {}
+                for crumb_tag in self.SCOPE_CRUMBS:
+                    data[crumb_tag] = getattr(ti, crumb_tag)
+
+                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
+
+        def enrich_errors(self, func):
+            """
+            Decorate errors.
+
+            Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
+            """
+            session_args_idx = find_session_idx(func)
+
+            @wraps(func)
+            def wrapper(_self, *args, **kwargs):
+                # Wrapping the _run_raw_task function with push_scope to contain
+                # tags and breadcrumbs to a specific Task Instance
+
+                try:
+                    session = kwargs.get("session", args[session_args_idx])
+                except IndexError:
+                    session = None
+
+                with sentry_sdk.push_scope():
+                    try:
+                        # Is a LocalTaskJob get the task instance
+                        if hasattr(_self, "task_instance"):
+                            task_instance = _self.task_instance
+                        else:
+                            task_instance = _self
+
+                        self.add_tagging(task_instance)
+                        self.add_breadcrumbs(task_instance, session=session)
+                        return func(_self, *args, **kwargs)
+                    except Exception as e:
+                        sentry_sdk.capture_exception(e)
+                        raise
+
+            return wrapper
+
+        def flush(self):
+            sentry_sdk.flush()
+
+    Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/__init__.py b/airflow/sentry/__init__.py
deleted file mode 100644
index 10178aabf0..0000000000
--- a/airflow/sentry/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Sentry Integration."""
-
-from __future__ import annotations
-
-from airflow.configuration import conf
-from airflow.sentry.blank import BlankSentry
-
-Sentry: BlankSentry = BlankSentry()
-if conf.getboolean("sentry", "sentry_on", fallback=False):
-    from airflow.sentry.configured import ConfiguredSentry
-
-    Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/blank.py b/airflow/sentry/blank.py
deleted file mode 100644
index 8cdb40b5a9..0000000000
--- a/airflow/sentry/blank.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
-    from sqlalchemy.orm import Session
-
-
-class BlankSentry:
-    """Blank class for Sentry."""
-
-    def add_tagging(self, task_instance):
-        """Blank function for tagging."""
-
-    def add_breadcrumbs(self, task_instance, session: Session | None = None):
-        """Blank function for breadcrumbs."""
-
-    def enrich_errors(self, run):
-        """Blank function for formatting a TaskInstance._run_raw_task."""
-        return run
-
-    def flush(self):
-        """Blank function for flushing errors."""
diff --git a/airflow/sentry/configured.py b/airflow/sentry/configured.py
deleted file mode 100644
index af96839138..0000000000
--- a/airflow/sentry/configured.py
+++ /dev/null
@@ -1,176 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-from functools import wraps
-from typing import TYPE_CHECKING
-
-import sentry_sdk
-from sentry_sdk.integrations.flask import FlaskIntegration
-from sentry_sdk.integrations.logging import ignore_logger
-
-from airflow.configuration import conf
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.sentry.blank import BlankSentry
-from airflow.utils.session import find_session_idx, provide_session
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
-    from sqlalchemy.orm import Session
-
-    from airflow.models.taskinstance import TaskInstance
-
-log = logging.getLogger(__name__)
-
-
-class ConfiguredSentry(BlankSentry):
-    """Configure Sentry SDK."""
-
-    SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
-    SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
-    SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
-
-    UNSUPPORTED_SENTRY_OPTIONS = frozenset(
-        (
-            "integrations",
-            "in_app_include",
-            "in_app_exclude",
-            "ignore_errors",
-            "before_breadcrumb",
-        )
-    )
-
-    def __init__(self):
-        """Initialize the Sentry SDK."""
-        ignore_logger("airflow.task")
-
-        sentry_flask = FlaskIntegration()
-
-        # LoggingIntegration is set by default.
-        integrations = [sentry_flask]
-
-        executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
-
-        if executor_class.supports_sentry:
-            from sentry_sdk.integrations.celery import CeleryIntegration
-
-            sentry_celery = CeleryIntegration()
-            integrations.append(sentry_celery)
-
-        dsn = None
-        sentry_config_opts = conf.getsection("sentry") or {}
-        if sentry_config_opts:
-            sentry_config_opts.pop("sentry_on")
-            old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
-            new_way_dsn = sentry_config_opts.pop("dsn", None)
-            # supported backward compatibility with old way dsn option
-            dsn = old_way_dsn or new_way_dsn
-
-            unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
-            if unsupported_options:
-                log.warning(
-                    "There are unsupported options in [sentry] section: %s",
-                    ", ".join(unsupported_options),
-                )
-
-            sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
-            sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
-
-        if dsn:
-            sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
-        else:
-            # Setting up Sentry using environment variables.
-            log.debug("Defaulting to SENTRY_DSN in environment.")
-            sentry_sdk.init(integrations=integrations, **sentry_config_opts)
-
-    def add_tagging(self, task_instance):
-        """Add tagging for a task_instance."""
-        dag_run = task_instance.dag_run
-        # See TaskInstance definition, the "task" attribute may not be set
-        task = getattr(task_instance, "task")
-
-        with sentry_sdk.configure_scope() as scope:
-            for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
-                attribute = getattr(task_instance, tag_name)
-                scope.set_tag(tag_name, attribute)
-            for tag_name in self.SCOPE_DAG_RUN_TAGS:
-                attribute = getattr(dag_run, tag_name)
-                scope.set_tag(tag_name, attribute)
-            if task is not None:
-                scope.set_tag("operator", task.__class__.__name__)
-
-    @provide_session
-    def add_breadcrumbs(
-        self,
-        task_instance: TaskInstance,
-        session: Session | None = None,
-    ) -> None:
-        """Add breadcrumbs inside of a task_instance."""
-        if session is None:
-            return
-        dr = task_instance.get_dagrun(session)
-        task_instances = dr.get_task_instances(
-            state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
-            session=session,
-        )
-
-        for ti in task_instances:
-            data = {}
-            for crumb_tag in self.SCOPE_CRUMBS:
-                data[crumb_tag] = getattr(ti, crumb_tag)
-
-            sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
-
-    def enrich_errors(self, func):
-        """
-        Decorate errors.
-
-        Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
-        """
-        session_args_idx = find_session_idx(func)
-
-        @wraps(func)
-        def wrapper(_self, *args, **kwargs):
-            # Wrapping the _run_raw_task function with push_scope to contain
-            # tags and breadcrumbs to a specific Task Instance
-
-            try:
-                session = kwargs.get("session", args[session_args_idx])
-            except IndexError:
-                session = None
-
-            with sentry_sdk.push_scope():
-                try:
-                    # Is a LocalTaskJob get the task instance
-                    if hasattr(_self, "task_instance"):
-                        task_instance = _self.task_instance
-                    else:
-                        task_instance = _self
-
-                    self.add_tagging(task_instance)
-                    self.add_breadcrumbs(task_instance, session=session)
-                    return func(_self, *args, **kwargs)
-                except Exception as e:
-                    sentry_sdk.capture_exception(e)
-                    raise
-
-        return wrapper
-
-    def flush(self):
-        sentry_sdk.flush()
diff --git a/tests/test_sentry.py b/tests/test_sentry.py
deleted file mode 100644
index 10968b5274..0000000000
--- a/tests/test_sentry.py
+++ /dev/null
@@ -1,65 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest.mock import MagicMock, create_autospec, patch
-
-from sentry_sdk.scope import Scope
-
-from airflow.models.taskinstance import TaskInstance
-
-
-@patch("sentry_sdk.configure_scope")
-def test_configured_sentry_add_tagging(mock_configure_scope):
-    mock_scope = create_autospec(Scope)
-    mock_configure_scope.return_value = mock_scope
-
-    from airflow.sentry.configured import ConfiguredSentry
-
-    sentry = ConfiguredSentry()
-
-    dummy_tags = ConfiguredSentry.SCOPE_DAG_RUN_TAGS | ConfiguredSentry.SCOPE_TASK_INSTANCE_TAGS
-
-    # It should not raise error with both "dag_run" and "task" attributes, available.
-    task_instance_1 = create_autospec(TaskInstance)
-    task_instance_1.dag_run = MagicMock()
-    task_instance_1.task = "task_1"
-    for tag in dummy_tags:
-        setattr(task_instance_1.dag_run, tag, "dummy")
-    sentry.add_tagging(task_instance_1)
-
-    # Verify tags
-    for tag in dummy_tags:
-        mock_scope.set_tag.assert_called_with(tag, "dummy")
-    mock_scope.set_tag.assert_called_with("operator", "str")
-
-    # Reset the mock
-    mock_scope.reset_mock()
-
-    # It should not raise error if "task" attribute is not set.
-    task_instance_2 = create_autospec(TaskInstance)
-    task_instance_2.dag_run = MagicMock()
-    for tag in dummy_tags:
-        setattr(task_instance_2.dag_run, tag, "dummy")
-    sentry.add_tagging(task_instance_2)
-
-    # Verify tags
-    for tag in dummy_tags:
-        mock_scope.set_tag.assert_called_with(tag, "dummy")
-    # Also verify the "operator" tag, which is related to the "task attribute.
-    mock_scope.set_tag.assert_called_with("operator", "str")

Reply via email to