mik-laj commented on a change in pull request #4891: Telegram hook/operator to 
post messages to telegram channels
URL: https://github.com/apache/airflow/pull/4891#discussion_r264053260
 
 

 ##########
 File path: airflow/contrib/operators/telegram_operator.py
 ##########
 @@ -0,0 +1,89 @@
+import json
+import telegram
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+from hooks.telegram_hook import TelegramHook
+
+
+class TelegramAPIOperator(BaseOperator):
+    """
+    Base Telegram Operator
+    The TelegramAPIPostOperator is derived from this operator.
+    In the future additional Telegram bot API Operators will be derived from 
this class as well
+    :param telegram_conn_id: Telegram connection ID which its password is 
Telegram API token
+    :type telegram_conn_id: str
+    :param token: Telergram bot API token
+    :type token: str
+    """
+
+    @apply_defaults
+    def __init__(self, telegram_conn_id=None, token=None, *args, **kwargs):
+        super(TelegramAPIOperator, self).__init__(*args, **kwargs)
+
+        if token is None and telegram_conn_id is None:
+            raise AirflowException("No valid Telegram token nor 
telegram_conn_id supplied.")
+        if token is not None and telegram_conn_id is not None:
+            raise AirflowException(
+                "Cannot determine Telegram credential "
+                "when both token and telegram_conn_id are supplied."
+            )
+
+        self.token = token
+        self.telegram_conn_id = telegram_conn_id
+
+    def construct_api_call_params(self):
+        """
+        Used by the execute function. Allows templating on the source fields
+        of the api_call_params dict before construction
+        Override in child classes.
+        Each TelegramAPIOperator child class is responsible for
+        having a construct_api_call_params function
+        which sets self.api_call_params with a dict of
+        API call parameters
+        """
+
+        pass
+
+    def execute(self, **kwargs):
+        """
+        TelegramAPIOperator calls will not fail even if the call is not 
unsuccessful.
+        It should not prevent a DAG from completing in success
+        """
+        if not hasattr(self, "api_params"):
+            self.construct_api_call_params()
+        telegram_client = TelegramHook(token=self.token, 
telegram_conn_id=self.telegram_conn_id)
+        telegram_client.call("POST", self.api_params)
+
+
+class TelegramAPIPostOperator(TelegramAPIOperator):
 
 Review comment:
   It will be difficult to make changes to the old code and keep it backwards 
compatible. However, if you have time, you can make a change, but remember to 
add a note in the file `updating.md`.  Let us wait for the opinions of other 
people now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to