This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new afa73458bda Add a new Amazon Simple Email Service Notifier (#56106)
afa73458bda is described below
commit afa73458bda7b10def14adc09a26f9330b0a6c56
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri Sep 26 16:34:27 2025 -0700
Add a new Amazon Simple Email Service Notifier (#56106)
* Add a new Amazon Simple Email Service Notifier
---
.../src/airflow/providers/amazon/aws/hooks/ses.py | 86 ++++++++++--
.../providers/amazon/aws/notifications/ses.py | 139 +++++++++++++++++++
.../amazon/tests/unit/amazon/aws/hooks/test_ses.py | 145 +++++++++++++------
.../unit/amazon/aws/notifications/test_ses.py | 153 +++++++++++++++++++++
4 files changed, 474 insertions(+), 49 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/ses.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/ses.py
index 7c0568c4d03..d418e13749e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/ses.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/ses.py
@@ -42,6 +42,20 @@ class SesHook(AwsBaseHook):
kwargs["client_type"] = "ses"
super().__init__(*args, **kwargs)
+ @staticmethod
+ def _build_headers(
+ custom_headers: dict[str, Any] | None,
+ reply_to: str | None,
+ return_path: str | None,
+ ) -> dict[str, Any]:
+ _custom_headers = custom_headers or {}
+ if reply_to:
+ _custom_headers["Reply-To"] = reply_to
+ if return_path:
+ _custom_headers["Return-Path"] = return_path
+
+ return _custom_headers
+
def send_email(
self,
mail_from: str,
@@ -70,23 +84,17 @@ class SesHook(AwsBaseHook):
:param files: List of paths of files to be attached
:param cc: List of email addresses to set as email's CC
:param bcc: List of email addresses to set as email's BCC
- :param mime_subtype: Can be used to specify the sub-type of the
message. Default = mixed
+ :param mime_subtype: Can be used to specify the subtype of the
message. Default = mixed
:param mime_charset: Email's charset. Default = UTF-8.
:param reply_to: The email address to which replies will be sent.
By default, replies
are sent to the original sender's email address.
:param return_path: The email address to which message bounces and
complaints should be sent.
"Return-Path" is sometimes called "envelope from", "envelope
sender", or "MAIL FROM".
:param custom_headers: Additional headers to add to the MIME message.
- No validations are run on these values and they should be able to
be encoded.
+ No validations are run on these values, and they should be able to
be encoded.
:return: Response from Amazon SES service with unique message
identifier.
"""
- ses_client = self.get_conn()
-
- custom_headers = custom_headers or {}
- if reply_to:
- custom_headers["Reply-To"] = reply_to
- if return_path:
- custom_headers["Return-Path"] = return_path
+ custom_headers = self._build_headers(custom_headers, reply_to,
return_path)
message, recipients = build_mime_message(
mail_from=mail_from,
@@ -101,6 +109,64 @@ class SesHook(AwsBaseHook):
custom_headers=custom_headers,
)
- return ses_client.send_raw_email(
+ return self.conn.send_raw_email(
Source=mail_from, Destinations=recipients, RawMessage={"Data":
message.as_string()}
)
+
+ async def asend_email(
+ self,
+ mail_from: str,
+ to: str | Iterable[str],
+ subject: str,
+ html_content: str,
+ files: list[str] | None = None,
+ cc: str | Iterable[str] | None = None,
+ bcc: str | Iterable[str] | None = None,
+ mime_subtype: str = "mixed",
+ mime_charset: str = "utf-8",
+ reply_to: str | None = None,
+ return_path: str | None = None,
+ custom_headers: dict[str, Any] | None = None,
+ ) -> dict:
+ """
+ Send email using Amazon Simple Email Service (async).
+
+ .. seealso::
+ - :external+boto3:py:meth:`SES.Client.send_raw_email`
+
+ :param mail_from: Email address to set as email's from
+ :param to: List of email addresses to set as email's to
+ :param subject: Email's subject
+ :param html_content: Content of email in HTML format
+ :param files: List of paths of files to be attached
+ :param cc: List of email addresses to set as email's CC
+ :param bcc: List of email addresses to set as email's BCC
+ :param mime_subtype: Can be used to specify the subtype of the
message. Default = mixed
+ :param mime_charset: Email's charset. Default = UTF-8.
+ :param reply_to: The email address to which replies will be sent.
By default, replies
+ are sent to the original sender's email address.
+ :param return_path: The email address to which message bounces and
complaints should be sent.
+ "Return-Path" is sometimes called "envelope from", "envelope
sender", or "MAIL FROM".
+ :param custom_headers: Additional headers to add to the MIME message.
+ No validations are run on these values, and they should be able to
be encoded.
+ :return: Response from Amazon SES service with unique message
identifier.
+ """
+ custom_headers = self._build_headers(custom_headers, reply_to,
return_path)
+
+ message, recipients = build_mime_message(
+ mail_from=mail_from,
+ to=to,
+ subject=subject,
+ html_content=html_content,
+ files=files,
+ cc=cc,
+ bcc=bcc,
+ mime_subtype=mime_subtype,
+ mime_charset=mime_charset,
+ custom_headers=custom_headers,
+ )
+
+ async with await self.get_async_conn() as async_client:
+ return await async_client.send_raw_email(
+ Source=mail_from, Destinations=recipients, RawMessage={"Data":
message.as_string()}
+ )
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/notifications/ses.py
b/providers/amazon/src/airflow/providers/amazon/aws/notifications/ses.py
new file mode 100644
index 00000000000..9e45a83be56
--- /dev/null
+++ b/providers/amazon/src/airflow/providers/amazon/aws/notifications/ses.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections.abc import Iterable, Sequence
+from functools import cached_property
+from typing import Any
+
+from airflow.providers.amazon.aws.hooks.ses import SesHook
+from airflow.providers.amazon.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.common.compat.notifier import BaseNotifier
+from airflow.utils.helpers import prune_dict
+
+
+class SesNotifier(BaseNotifier):
+ """
+ Amazon Simple Email Service (SES) Notifier.
+
+ :param mail_from: Email address to set as email's from
+ :param to: List of email addresses to set as email's to
+ :param subject: Email's subject
+ :param html_content: Content of email in HTML format
+ :param files: List of paths of files to be attached
+ :param cc: List of email addresses to set as email's CC
+ :param bcc: List of email addresses to set as email's BCC
+ :param mime_subtype: Can be used to specify the subtype of the message.
Default = mixed
+ :param mime_charset: Email's charset. Default = UTF-8.
+ :param reply_to: The email address to which replies will be sent. By
default, replies
+ are sent to the original sender's email address.
+ :param return_path: The email address to which message bounces and complaints
should be sent.
+ "Return-Path" is sometimes called "envelope from", "envelope sender",
or "MAIL FROM".
+ :param custom_headers: Additional headers to add to the MIME message.
+ No validations are run on these values, and they should be able to be
encoded.
+ """
+
+ template_fields: Sequence[str] = (
+ "aws_conn_id",
+ "region_name",
+ "mail_from",
+ "to",
+ "subject",
+ "html_content",
+ "files",
+ "cc",
+ "bcc",
+ "mime_subtype",
+ "mime_charset",
+ "reply_to",
+ "return_path",
+ "custom_headers",
+ )
+
+ def __init__(
+ self,
+ *,
+ aws_conn_id: str | None = SesHook.default_conn_name,
+ region_name: str | None = None,
+ mail_from: str,
+ to: str | Iterable[str],
+ subject: str,
+ html_content: str,
+ files: list[str] | None = None,
+ cc: str | Iterable[str] | None = None,
+ bcc: str | Iterable[str] | None = None,
+ mime_subtype: str = "mixed",
+ mime_charset: str = "utf-8",
+ reply_to: str | None = None,
+ return_path: str | None = None,
+ custom_headers: dict[str, Any] | None = None,
+ **kwargs,
+ ):
+ if AIRFLOW_V_3_1_PLUS:
+ # Support for passing context was added in 3.1.0
+ super().__init__(**kwargs)
+ else:
+ super().__init__()
+ self.aws_conn_id = aws_conn_id
+ self.region_name = region_name
+
+ self.mail_from = mail_from
+ self.to = to
+ self.subject = subject
+ self.html_content = html_content
+ self.files = files
+ self.cc = cc
+ self.bcc = bcc
+ self.mime_subtype = mime_subtype
+ self.mime_charset = mime_charset
+ self.reply_to = reply_to
+ self.return_path = return_path
+ self.custom_headers = custom_headers
+
+ def _build_send_kwargs(self):
+ return prune_dict(
+ {
+ "mail_from": self.mail_from,
+ "to": self.to,
+ "subject": self.subject,
+ "html_content": self.html_content,
+ "files": self.files,
+ "cc": self.cc,
+ "bcc": self.bcc,
+ "mime_subtype": self.mime_subtype,
+ "mime_charset": self.mime_charset,
+ "reply_to": self.reply_to,
+ "return_path": self.return_path,
+ "custom_headers": self.custom_headers,
+ }
+ )
+
+ @cached_property
+ def hook(self) -> SesHook:
+ """Amazon Simple Email Service (SES) Hook (cached)."""
+ return SesHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
+
+ def notify(self, context):
+ """Send email using Amazon Simple Email Service (SES)."""
+ self.hook.send_email(**self._build_send_kwargs())
+
+ async def async_notify(self, context):
+ """Send email using Amazon Simple Email Service (SES) (async)."""
+ await self.hook.asend_email(**self._build_send_kwargs())
+
+
+send_ses_notification = SesNotifier
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_ses.py
b/providers/amazon/tests/unit/amazon/aws/hooks/test_ses.py
index 023b1f58fa2..bd56b679e55 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_ses.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_ses.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from unittest import mock
+
import boto3
import pytest
from moto import mock_aws
@@ -25,46 +27,111 @@ from airflow.providers.amazon.aws.hooks.ses import SesHook
boto3.setup_default_session()
+TEST_TO_ADDRESSES = [
+ pytest.param("[email protected]", id="to=single_string"),
+ pytest.param("[email protected],[email protected]", id="to=comma_string"),
+ pytest.param(["[email protected]", "[email protected]"], id="to=list"),
+]
+
+TEST_CC_ADDRESSES = [
+ pytest.param("[email protected]", id="cc=single_string"),
+ pytest.param("[email protected],[email protected]", id="cc=comma_string"),
+ pytest.param(["[email protected]", "[email protected]"], id="cc=list"),
+]
+
+TEST_BCC_ADDRESSES = [
+ pytest.param("[email protected]", id="bcc=single_string"),
+ pytest.param("[email protected],[email protected]", id="bcc=comma_string"),
+ pytest.param(["[email protected]", "[email protected]"], id="bcc=list"),
+]
+
+TEST_FROM_ADDRESS = "[email protected]"
+TEST_SUBJECT = "subject"
+TEST_HTML_CONTENT = "<html>Test</html>"
+TEST_REPLY_TO = "[email protected]"
+TEST_RETURN_PATH = "[email protected]"
+
+
@mock_aws
-def test_get_conn():
- hook = SesHook(aws_conn_id="aws_default")
- assert hook.get_conn() is not None
+def _verify_address(address: str) -> None:
+ """
+ Amazon only allows emails from verified addresses. If the address is not
verified,
+ this test will raise `botocore.errorfactory.MessageRejected`.
+ """
+ SesHook().get_conn().verify_email_identity(EmailAddress=address)
@mock_aws
[email protected](
- "to", ["[email protected]", ["[email protected]", "[email protected]"],
"[email protected],[email protected]"]
-)
[email protected](
- "cc", ["[email protected]", ["[email protected]", "[email protected]"],
"[email protected],[email protected]"]
-)
[email protected](
- "bcc", ["[email protected]", ["[email protected]", "[email protected]"],
"[email protected],[email protected]"]
-)
-def test_send_email(to, cc, bcc):
- # Given
- hook = SesHook()
- ses_client = hook.get_conn()
- mail_from = "[email protected]"
-
- # Amazon only allows to send emails from verified addresses,
- # then we need to validate the from address before sending the email,
- # otherwise this test would raise a
`botocore.errorfactory.MessageRejected` exception
- ses_client.verify_email_identity(EmailAddress=mail_from)
-
- # When
- response = hook.send_email(
- mail_from=mail_from,
- to=to,
- subject="subject",
- html_content="<html>Test</html>",
- cc=cc,
- bcc=bcc,
- reply_to="[email protected]",
- return_path="[email protected]",
- )
-
- # Then
- assert response is not None
- assert isinstance(response, dict)
- assert "MessageId" in response
+class TestSesHook:
+ def test_get_conn(self):
+ hook = SesHook(aws_conn_id="aws_default")
+ assert hook.get_conn() is not None
+
+ @pytest.mark.parametrize("to", TEST_TO_ADDRESSES)
+ @pytest.mark.parametrize("cc", TEST_CC_ADDRESSES)
+ @pytest.mark.parametrize("bcc", TEST_BCC_ADDRESSES)
+ def test_send_email(self, to, cc, bcc):
+ _verify_address(TEST_FROM_ADDRESS)
+ hook = SesHook()
+
+ response = hook.send_email(
+ mail_from=TEST_FROM_ADDRESS,
+ to=to,
+ subject=TEST_SUBJECT,
+ html_content=TEST_HTML_CONTENT,
+ cc=cc,
+ bcc=bcc,
+ reply_to=TEST_REPLY_TO,
+ return_path=TEST_RETURN_PATH,
+ )
+
+ assert response is not None
+ assert isinstance(response, dict)
+ assert "MessageId" in response
+
+
[email protected]
+class TestAsyncSesHook:
+ """The mock_aws decorator uses `moto` which does not currently support
async SES so we mock it manually."""
+
+ @pytest.fixture
+ def mock_async_client(self):
+ return mock.AsyncMock()
+
+ @pytest.fixture
+ def mock_get_async_conn(self, mock_async_client):
+ with mock.patch.object(SesHook, "get_async_conn") as mocked_conn:
+ mocked_conn.return_value.__aenter__.return_value =
mock_async_client
+ yield mocked_conn
+
+ async def test_get_async_conn(self, mock_get_async_conn,
mock_async_client):
+ hook = SesHook()
+ async with await hook.get_async_conn() as async_conn:
+ assert async_conn is mock_async_client
+
+ @pytest.mark.parametrize("to", TEST_TO_ADDRESSES)
+ @pytest.mark.parametrize("cc", TEST_CC_ADDRESSES)
+ @pytest.mark.parametrize("bcc", TEST_BCC_ADDRESSES)
+ async def test_asend_email(self, mock_get_async_conn, mock_async_client,
to, cc, bcc):
+ _verify_address(TEST_FROM_ADDRESS)
+ hook = SesHook()
+
+ mock_async_client.send_raw_email.return_value = {"MessageId":
"test_message_id"}
+
+ response = await hook.asend_email(
+ mail_from=TEST_FROM_ADDRESS,
+ to=to,
+ subject=TEST_SUBJECT,
+ html_content=TEST_HTML_CONTENT,
+ cc=cc,
+ bcc=bcc,
+ reply_to=TEST_REPLY_TO,
+ return_path=TEST_RETURN_PATH,
+ )
+
+ assert response is not None
+ assert isinstance(response, dict)
+ assert "MessageId" in response
+ assert response["MessageId"] == "test_message_id"
+
+ mock_async_client.send_raw_email.assert_called_once()
diff --git a/providers/amazon/tests/unit/amazon/aws/notifications/test_ses.py
b/providers/amazon/tests/unit/amazon/aws/notifications/test_ses.py
new file mode 100644
index 00000000000..b848f38c5b3
--- /dev/null
+++ b/providers/amazon/tests/unit/amazon/aws/notifications/test_ses.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.amazon.aws.notifications.ses import SesNotifier,
send_ses_notification
+from airflow.utils.types import NOTSET
+
+TEST_EMAIL_PARAMS = {
+ "mail_from": "[email protected]",
+ "to": "[email protected]",
+ "subject": "Test Subject",
+ "html_content": "<p>Test Content</p>",
+}
+
+# The notifier forwards these default values to the hook when they are not provided
+HOOK_DEFAULTS = {
+ "mime_charset": "utf-8",
+ "mime_subtype": "mixed",
+}
+
+
+class TestSesNotifier:
+ def test_class_and_notifier_are_same(self):
+ assert send_ses_notification is SesNotifier
+
+ @pytest.mark.parametrize(
+ "aws_conn_id",
+ [
+ pytest.param("aws_test_conn_id", id="custom-conn"),
+ pytest.param(None, id="none-conn"),
+ pytest.param(NOTSET, id="default-value"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "region_name",
+ [
+ pytest.param("eu-west-2", id="custom-region"),
+ pytest.param(None, id="no-region"),
+ pytest.param(NOTSET, id="default-value"),
+ ],
+ )
+ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
+ """Test notifier attributes propagate to SesHook."""
+ notifier_kwargs = {}
+ if aws_conn_id is not NOTSET:
+ notifier_kwargs["aws_conn_id"] = aws_conn_id
+ if region_name is not NOTSET:
+ notifier_kwargs["region_name"] = region_name
+
+ notifier = SesNotifier(**notifier_kwargs, **TEST_EMAIL_PARAMS)
+ with
mock.patch("airflow.providers.amazon.aws.notifications.ses.SesHook") as
mock_hook:
+ hook = notifier.hook
+ assert hook is notifier.hook, "Hook property not cached"
+ mock_hook.assert_called_once_with(
+ aws_conn_id=(aws_conn_id if aws_conn_id is not NOTSET else
"aws_default"),
+ region_name=(region_name if region_name is not NOTSET else
None),
+ )
+
+ # Basic check for notifier
+ notifier.notify({})
+
mock_hook.return_value.send_email.assert_called_once_with(**TEST_EMAIL_PARAMS,
**HOOK_DEFAULTS)
+
+ @pytest.mark.asyncio
+ async def test_async_notify(self):
+ """Test async notification sends correctly."""
+ notifier = SesNotifier(**TEST_EMAIL_PARAMS)
+ with
mock.patch("airflow.providers.amazon.aws.notifications.ses.SesHook") as
mock_hook:
+ mock_hook.return_value.asend_email = mock.AsyncMock()
+
+ await notifier.async_notify({})
+
+
mock_hook.return_value.asend_email.assert_called_once_with(**TEST_EMAIL_PARAMS,
**HOOK_DEFAULTS)
+
+ def test_ses_notifier_with_optional_params(self):
+ """Test notifier handles all optional parameters correctly."""
+ email_params = {
+ **TEST_EMAIL_PARAMS,
+ "files": ["test.txt"],
+ "cc": ["[email protected]"],
+ "bcc": ["[email protected]"],
+ "mime_subtype": "alternative",
+ "mime_charset": "ascii",
+ "reply_to": "[email protected]",
+ "return_path": "[email protected]",
+ "custom_headers": {"X-Custom": "value"},
+ }
+
+ notifier = SesNotifier(**email_params)
+ with
mock.patch("airflow.providers.amazon.aws.notifications.ses.SesHook") as
mock_hook:
+ notifier.notify({})
+
+
mock_hook.return_value.send_email.assert_called_once_with(**email_params)
+
+ def test_ses_notifier_templated(self, create_dag_without_db):
+ """Test template fields are properly rendered."""
+ templated_params = {
+ "aws_conn_id": "{{ dag.dag_id }}",
+ "region_name": "{{ var_region }}",
+ "mail_from": "{{ var_from }}",
+ "to": "{{ var_to }}",
+ "subject": "{{ var_subject }}",
+ "html_content": "Hello {{ var_name }}",
+ "cc": ["cc@{{ var_domain }}"],
+ "bcc": ["bcc@{{ var_domain }}"],
+ "reply_to": "reply@{{ var_domain }}",
+ }
+
+ notifier = SesNotifier(**templated_params)
+ with
mock.patch("airflow.providers.amazon.aws.notifications.ses.SesHook") as
mock_hook:
+ context = {
+ "dag": create_dag_without_db("test_ses_notifier_templated"),
+ "var_region": "us-west-1",
+ "var_from": "[email protected]",
+ "var_to": "[email protected]",
+ "var_subject": "Test Email",
+ "var_name": "John",
+ "var_domain": "example.com",
+ }
+ notifier(context)
+
+ mock_hook.assert_called_once_with(
+ aws_conn_id="test_ses_notifier_templated",
+ region_name="us-west-1",
+ )
+ mock_hook.return_value.send_email.assert_called_once_with(
+ mail_from="[email protected]",
+ to="[email protected]",
+ subject="Test Email",
+ html_content="Hello John",
+ cc=["[email protected]"],
+ bcc=["[email protected]"],
+ mime_subtype="mixed",
+ mime_charset="utf-8",
+ reply_to="[email protected]",
+ )