This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c11944be71 Feat: telegram send file Operator and Hook (#44040)
7c11944be71 is described below
commit 7c11944be719a83176fd0f98e5a281e940fdba7e
Author: Anthony Lin <[email protected]>
AuthorDate: Mon Nov 18 08:36:10 2024 +0800
Feat: telegram send file Operator and Hook (#44040)
---------
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../airflow/providers/telegram/hooks/telegram.py | 30 ++++++
.../providers/telegram/operators/telegram.py | 56 ++++++++++
providers/tests/telegram/hooks/test_telegram.py | 114 ++++++++++++++++++++-
3 files changed, 196 insertions(+), 4 deletions(-)
diff --git a/providers/src/airflow/providers/telegram/hooks/telegram.py b/providers/src/airflow/providers/telegram/hooks/telegram.py
index 6b73690143c..fd830c2250b 100644
--- a/providers/src/airflow/providers/telegram/hooks/telegram.py
+++ b/providers/src/airflow/providers/telegram/hooks/telegram.py
@@ -158,3 +158,33 @@ class TelegramHook(BaseHook):
response = asyncio.run(self.connection.send_message(**kwargs))
self.log.debug(response)
+
+ @tenacity.retry(
+ retry=tenacity.retry_if_exception_type(telegram.error.TelegramError),
+ stop=tenacity.stop_after_attempt(5),
+ wait=tenacity.wait_fixed(1),
+ )
+ def send_file(self, api_params: dict) -> None:
+ """
+ Send the file to a telegram channel or chat.
+
+ :param api_params: params for telegram_instance.send_document. It can
also be used to override chat_id
+ """
+ kwargs: dict[str, Any] = {}
+
+ if self.chat_id is not None:
+ kwargs["chat_id"] = self.chat_id
+ kwargs.update(api_params)
+
+ if "file" not in kwargs or kwargs["file"] is None:
+ raise AirflowException(
+ "'file' parameter must be provided for sending a Telegram
document message"
+ )
+
+ kwargs["document"] = kwargs.pop("file") # rename 'file' to 'document'
+
+ if kwargs.get("chat_id") is None:
+ raise AirflowException("'chat_id' must be provided for telegram
document message")
+
+ response = asyncio.run(self.connection.send_document(**kwargs))
+ self.log.debug(response)
diff --git a/providers/src/airflow/providers/telegram/operators/telegram.py b/providers/src/airflow/providers/telegram/operators/telegram.py
index f330d2a84fa..a8bda16f260 100644
--- a/providers/src/airflow/providers/telegram/operators/telegram.py
+++ b/providers/src/airflow/providers/telegram/operators/telegram.py
@@ -83,3 +83,59 @@ class TelegramOperator(BaseOperator):
chat_id=self.chat_id,
)
telegram_hook.send_message(self.telegram_kwargs)
+
+
+class TelegramFileOperator(BaseOperator):
+ """
+ This operator allows you to send file to Telegram using Telegram Bot API.
+
+ Takes both Telegram Bot API token directly or connection that has Telegram
token in password field.
+ If both supplied, token parameter will be given precedence.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:TelegramOperator`
+
+ :param telegram_conn_id: Telegram connection ID which its password is
Telegram API token
+ :param token: Telegram API Token
+ :param chat_id: Telegram chat ID for a chat/channel/group
+ :param file: The path of the file or media to be sent via Telegram
+ :param telegram_kwargs: Extra args to be passed to telegram client
+ """
+
+ template_fields: Sequence[str] = "chat_id"
+ ui_color = "#FFBA40"
+
+ def __init__(
+ self,
+ *,
+ telegram_conn_id: str = "telegram_default",
+ token: str | None = None,
+ chat_id: str | None = None,
+ file: str,
+ telegram_kwargs: dict | None = None,
+ **kwargs,
+ ):
+ self.chat_id = chat_id
+ self.token = token
+ self.telegram_kwargs = telegram_kwargs or {}
+ self.file = file
+
+ if telegram_conn_id is None:
+ raise AirflowException("No valid Telegram connection id supplied.")
+
+ self.telegram_conn_id = telegram_conn_id
+
+ super().__init__(**kwargs)
+
+ def execute(self, context: Context) -> None:
+ """Call the TelegramHook to send the provided Telegram file."""
+ if self.file:
+ self.telegram_kwargs["file"] = self.file
+
+ telegram_hook = TelegramHook(
+ telegram_conn_id=self.telegram_conn_id,
+ token=self.token,
+ chat_id=self.chat_id,
+ )
+ telegram_hook.send_file(**self.telegram_kwargs)
diff --git a/providers/tests/telegram/hooks/test_telegram.py b/providers/tests/telegram/hooks/test_telegram.py
index 56ee3850420..f1ec25dafb0 100644
--- a/providers/tests/telegram/hooks/test_telegram.py
+++ b/providers/tests/telegram/hooks/test_telegram.py
@@ -39,6 +39,10 @@ class AsyncMock(mock.MagicMock):
return super().__call__(*args, **kwargs)
+def telegram_error_side_effect(*args, **kwargs):
+ raise telegram.error.TelegramError("cosmic rays caused bit flips")
+
+
class TestTelegramHook:
def setup_method(self):
db.merge_conn(
@@ -157,10 +161,7 @@ class TestTelegramHook:
excepted_retry_count = 5
mock_get_conn.return_value = AsyncMock(password="some_token")
- def side_effect(*args, **kwargs):
- raise telegram.error.TelegramError("cosmic rays caused bit flips")
-
- mock_get_conn.return_value.send_message.side_effect = side_effect
+ mock_get_conn.return_value.send_message.side_effect =
telegram_error_side_effect
hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
with pytest.raises(tenacity.RetryError) as ctx:
@@ -196,3 +197,108 @@ class TestTelegramHook:
"text": "test telegram message",
}
)
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def
test_should_raise_exception_if_chat_id_is_not_provided_anywhere_when_sending_file(
+ self, mock_get_conn
+ ):
+ hook = TelegramHook(telegram_conn_id="telegram_default")
+ error_message = "'chat_id' must be provided for telegram document
message"
+ with pytest.raises(airflow.exceptions.AirflowException,
match=error_message):
+ hook.send_file({"file": "/file/to/path"})
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def
test_should_raise_exception_if_file_path_is_not_provided_when_sending_file(self,
mock_get_conn):
+ hook = TelegramHook(telegram_conn_id="telegram_default")
+ error_message = "'file' parameter must be provided for sending a
Telegram document message"
+ with pytest.raises(airflow.exceptions.AirflowException,
match=error_message):
+ hook.send_file({"chat_id": "-420913222"})
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def test_should_send_file_if_all_parameters_are_correctly_provided(self,
mock_get_conn):
+ mock_get_conn.return_value = AsyncMock(password="some_token")
+
+ hook = TelegramHook(telegram_conn_id="telegram_default")
+ hook.send_file({"chat_id": "-420913222", "file": "/file/to/path"})
+
+ mock_get_conn.return_value.send_document.return_value = "OK."
+
+ mock_get_conn.assert_called_once()
+ mock_get_conn.return_value.send_document.assert_called_once_with(
+ **{
+ "chat_id": "-420913222",
+ "document": "/file/to/path",
+ }
+ )
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def test_should_send_file_if_chat_id_is_provided_through_constructor(self,
mock_get_conn):
+ mock_get_conn.return_value = AsyncMock(password="some_token")
+
+ hook = TelegramHook(telegram_conn_id="telegram_default",
chat_id="-420913222")
+ hook.send_file({"file": "/file/to/path"})
+
+ mock_get_conn.return_value.send_document.return_value = "OK."
+
+ mock_get_conn.assert_called_once()
+ mock_get_conn.return_value.send_document.assert_called_once_with(
+ **{
+ "chat_id": "-420913222",
+ "document": "/file/to/path",
+ }
+ )
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def test_should_send_file_if_chat_id_is_provided_in_connection(self,
mock_get_conn):
+ mock_get_conn.return_value = AsyncMock(password="some_token")
+
+ hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
+ hook.send_file({"file": "/file/to/path"})
+
+ mock_get_conn.return_value.send_document.return_value = "OK."
+
+ mock_get_conn.assert_called_once()
+ mock_get_conn.return_value.send_document.assert_called_once_with(
+ **{
+ "chat_id": "-420913222",
+ "document": "/file/to/path",
+ }
+ )
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def test_should_retry_on_telegram_error_when_sending_file(self,
mock_get_conn):
+ excepted_retry_count = 5
+ mock_get_conn.return_value = AsyncMock(password="some_token")
+
+ mock_get_conn.return_value.send_document.side_effect =
telegram_error_side_effect
+
+ hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
+ with pytest.raises(tenacity.RetryError) as ctx:
+ hook.send_file({"file": "/file/to/path"})
+ assert "state=finished raised TelegramError" in str(ctx.value)
+
+ mock_get_conn.assert_called_once()
+ mock_get_conn.return_value.send_document.assert_called_with(
+ **{
+ "chat_id": "-420913222",
+ "document": "/file/to/path",
+ }
+ )
+ assert excepted_retry_count ==
mock_get_conn.return_value.send_document.call_count
+
+
@mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+ def test_should_send_file_if_token_is_provided(self, mock_get_conn):
+ mock_get_conn.return_value = AsyncMock(password="some_token")
+
+ hook = TelegramHook(token=TELEGRAM_TOKEN, chat_id="-420913222")
+ hook.send_file({"file": "/file/to/path"})
+
+ mock_get_conn.return_value.send_document.return_value = "OK."
+
+ mock_get_conn.assert_called_once()
+ mock_get_conn.return_value.send_document.assert_called_once_with(
+ **{
+ "chat_id": "-420913222",
+ "document": "/file/to/path",
+ }
+ )