This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c9ffa5bf6b feat: add async jira notifier (#56326)
7c9ffa5bf6b is described below

commit 7c9ffa5bf6b26685d3021b992900653f03d4b7ef
Author: Sebastian Daum <[email protected]>
AuthorDate: Mon Oct 20 23:52:38 2025 +0200

    feat: add async jira notifier (#56326)
---
 dev/breeze/tests/test_selective_checks.py          |  17 +--
 providers/atlassian/jira/pyproject.toml            |   2 +
 .../airflow/providers/atlassian/jira/hooks/jira.py | 107 +++++++++++++++++-
 .../providers/atlassian/jira/notifications/jira.py |  55 ++++++++--
 .../tests/unit/atlassian/jira/hooks/test_jira.py   | 120 ++++++++++++++++++++-
 .../unit/atlassian/jira/notifications/test_jira.py |  36 ++++++-
 6 files changed, 315 insertions(+), 22 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 196fc9846b6..0e75dca5f1e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                     "providers/http/tests/file.py",
                 ),
                 {
-                    "selected-providers-list-as-string": "amazon apache.livy 
dbt.cloud dingding discord google http pagerduty",
+                    "selected-providers-list-as-string": "amazon apache.livy 
atlassian.jira dbt.cloud dingding discord google http pagerduty",
                     "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                     "all-python-versions-list-as-string": 
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
                     "python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                         [
                             {
                                 "description": "amazon...google",
-                                "test_types": "Providers[amazon] 
Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty] 
Providers[google]",
+                                "test_types": "Providers[amazon] 
Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty] 
Providers[google]",
                             }
                         ]
                     ),
@@ -702,14 +702,17 @@ def assert_outputs_are_printed(expected_outputs: 
dict[str, str], stderr: str):
                                 "test_types": "Providers[amazon] 
Providers[apache.livy]",
                             },
                             {
-                                "description": "dbt.cloud...dingding",
-                                "test_types": "Providers[dbt.cloud] 
Providers[dingding]",
+                                "description": "atlassian.jir...dbt.cloud",
+                                "test_types": "Providers[atlassian.jira] 
Providers[dbt.cloud]",
                             },
                             {
-                                "description": "discord...google",
-                                "test_types": "Providers[discord] 
Providers[google]",
+                                "description": "dingding...discord",
+                                "test_types": "Providers[dingding] 
Providers[discord]",
+                            },
+                            {
+                                "description": "google...http",
+                                "test_types": "Providers[google] 
Providers[http]",
                             },
-                            {"description": "http", "test_types": 
"Providers[http]"},
                             {"description": "pagerduty", "test_types": 
"Providers[pagerduty]"},
                         ]
                     ),
diff --git a/providers/atlassian/jira/pyproject.toml 
b/providers/atlassian/jira/pyproject.toml
index d7beb2aa59d..6f9f3e454e5 100644
--- a/providers/atlassian/jira/pyproject.toml
+++ b/providers/atlassian/jira/pyproject.toml
@@ -59,6 +59,7 @@ requires-python = ">=3.10"
 dependencies = [
     "apache-airflow>=2.10.0",
     "apache-airflow-providers-common-compat>=1.6.1",
+    "apache-airflow-providers-http",
     "atlassian-python-api>3.41.10",
 ]
 
@@ -68,6 +69,7 @@ dev = [
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
     "apache-airflow-providers-common-compat",
+    "apache-airflow-providers-http",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
 ]
 
diff --git 
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py 
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
index 101910c06f6..f4752fa143f 100644
--- 
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
+++ 
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
@@ -15,16 +15,30 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Hook for JIRA."""
+"""
+Hook for JIRA.
+
+.. spelling:word-list::
+
+    ClientResponse
+    aiohttp
+    reqrep
+"""
 
 from __future__ import annotations
 
-from typing import Any, cast
+import json
+from typing import TYPE_CHECKING, Any, cast
 
+import aiohttp
 from atlassian import Jira
 
 from airflow.exceptions import AirflowException
 from airflow.providers.atlassian.jira.version_compat import BaseHook
+from airflow.providers.http.hooks.http import HttpAsyncHook
+
+if TYPE_CHECKING:
+    from aiohttp.client_reqrep import ClientResponse
 
 
 class JiraHook(BaseHook):
@@ -32,6 +46,9 @@ class JiraHook(BaseHook):
     Jira interaction hook, a Wrapper around Atlassian Jira Python SDK.
 
     :param jira_conn_id: reference to a pre-defined Jira Connection
+    :param proxies: Proxies to make the Jira REST API call. Optional
+    :param api_root: root for the api requests. Optional
+    :param api_version: Jira api version to use. Optional
     """
 
     default_conn_name = "jira_default"
