This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c11944be71 Feat: telegram send file Operator and Hook (#44040)
7c11944be71 is described below

commit 7c11944be719a83176fd0f98e5a281e940fdba7e
Author: Anthony Lin <[email protected]>
AuthorDate: Mon Nov 18 08:36:10 2024 +0800

    Feat: telegram send file Operator and Hook (#44040)
    
    
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 .../airflow/providers/telegram/hooks/telegram.py   |  30 ++++++
 .../providers/telegram/operators/telegram.py       |  56 ++++++++++
 providers/tests/telegram/hooks/test_telegram.py    | 114 ++++++++++++++++++++-
 3 files changed, 196 insertions(+), 4 deletions(-)

diff --git a/providers/src/airflow/providers/telegram/hooks/telegram.py b/providers/src/airflow/providers/telegram/hooks/telegram.py
index 6b73690143c..fd830c2250b 100644
--- a/providers/src/airflow/providers/telegram/hooks/telegram.py
+++ b/providers/src/airflow/providers/telegram/hooks/telegram.py
@@ -158,3 +158,33 @@ class TelegramHook(BaseHook):
 
         response = asyncio.run(self.connection.send_message(**kwargs))
         self.log.debug(response)
+
+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(telegram.error.TelegramError),
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_fixed(1),
+    )
+    def send_file(self, api_params: dict) -> None:
+        """
+        Send the file to a telegram channel or chat.
+
+        :param api_params: params for send_document; must hold 'file' (sent as 'document'), may override chat_id
+        """
+        kwargs: dict[str, Any] = {}
+
+        if self.chat_id is not None:
+            kwargs["chat_id"] = self.chat_id
+        kwargs.update(api_params)
+
+        if kwargs.get("file") is None:
+            raise AirflowException(
+                "'file' parameter must be provided for sending a Telegram document message"
+            )
+
+        kwargs["document"] = kwargs.pop("file")  # rename 'file' to 'document'
+
+        if kwargs.get("chat_id") is None:
+            raise AirflowException("'chat_id' must be provided for telegram document message")
+
+        response = asyncio.run(self.connection.send_document(**kwargs))
+        self.log.debug(response)
diff --git a/providers/src/airflow/providers/telegram/operators/telegram.py b/providers/src/airflow/providers/telegram/operators/telegram.py
index f330d2a84fa..a8bda16f260 100644
--- a/providers/src/airflow/providers/telegram/operators/telegram.py
+++ b/providers/src/airflow/providers/telegram/operators/telegram.py
@@ -83,3 +83,59 @@ class TelegramOperator(BaseOperator):
             chat_id=self.chat_id,
         )
         telegram_hook.send_message(self.telegram_kwargs)
