This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c9ffa5bf6b feat: add async jira notifier (#56326)
7c9ffa5bf6b is described below
commit 7c9ffa5bf6b26685d3021b992900653f03d4b7ef
Author: Sebastian Daum <[email protected]>
AuthorDate: Mon Oct 20 23:52:38 2025 +0200
feat: add async jira notifier (#56326)
---
dev/breeze/tests/test_selective_checks.py | 17 +--
providers/atlassian/jira/pyproject.toml | 2 +
.../airflow/providers/atlassian/jira/hooks/jira.py | 107 +++++++++++++++++-
.../providers/atlassian/jira/notifications/jira.py | 55 ++++++++--
.../tests/unit/atlassian/jira/hooks/test_jira.py | 120 ++++++++++++++++++++-
.../unit/atlassian/jira/notifications/test_jira.py | 36 ++++++-
6 files changed, 315 insertions(+), 22 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 196fc9846b6..0e75dca5f1e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"providers/http/tests/file.py",
),
{
- "selected-providers-list-as-string": "amazon apache.livy
dbt.cloud dingding discord google http pagerduty",
+ "selected-providers-list-as-string": "amazon apache.livy
atlassian.jira dbt.cloud dingding discord google http pagerduty",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
[
{
"description": "amazon...google",
- "test_types": "Providers[amazon]
Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty]
Providers[google]",
+ "test_types": "Providers[amazon]
Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty]
Providers[google]",
}
]
),
@@ -702,14 +702,17 @@ def assert_outputs_are_printed(expected_outputs:
dict[str, str], stderr: str):
"test_types": "Providers[amazon]
Providers[apache.livy]",
},
{
- "description": "dbt.cloud...dingding",
- "test_types": "Providers[dbt.cloud]
Providers[dingding]",
+ "description": "atlassian.jir...dbt.cloud",
+ "test_types": "Providers[atlassian.jira]
Providers[dbt.cloud]",
},
{
- "description": "discord...google",
- "test_types": "Providers[discord]
Providers[google]",
+ "description": "dingding...discord",
+ "test_types": "Providers[dingding]
Providers[discord]",
+ },
+ {
+ "description": "google...http",
+ "test_types": "Providers[google]
Providers[http]",
},
- {"description": "http", "test_types":
"Providers[http]"},
{"description": "pagerduty", "test_types":
"Providers[pagerduty]"},
]
),
diff --git a/providers/atlassian/jira/pyproject.toml
b/providers/atlassian/jira/pyproject.toml
index d7beb2aa59d..6f9f3e454e5 100644
--- a/providers/atlassian/jira/pyproject.toml
+++ b/providers/atlassian/jira/pyproject.toml
@@ -59,6 +59,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.10.0",
"apache-airflow-providers-common-compat>=1.6.1",
+ "apache-airflow-providers-http",
"atlassian-python-api>3.41.10",
]
@@ -68,6 +69,7 @@ dev = [
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
+ "apache-airflow-providers-http",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
]
diff --git
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
index 101910c06f6..f4752fa143f 100644
---
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
+++
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py
@@ -15,16 +15,30 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Hook for JIRA."""
+"""
+Hook for JIRA.
+
+.. spelling:word-list::
+
+ ClientResponse
+ aiohttp
+ reqrep
+"""
from __future__ import annotations
-from typing import Any, cast
+import json
+from typing import TYPE_CHECKING, Any, cast
+import aiohttp
from atlassian import Jira
from airflow.exceptions import AirflowException
from airflow.providers.atlassian.jira.version_compat import BaseHook
+from airflow.providers.http.hooks.http import HttpAsyncHook
+
+if TYPE_CHECKING:
+ from aiohttp.client_reqrep import ClientResponse
class JiraHook(BaseHook):
@@ -32,6 +46,9 @@ class JiraHook(BaseHook):
Jira interaction hook, a Wrapper around Atlassian Jira Python SDK.
:param jira_conn_id: reference to a pre-defined Jira Connection
+ :param proxies: Proxies to make the Jira REST API call. Optional
+ :param api_root: root for the api requests. Optional
+ :param api_version: Jira api version to use. Optional
"""
default_conn_name = "jira_default"
@@ -39,10 +56,18 @@ class JiraHook(BaseHook):
conn_name_attr = "jira_conn_id"
hook_name = "JIRA"
- def __init__(self, jira_conn_id: str = default_conn_name, proxies: Any |
None = None) -> None:
+ def __init__(
+ self,
+ jira_conn_id: str = default_conn_name,
+ proxies: Any | None = None,
+ api_root: str = "rest/api",
+ api_version: str | int = "2",
+ ) -> None:
super().__init__()
self.jira_conn_id = jira_conn_id
self.proxies = proxies
+ self.api_root = api_root
+ self.api_version = api_version
self.client: Jira | None = None
self.get_conn()
@@ -67,6 +92,8 @@ class JiraHook(BaseHook):
password=conn.password,
verify_ssl=verify,
proxies=self.proxies,
+ api_version=self.api_version,
+ api_root=self.api_root,
)
return self.client
@@ -88,3 +115,77 @@ class JiraHook(BaseHook):
"hidden_fields": ["schema", "extra"],
"relabeling": {},
}
+
+
+class JiraAsyncHook(HttpAsyncHook):
+ """
+ Async Jira interaction hook, interacts with Jira with HTTP.
+
+ :param jira_conn_id: reference to a pre-defined Jira Connection
+ :param api_root: root for the api requests
+ :param api_version: api version to use
+ :param proxies: Specify proxies to use. Defaults to None.
+
+ """
+
+ default_headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ default_conn_name = "jira_default"
+ conn_type = "jira"
+ conn_name_attr = "jira_conn_id"
+ hook_name = "Async HTTP JIRA"
+
+ def __init__(
+ self,
+ jira_conn_id: str = default_conn_name,
+ api_root: str = "rest/api",
+ api_version: str | int = "2",
+ proxies: Any | None = None,
+ ) -> None:
+ super().__init__()
+ self.method = "POST"
+ self.api_root = api_root
+ self.api_version = api_version
+ self.http_conn_id = jira_conn_id
+ self.proxies = proxies
+
+ def get_resource_url(
+ self, resource: str, api_root: str | None = None, api_version: str |
int | None = None
+ ) -> str:
+ """
+ Create the resource url.
+
+ :param resource: Jira resource
+ :param api_root: root for the api requests
+ :param api_version: api version to use
+ :return: resource URL
+
+ """
+ if api_root is None:
+ api_root = self.api_root
+ if api_version is None:
+ api_version = self.api_version
+ return "/".join(str(s).strip("/") for s in [api_root, api_version,
resource] if s is not None)
+
+ async def create_issue(self, fields: str | dict) -> ClientResponse:
+ """
+ Create an issue or a sub-task from a JSON representation.
+
+ :param fields: JSON data. mandatory keys are issue type, summary and
project
+ :return: client response
+ """
+ path = self.get_resource_url("issue")
+
+ session_kwargs: dict[str, Any] = {}
+ if self.proxies:
+ session_kwargs["proxy"] = self.proxies
+
+ async with aiohttp.ClientSession(**session_kwargs) as session:
+ return await super().run(
+ session=session,
+ endpoint=path,
+ data=json.dumps({"fields": fields}),
+ headers=self.default_headers,
+ )
diff --git
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
index 151d225575e..bd2f8f5b22e 100644
---
a/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
+++
b/providers/atlassian/jira/src/airflow/providers/atlassian/jira/notifications/jira.py
@@ -20,7 +20,8 @@ from __future__ import annotations
from functools import cached_property
from typing import Any
-from airflow.providers.atlassian.jira.hooks.jira import JiraHook
+from airflow.providers.atlassian.jira.hooks.jira import JiraAsyncHook, JiraHook
+from airflow.providers.atlassian.jira.version_compat import AIRFLOW_V_3_1_PLUS
from airflow.providers.common.compat.notifier import BaseNotifier
@@ -30,6 +31,8 @@ class JiraNotifier(BaseNotifier):
:param jira_conn_id: The HTTP connection ID for the Jira instance.
:param proxies: Proxies to make the Jira REST API call. Optional
+ :param api_version: Jira api version to use. Optional
+ :param api_root: root for the api requests. Optional
:param description: The content for the body of the issue
:param summary: The title of the issue
:param project_id: The ID of the project under which the issue will be
created
@@ -44,15 +47,24 @@ class JiraNotifier(BaseNotifier):
*,
jira_conn_id: str = JiraHook.default_conn_name,
proxies: Any | None = None,
+ api_version: str | int = "2",
+ api_root: str = "rest/api",
description: str,
summary: str,
project_id: int,
issue_type_id: int,
labels: list[str] | None = None,
+ **kwargs,
):
- super().__init__()
+ if AIRFLOW_V_3_1_PLUS:
+ # Support for passing context was added in 3.1.0
+ super().__init__(**kwargs)
+ else:
+ super().__init__()
self.jira_conn_id = jira_conn_id
self.proxies = proxies
+ self.api_version = api_version
+ self.api_root = api_root
self.description = description
self.summary = summary
self.project_id = project_id
@@ -61,17 +73,40 @@ class JiraNotifier(BaseNotifier):
@cached_property
def hook(self) -> JiraHook:
- return JiraHook(jira_conn_id=self.jira_conn_id, proxies=self.proxies)
+ return JiraHook(
+ jira_conn_id=self.jira_conn_id,
+ proxies=self.proxies,
+ api_version=self.api_version,
+ api_root=self.api_root,
+ )
- def notify(self, context) -> None:
- fields = dict(
- description=self.description,
- summary=self.summary,
- project=dict(id=self.project_id),
- issuetype=dict(id=self.issue_type_id),
- labels=self.labels,
+ @cached_property
+ def async_hook(self) -> JiraAsyncHook:
+ return JiraAsyncHook(
+ jira_conn_id=self.jira_conn_id,
+ proxies=self.proxies,
+ api_version=self.api_version,
+ api_root=self.api_root,
)
+
+ def _get_fields(self) -> dict[str, Any]:
+ """Get the required Jira fields to create an issue."""
+ return {
+ "description": self.description,
+ "summary": self.summary,
+ "project": {"id": self.project_id},
+ "issuetype": {"id": self.issue_type_id},
+ "labels": self.labels,
+ }
+
+ def notify(self, context) -> None:
+ fields = self._get_fields()
self.hook.get_conn().create_issue(fields)
+ async def async_notify(self, context) -> None:
+ """Create a Jira issue (async)."""
+ fields = self._get_fields()
+ await self.async_hook.create_issue(fields)
+
send_jira_notification = JiraNotifier
diff --git
a/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
b/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
index 215fadddd93..97a5794cdd8 100644
--- a/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
+++ b/providers/atlassian/jira/tests/unit/atlassian/jira/hooks/test_jira.py
@@ -20,13 +20,23 @@ from __future__ import annotations
from unittest import mock
import pytest
+from aioresponses import aioresponses
from airflow.models import Connection
-from airflow.providers.atlassian.jira.hooks.jira import JiraHook
+from airflow.providers.atlassian.jira.hooks.jira import JiraAsyncHook, JiraHook
from tests_common.test_utils.compat import connection_as_json
+@pytest.fixture
+def aioresponse():
+ """
+ Creates mock async API response.
+ """
+ with aioresponses() as async_response:
+ yield async_response
+
+
@pytest.fixture
def mocked_jira_client():
with mock.patch("airflow.providers.atlassian.jira.hooks.jira.Jira",
autospec=True) as m:
@@ -68,6 +78,114 @@ class TestJiraHook:
password=self.password,
verify_ssl=False,
proxies=self.proxies,
+ api_version="2",
+ api_root="rest/api",
)
assert isinstance(jira_hook.client, mock.Mock)
assert jira_hook.client.name == mocked_jira_client.return_value.name
+
+
+class TestJiraAsyncHook:
+ @pytest.fixture
+ def setup_connections(self, create_connection_without_db):
+ create_connection_without_db(
+ Connection(
+ conn_id="jira_default",
+ conn_type="jira",
+ host="http://test.atlassian.net",
+ login="login",
+ password="password",
+ extra='{"verify_ssl": false}',
+ )
+ )
+
+ @pytest.mark.parametrize(
+ "api_version, api_result",
+ [
+ pytest.param(
+ "2",
+ "2",
+ ),
+ pytest.param(
+ 1,
+ 1,
+ ),
+ pytest.param(
+ "latest",
+ "latest",
+ ),
+ ],
+ )
+ def test_api_version(self, api_version, api_result):
+ """Test different API versions"""
+ hook = JiraAsyncHook(jira_conn_id="jira_default",
api_version=api_version)
+ assert hook.api_version == api_result
+
+ @pytest.mark.parametrize(
+ "hook_kwargs, resource, api_root, api_version, resource_url_result",
+ [
+ pytest.param({"jira_conn_id": "jira_default"}, "issue", "abc/api",
1, "abc/api/1/issue"),
+ pytest.param(
+ {"jira_conn_id": "jira_default", "api_version": 2}, "issue",
"abc/api", 1, "abc/api/1/issue"
+ ),
+ pytest.param(
+ {"jira_conn_id": "jira_default", "api_version": 2},
+ "issue",
+ "abc/api",
+ None,
+ "abc/api/2/issue",
+ ),
+ pytest.param(
+ {"jira_conn_id": "jira_default", "api_root": "notset/api"},
+ "issue",
+ "abc/api",
+ 1,
+ "abc/api/1/issue",
+ ),
+ pytest.param(
+ {"jira_conn_id": "jira_default", "api_root": "notset/api"},
+ "issue",
+ "notset/api",
+ 1,
+ "notset/api/1/issue",
+ ),
+ ],
+ )
+ def test_get_resource_url(self, hook_kwargs, resource, api_root,
api_version, resource_url_result):
+ """Test expected resource url"""
+ hook = JiraAsyncHook(**hook_kwargs)
+ assert (
+ hook.get_resource_url(resource=resource, api_root=api_root,
api_version=api_version)
+ == resource_url_result
+ )
+
+ @pytest.mark.asyncio
+ async def test_create_issue_with_provided_conn(self, setup_connections):
+ """Asserts that async http hook can read a jira connection."""
+ hook = JiraAsyncHook(jira_conn_id="jira_default")
+ fields = {
+ "project": {"key": "TEST"},
+ "issuetype": {"name": "Task"},
+ "summary": "test rest",
+ "description": "rest rest",
+ }
+ with mock.patch("aiohttp.ClientSession.post",
new_callable=mock.AsyncMock) as mocked_function:
+ await hook.create_issue(fields)
+ assert mocked_function.call_args.kwargs.get("verify_ssl") is False
+ assert mocked_function.call_args.kwargs.get("auth").login ==
"login"
+ assert mocked_function.call_args.kwargs.get("auth").password ==
"password"
+
+ @pytest.mark.asyncio
+ async def test_create_issue_with_sucess(self, aioresponse,
setup_connections):
+ """Asserts that create issue return with success."""
+ hook = JiraAsyncHook(jira_conn_id="jira_default")
+ fields = {
+ "project": {"key": "TEST"},
+ "issuetype": {"name": "Task"},
+ "summary": "test rest",
+ "description": "rest rest",
+ }
+ aioresponse.post("http://test.atlassian.net/rest/api/2/issue",
status=200)
+
+ res = await hook.create_issue(fields)
+ assert res.status == 200
diff --git
a/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
b/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
index 78bbca66467..cd501c4a184 100644
---
a/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
+++
b/providers/atlassian/jira/tests/unit/atlassian/jira/notifications/test_jira.py
@@ -19,8 +19,13 @@ from __future__ import annotations
from unittest import mock
+import pytest
+
from airflow.providers.atlassian.jira.hooks.jira import JiraHook
-from airflow.providers.atlassian.jira.notifications.jira import JiraNotifier,
send_jira_notification
+from airflow.providers.atlassian.jira.notifications.jira import (
+ JiraNotifier,
+ send_jira_notification,
+)
jira_create_issue_payload = dict(
description="Test operator failed",
@@ -78,3 +83,32 @@ class TestJiraNotifier:
labels=["airflow-dag-failure"],
)
)
+
+ def test_jira_notifier_get_fields(self):
+ notifier = JiraNotifier(
+ jira_conn_id="jira_default",
+ project_id=10000,
+ description="Test operator failed",
+ summary="Test Jira issue",
+ issue_type_id=10003,
+ labels=["airflow-dag-failure"],
+ )
+ assert notifier._get_fields() == jira_create_issue_payload
+
+ @pytest.mark.asyncio
+ @mock.patch(
+
"airflow.providers.atlassian.jira.notifications.jira.JiraAsyncHook.create_issue",
+ new_callable=mock.AsyncMock,
+ )
+ async def test_jira_async_notifier(self, mock_jira_hook,
create_dag_without_db):
+ notifier = send_jira_notification(
+ jira_conn_id="jira_default",
+ api_root="test/rest",
+ project_id=10000,
+ description="Test operator failed",
+ summary="Test Jira issue",
+ issue_type_id=10003,
+ labels=["airflow-dag-failure"],
+ )
+ await notifier.async_notify({"dag":
create_dag_without_db("test_jira_notifier")})
+ mock_jira_hook.assert_called_once_with(jira_create_issue_payload)