@@ -39,10 +56,18 @@ class JiraHook(BaseHook):
     conn_name_attr = "jira_conn_id"
     hook_name = "JIRA"
 
-    def __init__(self, jira_conn_id: str = default_conn_name, proxies: Any | 
None = None) -> None:
+    def __init__(
+        self,
+        jira_conn_id: str = default_conn_name,
+        proxies: Any | None = None,
+        api_root: str = "rest/api",
+        api_version: str | int = "2",
+    ) -> None:
         super().__init__()
         self.jira_conn_id = jira_conn_id
         self.proxies = proxies
+        self.api_root = api_root
+        self.api_version = api_version
         self.client: Jira | None = None
         self.get_conn()
 
@@ -67,6 +92,8 @@ class JiraHook(BaseHook):
                 password=conn.password,
                 verify_ssl=verify,
                 proxies=self.proxies,
+                api_version=self.api_version,
+                api_root=self.api_root,
             )
 
         return self.client
@@ -88,3 +115,77 @@ class JiraHook(BaseHook):
             "hidden_fields": ["schema", "extra"],
             "relabeling": {},
         }
+
+
+class JiraAsyncHook(HttpAsyncHook):
+    """
+    Async Jira interaction hook, interacts with Jira with HTTP.
+
+    :param jira_conn_id: reference to a pre-defined Jira Connection
+    :param api_root: root for the api requests
+    :param api_version: api version to use
+    :param proxies: Specify proxies to use. Defaults to None.
+
+    """
+
+    default_headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+    default_conn_name = "jira_default"
+    conn_type = "jira"
+    conn_name_attr = "jira_conn_id"
+    hook_name = "Async HTTP JIRA"
+
+    def __init__(
+        self,
+        jira_conn_id: str = default_conn_name,
+        api_root: str = "rest/api",
+        api_version: str | int = "2",
+        proxies: Any | None = None,
+    ) -> None:
+        super().__init__()
+        self.method = "POST"
+        self.api_root = api_root
+        self.api_version = api_version
+        self.http_conn_id = jira_conn_id
+        self.proxies = proxies
+
+    def get_resource_url(
+        self, resource: str, api_root: str | None = None, api_version: str | 
int | None = None
+    ) -> str:
+        """
+        Create the resource url.
+
+        :param resource: Jira resource
+        :param api_root: root for the api requests
+        :param api_version: request payload
+        :return: resource URL
+
+        """
+        if api_root is None:
+            api_root = self.api_root
+        if api_version is None:
+            api_version = self.api_version
+        return "/".join(str(s).strip("/") for s in [api_root, api_version, 
resource] if s is not None)
+
+    async def create_issue(self, fields: str | dict) -> ClientResponse:
+        """
+        Create an issue or a sub-task from a JSON representation.
+
+        :param fields: JSON data. mandatory keys are issue type, summary and 
project
+        :return: client response
+        """
+        path = self.get_resource_url("issue")
+
+        session_kwargs: dict[str, Any] = {}
+        if self.proxies:
+            session_kwargs["proxy"] = self.proxies
+
+        async with aiohttp.ClientSession(**session_kwargs) as session:
+            return await super().run(
+                session=session,
+                endpoint=path,
+                data=json.dumps({"fields": fields}),
+                headers=self.default_headers,
+            )
diff --git 
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
 
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
index 151d225575e..bd2f8f5b22e 100644
--- 
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
+++ 
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
@@ -20,7 +20,8 @@ from __future__ import annotations
 from functools import cached_property
 from typing import Any
 
-from airflow.providers.atlassian.jira.hooks.jira import JiraHook
+from airflow.providers.atlassian.jira.hooks.jira import JiraAsyncHook, JiraHook
+from airflow.providers.atlassian.jira.version_compat import AIRFLOW_V_3_1_PLUS
 from airflow.providers.common.compat.notifier import BaseNotifier
 
 
@@ -30,6 +31,8 @@ class JiraNotifier(BaseNotifier):
 
     :param jira_conn_id: The HTTP connection ID for the Jira instance.
     :param proxies: Proxies to make the Jira REST API call. Optional
+    :param api_version: Jira api version to use. Optional
+    :param api_root: root for the api requests. Optional
     :param description: The content for the body of the issue
     :param summary: The title of the issue
     :param project_id: The ID of the project under which the issue will be 