+
+
+class TelegramFileOperator(BaseOperator):
+    """
+    This operator allows you to send a file to Telegram using the Telegram Bot API.
+
+    Takes either a Telegram Bot API token directly or a connection whose password field holds the token.
+    If both are supplied, the token parameter will be given precedence.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:TelegramOperator`
+
+    :param telegram_conn_id: Telegram connection ID whose password is the Telegram API token
+    :param token: Telegram API Token
+    :param chat_id: Telegram chat ID for a chat/channel/group
+    :param file: The path of the file or media to be sent via Telegram
+    :param telegram_kwargs: Extra args to be passed to telegram client
+    """
+
+    template_fields: Sequence[str] = ("chat_id",)
+    ui_color = "#FFBA40"
+
+    def __init__(
+        self,
+        *,
+        telegram_conn_id: str = "telegram_default",
+        token: str | None = None,
+        chat_id: str | None = None,
+        file: str,
+        telegram_kwargs: dict | None = None,
+        **kwargs,
+    ):
+        self.chat_id = chat_id
+        self.token = token
+        self.telegram_kwargs = telegram_kwargs or {}
+        self.file = file
+
+        if telegram_conn_id is None:
+            raise AirflowException("No valid Telegram connection id supplied.")
+
+        self.telegram_conn_id = telegram_conn_id
+
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Call the TelegramHook to send the provided Telegram file."""
+        if self.file:
+            self.telegram_kwargs["file"] = self.file
+
+        telegram_hook = TelegramHook(
+            telegram_conn_id=self.telegram_conn_id,
+            token=self.token,
+            chat_id=self.chat_id,
+        )
+        telegram_hook.send_file(self.telegram_kwargs)  # send_file takes one api_params dict
diff --git a/providers/tests/telegram/hooks/test_telegram.py b/providers/tests/telegram/hooks/test_telegram.py
index 56ee3850420..f1ec25dafb0 100644
--- a/providers/tests/telegram/hooks/test_telegram.py
+++ b/providers/tests/telegram/hooks/test_telegram.py
@@ -39,6 +39,10 @@ class AsyncMock(mock.MagicMock):
         return super().__call__(*args, **kwargs)
 
 
+def telegram_error_side_effect(*args, **kwargs):  # mock side_effect: always raises TelegramError
+    raise telegram.error.TelegramError("cosmic rays caused bit flips")
+
+
 class TestTelegramHook:
     def setup_method(self):
         db.merge_conn(
@@ -157,10 +161,7 @@ class TestTelegramHook:
         excepted_retry_count = 5
         mock_get_conn.return_value = AsyncMock(password="some_token")
 
-        def side_effect(*args, **kwargs):
-            raise telegram.error.TelegramError("cosmic rays caused bit flips")
-
-        mock_get_conn.return_value.send_message.side_effect = side_effect
+        mock_get_conn.return_value.send_message.side_effect = telegram_error_side_effect
 
         hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
         with pytest.raises(tenacity.RetryError) as ctx:
@@ -196,3 +197,108 @@ class TestTelegramHook:
                 "text": "test telegram message",
             }
         )
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_raise_exception_if_chat_id_is_not_provided_anywhere_when_sending_file(
+        self, mock_get_conn
+    ):
+        hook = TelegramHook(telegram_conn_id="telegram_default")
+        error_message = "'chat_id' must be provided for telegram document message"
+        with pytest.raises(airflow.exceptions.AirflowException, match=error_message):
+            hook.send_file({"file": "/file/to/path"})
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_raise_exception_if_file_path_is_not_provided_when_sending_file(self, mock_get_conn):
+        hook = TelegramHook(telegram_conn_id="telegram_default")
+        error_message = "'file' parameter must be provided for sending a Telegram document message"
+        with pytest.raises(airflow.exceptions.AirflowException, match=error_message):
+            hook.send_file({"chat_id": "-420913222"})
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_send_file_if_all_parameters_are_correctly_provided(self, mock_get_conn):
+        mock_get_conn.return_value = AsyncMock(password="some_token")
+        # Stub the reply before the call; configuring it afterwards would have no effect.
+        mock_get_conn.return_value.send_document.return_value = "OK."
+
+        hook = TelegramHook(telegram_conn_id="telegram_default")
+        hook.send_file({"chat_id": "-420913222", "file": "/file/to/path"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_document.assert_called_once_with(
+            **{
+                "chat_id": "-420913222",
+                "document": "/file/to/path",
+            }
+        )
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_send_file_if_chat_id_is_provided_through_constructor(self, mock_get_conn):
+        mock_get_conn.return_value = AsyncMock(password="some_token")
+        # Stub the reply before the call; configuring it afterwards would have no effect.
+        mock_get_conn.return_value.send_document.return_value = "OK."
+
+        hook = TelegramHook(telegram_conn_id="telegram_default", chat_id="-420913222")
+        hook.send_file({"file": "/file/to/path"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_document.assert_called_once_with(
+            **{
+                "chat_id": "-420913222",
+                "document": "/file/to/path",
+            }
+        )
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_send_file_if_chat_id_is_provided_in_connection(self, mock_get_conn):
+        mock_get_conn.return_value = AsyncMock(password="some_token")
+        # Stub the reply before the call; configuring it afterwards would have no effect.
+        mock_get_conn.return_value.send_document.return_value = "OK."
+
+        hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
+        hook.send_file({"file": "/file/to/path"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_document.assert_called_once_with(
+            **{
+                "chat_id": "-420913222",
+                "document": "/file/to/path",
+            }
+        )
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_retry_on_telegram_error_when_sending_file(self, mock_get_conn):
+        excepted_retry_count = 5
+        mock_get_conn.return_value = AsyncMock(password="some_token")
+
+        mock_get_conn.return_value.send_document.side_effect = telegram_error_side_effect
+
+        hook = TelegramHook(telegram_conn_id="telegram-webhook-with-chat_id")
+        with pytest.raises(tenacity.RetryError) as ctx:
+            hook.send_file({"file": "/file/to/path"})
+        assert "state=finished raised TelegramError" in str(ctx.value)
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_document.assert_called_with(
+            **{
+                "chat_id": "-420913222",
+                "document": "/file/to/path",
+            }
+        )
+        assert excepted_retry_count == mock_get_conn.return_value.send_document.call_count
+
+    @mock.patch("airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn")
+    def test_should_send_file_if_token_is_provided(self, mock_get_conn):
+        mock_get_conn.return_value = AsyncMock(password="some_token")
+        # Stub the reply before the call; configuring it afterwards would have no effect.
+        mock_get_conn.return_value.send_document.return_value = "OK."
+
+        hook = TelegramHook(token=TELEGRAM_TOKEN, chat_id="-420913222")
+        hook.send_file({"file": "/file/to/path"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_document.assert_called_once_with(
+            **{
+                "chat_id": "-420913222",
+                "document": "/file/to/path",
+            }
+        )

Reply via email to