created
@@ -44,15 +47,24 @@ class JiraNotifier(BaseNotifier):
         *,
         jira_conn_id: str = JiraHook.default_conn_name,
         proxies: Any | None = None,
+        api_version: str | int = "2",
+        api_root: str = "rest/api",
         description: str,
         summary: str,
         project_id: int,
         issue_type_id: int,
         labels: list[str] | None = None,
+        **kwargs,
     ):
-        super().__init__()
+        if AIRFLOW_V_3_1_PLUS:
+            #  Support for passing context was added in 3.1.0
+            super().__init__(**kwargs)
+        else:
+            super().__init__()
         self.jira_conn_id = jira_conn_id
         self.proxies = proxies
+        self.api_version = api_version
+        self.api_root = api_root
         self.description = description
         self.summary = summary
         self.project_id = project_id
@@ -61,17 +73,40 @@ class JiraNotifier(BaseNotifier):
 
     @cached_property
     def hook(self) -> JiraHook:
-        return JiraHook(jira_conn_id=self.jira_conn_id, proxies=self.proxies)
+        return JiraHook(
+            jira_conn_id=self.jira_conn_id,
+            proxies=self.proxies,
+            api_version=self.api_version,
+            api_root=self.api_root,
+        )
 
-    def notify(self, context) -> None:
-        fields = dict(
-            description=self.description,
-            summary=self.summary,
-            project=dict(id=self.project_id),
-            issuetype=dict(id=self.issue_type_id),
-            labels=self.labels,
+    @cached_property
+    def async_hook(self) -> JiraAsyncHook:
+        return JiraAsyncHook(
+            jira_conn_id=self.jira_conn_id,
+            proxies=self.proxies,
+            api_version=self.api_version,
+            api_root=self.api_root,
         )
+
+    def _get_fields(self) -> dict[str, Any]:
+        """Get the required Jira fields to create an issue."""
+        return {
+            "description": self.description,
+            "summary": self.summary,
+            "project": {"id": self.project_id},
+            "issuetype": {"id": self.issue_type_id},
+            "labels": self.labels,
+        }
+
+    def notify(self, context) -> None:
+        fields = self._get_fields()
         self.hook.get_conn().create_issue(fields)
 
+    async def async_notify(self, context) -> None:
+        """Create a Jira issue (async)."""
+        fields = self._get_fields()
+        await self.async_hook.create_issue(fields)
+
 
 send_jira_notification = JiraNotifier
diff --git 
a/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py 
b/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
index 215fadddd93..97a5794cdd8 100644
--- a/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
+++ b/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
@@ -20,13 +20,23 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from aioresponses import aioresponses
 
 from airflow.models import Connection
-from airflow.providers.atlassian.jira.hooks.jira import JiraHook
+from airflow.providers.atlassian.jira.hooks.jira import JiraAsyncHook, JiraHook
 
 from tests_common.test_utils.compat import connection_as_json
 
 
[email protected]
+def aioresponse():
+    """
+    Creates mock async API response.
+    """
+    with aioresponses() as async_response:
+        yield async_response
+
+
 @pytest.fixture
 def mocked_jira_client():
     with mock.patch("airflow.providers.atlassian.jira.hooks.jira.Jira", 
autospec=True) as m:
@@ -68,6 +78,114 @@ class TestJiraHook:
             password=self.password,
             verify_ssl=False,
             proxies=self.proxies,
+            api_version="2",
+            api_root="rest/api",
         )
         assert isinstance(jira_hook.client, mock.Mock)
         assert jira_hook.client.name == mocked_jira_client.return_value.name
+
+
+class TestJiraAsyncHook:
+    @pytest.fixture
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
+            Connection(
+                conn_id="jira_default",
+                conn_type="jira",
+                host="http://test.atlassian.net";,
+                login="login",
+                password="password",
+                extra='{"verify_ssl": false}',
+            )
+        )
+
+    @pytest.mark.parametrize(
+        "api_version, api_result",
+        [
+            pytest.param(
+                "2",
+                "2",
+            ),
+            pytest.param(
+                1,
+                1,
+            ),
+            pytest.param(
+                "latest",
+                "latest",
+            ),
+        ],
+    )
+    def test_api_version(self, api_version, api_result):
+        """Test different API versions"""
+        hook = JiraAsyncHook(jira_conn_id="jira_default", 
api_version=api_version)
+        assert hook.api_version == api_result
+
+    @pytest.mark.parametrize(
+        "hook_kwargs, resource, api_root, api_version, resource_url_result",
+        [
+            pytest.param({"jira_conn_id": "jira_default"}, "issue", "abc/api", 
1, "abc/api/1/issue"),
+            pytest.param(
+                {"jira_conn_id": "jira_default", "api_version": 2}, "issue", 
"abc/api", 1, "abc/api/1/issue"
+            ),
+            pytest.param(
+                {"jira_conn_id": "jira_default", "api_version": 2},
+                "issue",
+                "abc/api",
+                None,
+                "abc/api/2/issue",
+            ),
+            pytest.param(
+                {"jira_conn_id": "jira_default", "api_root": "notset/api"},
+                "issue",
+                "abc/api",
+                1,
+                "abc/api/1/issue",
+            ),
+            pytest.param(
+                {"jira_conn_id": "jira_default", "api_root": "notset/api"},
+                "issue",
+                "notset/api",
+                1,
+                "notset/api/1/issue",
+            ),
+        ],
+    )
+    def test_get_resource_url(self, hook_kwargs, resource, api_root, 
api_version, resource_url_result):
+        """Test expected resource url"""
+        hook = JiraAsyncHook(**hook_kwargs)
+        assert (
+            hook.get_resource_url(resource=resource, api_root=api_root, 
api_version=api_version)
+            == resource_url_result
+        )
+
+    @pytest.mark.asyncio
+    async def test_create_issue_with_provided_conn(self, setup_connections):
+        """Asserts that async http hook can read a jira connection."""
+        hook = JiraAsyncHook(jira_conn_id="jira_default")
+        fields = {
+            "project": {"key": "TEST"},
+            "issuetype": {"name": "Task"},
+            "summary": "test rest",
+            "description": "rest rest",
+        }
+        with mock.patch("aiohttp.ClientSession.post", 
new_callable=mock.AsyncMock) as mocked_function:
+            await hook.create_issue(fields)
+            assert mocked_function.call_args.kwargs.get("verify_ssl") is False
+            assert mocked_function.call_args.kwargs.get("auth").login == 
"login"
+            assert mocked_function.call_args.kwargs.get("auth").password == 
"password"
+
+    @pytest.mark.asyncio
+    async def test_create_issue_with_sucess(self, aioresponse, 
setup_connections):
+        """Asserts that create issue return with success."""
+        hook = JiraAsyncHook(jira_conn_id="jira_default")
+        fields = {
+            "project": {"key": "TEST"},
+            "issuetype": {"name": "Task"},
+            "summary": "test rest",
+            "description": "rest rest",
+        }
+        aioresponse.post("http://test.atlassian.net/rest/api/2/issue";, 
status=200)
+
+        res = await hook.create_issue(fields)
+        assert res.status == 200
diff --git 
a/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py 
b/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
index 78bbca66467..cd501c4a184 100644
--- 
a/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
+++ 
b/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
@@ -19,8 +19,13 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.atlassian.jira.hooks.jira import JiraHook
-from airflow.providers.atlassian.jira.notifications.jira import JiraNotifier, 
send_jira_notification
+from airflow.providers.atlassian.jira.notifications.jira import (
+    JiraNotifier,
+    send_jira_notification,
+)
 
 jira_create_issue_payload = dict(
     description="Test operator failed",
@@ -78,3 +83,32 @@ class TestJiraNotifier:
                 labels=["airflow-dag-failure"],
             )
         )
+
+    def test_jira_notifier_get_fields(self):
+        notifier = JiraNotifier(
+            jira_conn_id="jira_default",
+            project_id=10000,
+            description="Test operator failed",
+            summary="Test Jira issue",
+            issue_type_id=10003,
+            labels=["airflow-dag-failure"],
+        )
+        assert notifier._get_fields() == jira_create_issue_payload
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        
"airflow.providers.atlassian.jira.notifications.jira.JiraAsyncHook.create_issue",
+        new_callable=mock.AsyncMock,
+    )
+    async def test_jira_async_notifier(self, mock_jira_hook, 
create_dag_without_db):
+        notifier = send_jira_notification(
+            jira_conn_id="jira_default",
+            api_root="test/rest",
+            project_id=10000,
+            description="Test operator failed",
+            summary="Test Jira issue",
+            issue_type_id=10003,
+            labels=["airflow-dag-failure"],
+        )
+        await notifier.async_notify({"dag": 
create_dag_without_db("test_jira_notifier")})
+        mock_jira_hook.assert_called_once_with(jira_create_issue_payload)

Reply via email to