This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2226e64a22 Add `PrestoToSlackOperator` (#23979)
2226e64a22 is described below

commit 2226e64a2263a8166e47d816aa95d211f8fc1c17
Author: eladkal <[email protected]>
AuthorDate: Mon Jun 6 09:28:46 2022 +0300

    Add `PrestoToSlackOperator` (#23979)
    
    * Add `PrestoToSlackOperator`
    Adding the functionality to run a single query against Presto and send the
    result as a Slack message.
    Similar to `SnowflakeToSlackOperator`
---
 CONTRIBUTING.rst                                   |   2 +-
 airflow/providers/dependencies.json                |   3 +-
 airflow/providers/presto/provider.yaml             |   5 +
 .../providers/presto/transfers/presto_to_slack.py  | 141 +++++++++++++++++++++
 docs/apache-airflow-providers-presto/index.rst     |   7 +
 .../operators/transfer/presto_to_slack.rst         |  38 ++++++
 .../presto/transfers/test_presto_to_slack.py       |  77 +++++++++++
 .../providers/presto/example_presto_to_slack.py    |  52 ++++++++
 8 files changed, 323 insertions(+), 2 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 8bab153525..19df2a5f70 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -728,7 +728,7 @@ hashicorp                  google
 microsoft.azure            google,oracle,sftp
 mysql                      amazon,presto,trino,vertica
 postgres                   amazon
-presto                     google
+presto                     google,slack
 salesforce                 tableau
 sftp                       ssh
 slack                      http
diff --git a/airflow/providers/dependencies.json 
b/airflow/providers/dependencies.json
index 79a58e5bc2..dd58aac77d 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -74,7 +74,8 @@
     "amazon"
   ],
   "presto": [
-    "google"
+    "google",
+    "slack"
   ],
   "salesforce": [
     "tableau"
diff --git a/airflow/providers/presto/provider.yaml 
b/airflow/providers/presto/provider.yaml
index 66c493e3f6..e07a77c940 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/provider.yaml
@@ -53,6 +53,11 @@ transfers:
     how-to-guide: 
/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
     python-module: airflow.providers.presto.transfers.gcs_to_presto
 
+  - source-integration-name: Presto
+    target-integration-name: Slack
+    how-to-guide: 
/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
+    python-module: airflow.providers.presto.transfers.presto_to_slack
+
 hook-class-names:  # deprecated - to be removed after providers add dependency 
on Airflow 2.2.0+
   - airflow.providers.presto.hooks.presto.PrestoHook
 
diff --git a/airflow/providers/presto/transfers/presto_to_slack.py 
b/airflow/providers/presto/transfers/presto_to_slack.py
new file mode 100644
index 0000000000..6dd0ecb3ab
--- /dev/null
+++ b/airflow/providers/presto/transfers/presto_to_slack.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+
+from pandas import DataFrame
+from tabulate import tabulate
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.presto.hooks.presto import PrestoHook
+from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
class PrestoToSlackOperator(BaseOperator):
    """
    Executes a single SQL statement in Presto and sends the results to Slack.

    The results of the query are rendered into the 'slack_message' parameter as a Pandas
    dataframe using a JINJA variable called '{{ results_df }}'. The 'results_df' variable name
    can be changed by specifying a different 'results_df_name' parameter. The Tabulate library
    is added to the JINJA environment as a filter to allow the dataframe to be rendered nicely.
    For example, set 'slack_message' to
    {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack
    as an ascii rendered table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PrestoToSlackOperator`

    :param sql: The SQL statement to execute on Presto (templated)
    :param slack_message: The templated Slack message to send with the data returned from
        Presto. You can use the default JINJA variable {{ results_df }} to access the pandas
        dataframe containing the SQL results
    :param presto_conn_id: The Presto connection id used to run the query
    :param slack_conn_id: The connection id for Slack
    :param results_df_name: The name of the JINJA template's dataframe variable, default is
        'results_df'
    :param parameters: The parameters to pass to the SQL query
    :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the
        slack_conn_id
    :param slack_channel: The channel to send the message to (templated). Overrides the default
        from the Slack connection.
    """

    template_fields: Sequence[str] = ('sql', 'slack_message', 'slack_channel')
    template_ext: Sequence[str] = ('.sql', '.jinja', '.j2')
    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
    # Number of times ``render_template_fields`` has run; its first pass skips ``slack_message``.
    times_rendered = 0

    def __init__(
        self,
        *,
        sql: str,
        slack_message: str,
        presto_conn_id: str = 'presto_default',
        slack_conn_id: str = 'slack_default',
        results_df_name: str = 'results_df',
        parameters: Optional[Union[Iterable, Mapping]] = None,
        slack_token: Optional[str] = None,
        slack_channel: Optional[str] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.presto_conn_id = presto_conn_id
        self.sql = sql
        self.parameters = parameters
        self.slack_conn_id = slack_conn_id
        self.slack_token = slack_token
        self.slack_message = slack_message
        self.results_df_name = results_df_name
        self.slack_channel = slack_channel

    def _get_query_results(self) -> DataFrame:
        """Run ``self.sql`` (with ``self.parameters``) on Presto and return the result frame."""
        presto_hook = self._get_presto_hook()

        self.log.info('Running SQL query: %s', self.sql)
        return presto_hook.get_pandas_df(self.sql, parameters=self.parameters)

    def _render_and_send_slack_message(self, context, df) -> None:
        """
        Render ``slack_message`` with the query results and post it via the Slack webhook.

        :param context: The task context; ``df`` is stored in it under ``results_df_name``.
        :param df: The query results to expose to the message template.
        """
        context[self.results_df_name] = df
        # Render exactly ``slack_message`` here instead of going through
        # ``render_template_fields``:
        #  * its ``times_rendered`` gate would skip ``slack_message`` (and send the raw
        #    template) when no render pass happened before ``execute``;
        #  * the other fields were already rendered, and rendering them again would
        #    re-interpret any Jinja-like text contained in their rendered values.
        self._render_fields(('slack_message',), context)

        slack_hook = self._get_slack_hook()
        self.log.info('Sending slack message: %s', self.slack_message)
        slack_hook.execute()

    def _get_presto_hook(self) -> PrestoHook:
        return PrestoHook(presto_conn_id=self.presto_conn_id)

    def _get_slack_hook(self) -> SlackWebhookHook:
        return SlackWebhookHook(
            http_conn_id=self.slack_conn_id,
            message=self.slack_message,
            webhook_token=self.slack_token,
            slack_channel=self.slack_channel,
        )

    def _render_fields(self, fields: Iterable[str], context, jinja_env=None) -> None:
        """
        Render ``fields`` in place using a JINJA environment that has a ``tabulate`` filter.

        :param fields: Names of the template fields to render.
        :param context: The template context.
        :param jinja_env: Optional environment; defaults to ``self.get_template_env()``.
        """
        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters['tabulate'] = tabulate

        self._do_render_template_fields(self, fields, context, jinja_env, set())

    def render_template_fields(self, context, jinja_env=None) -> None:
        # If this is the first render of the template fields, exclude slack_message from
        # rendering since the presto results haven't been retrieved yet.
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = [
                field for field in self.template_fields if field != 'slack_message'
            ]
        else:
            fields_to_render = self.template_fields

        self._render_fields(fields_to_render, context, jinja_env)
        self.times_rendered += 1

    def execute(self, context: 'Context') -> None:
        """
        Run the Presto query, then render and send the Slack message with its results.

        :raises AirflowException: if ``sql`` or ``slack_message`` is empty or whitespace only.
        """
        if not self.sql.strip():
            raise AirflowException("Expected 'sql' parameter is missing.")
        if not self.slack_message.strip():
            raise AirflowException("Expected 'slack_message' parameter is missing.")

        df = self._get_query_results()

        self._render_and_send_slack_message(context, df)

        self.log.debug('Finished sending Presto data to Slack')
diff --git a/docs/apache-airflow-providers-presto/index.rst 
b/docs/apache-airflow-providers-presto/index.rst
index 9fbd1b9347..b87b42d5c8 100644
--- a/docs/apache-airflow-providers-presto/index.rst
+++ b/docs/apache-airflow-providers-presto/index.rst
@@ -28,6 +28,12 @@ Content
 
     PrestoTransferOperator types <operators/transfer/gcs_to_presto>
 
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    PrestoToSlackOperator types <operators/transfer/presto_to_slack>
+
 .. toctree::
     :maxdepth: 1
     :caption: References
@@ -100,6 +106,7 @@ You can install such cross-provider dependencies when 
installing from PyPI. For
 Dependent package                                                              
                       Extra
 
====================================================================================================
  ==========
 `apache-airflow-providers-google 
<https://airflow.apache.org/docs/apache-airflow-providers-google>`_  ``google``
+`apache-airflow-providers-slack 
<https://airflow.apache.org/docs/apache-airflow-providers-slack>`_    ``slack``
 
====================================================================================================
  ==========
 
 Downloading official packages
diff --git 
a/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst 
b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
new file mode 100644
index 0000000000..5dded6bd0e
--- /dev/null
+++ 
b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:PrestoToSlackOperator:
+
+PrestoToSlackOperator
+========================
+
+Use the :class:`~airflow.providers.presto.transfers.presto_to_slack.PrestoToSlackOperator` to post messages to predefined Slack
+channels.
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+This operator will execute a custom query in Presto and publish a Slack 
message that can be formatted
+and contain the resulting dataset (e.g. ASCII formatted dataframe).
+
+An example usage of the PrestoToSlackOperator is as follows:
+
+.. exampleinclude:: 
/../../tests/system/providers/presto/example_presto_to_slack.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_presto_to_slack]
+    :end-before: [END howto_operator_presto_to_slack]
diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py 
b/tests/providers/presto/transfers/test_presto_to_slack.py
new file mode 100644
index 0000000000..78aa2867ec
--- /dev/null
+++ b/tests/providers/presto/transfers/test_presto_to_slack.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.models import DAG
+from airflow.providers.presto.transfers.presto_to_slack import 
PrestoToSlackOperator
+from airflow.utils import timezone
+from tests.test_utils.db import clear_db_runs
+
# Used as the task_id of the operator under test.
TEST_DAG_ID = "presto_to_slack_unit_test"
# Logical date of the test run; ``{{ ds }}`` renders as '2022-01-01' in the assertions below.
DEFAULT_DATE = timezone.datetime(year=2022, month=1, day=1)
+
+
class TestPrestoToSlackOperator:
    """Unit tests for ``PrestoToSlackOperator`` with the Presto and Slack hooks mocked out."""

    @classmethod
    def setup_class(cls):
        # pytest invokes this with the class itself, hence a classmethod taking ``cls``.
        clear_db_runs()

    def setup_method(self):
        self.example_dag = DAG('unit_test_dag_presto_to_slack', start_date=DEFAULT_DATE)

    def teardown_method(self):
        clear_db_runs()

    @staticmethod
    def _construct_operator(**kwargs):
        """Build the operator under test; ``kwargs`` are passed straight to its constructor."""
        return PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs)

    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook')
    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook')
    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class):
        # ``mock.patch`` decorators are applied bottom-up, so the SlackWebhookHook mock is the
        # first injected argument and the PrestoHook mock the second.
        operator_args = {
            'presto_conn_id': 'presto_connection',
            'slack_conn_id': 'slack_connection',
            'sql': "sql {{ ds }}",
            'results_df_name': 'xxxx',
            'parameters': ['1', '2', '3'],
            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
            'slack_token': 'test_token',
            'slack_channel': 'my_channel',
            'dag': self.example_dag,
        }
        presto_to_slack_operator = self._construct_operator(**operator_args)
        presto_hook = mock_presto_hook_class.return_value
        # A plain string stands in for the dataframe so the rendered message is predictable.
        presto_hook.get_pandas_df.return_value = '1234'
        slack_webhook_hook = mock_slack_hook_class.return_value
        presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        mock_presto_hook_class.assert_called_once_with(
            presto_conn_id='presto_connection',
        )

        presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3'])

        mock_slack_hook_class.assert_called_once_with(
            http_conn_id='slack_connection',
            message='message: 2022-01-01, 1234',
            webhook_token='test_token',
            slack_channel='my_channel',
        )

        slack_webhook_hook.execute.assert_called_once()
diff --git a/tests/system/providers/presto/example_presto_to_slack.py 
b/tests/system/providers/presto/example_presto_to_slack.py
new file mode 100644
index 0000000000..91ab9c42e6
--- /dev/null
+++ b/tests/system/providers/presto/example_presto_to_slack.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using PrestoToSlackOperator.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.presto.transfers.presto_to_slack import 
PrestoToSlackOperator
+
DAG_ID = "example_presto_to_slack"
# Table queried by the example; can be overridden via the PRESTO_TABLE environment variable.
PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
# NOTE(review): ENV_ID is read but not used by this DAG.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")

with models.DAG(
    dag_id=DAG_ID,
    start_date=datetime(2022, 1, 1),
    schedule_interval="@once",  # Override to match your needs
    catchup=False,
    tags=["example"],
) as dag:
    # [START howto_operator_presto_to_slack]
    PrestoToSlackOperator(
        task_id="presto_to_slack",
        sql=f"SELECT col FROM {PRESTO_TABLE}",
        slack_message="message: {{ ds }}, {{ results_df }}",
        slack_channel="my_channel",
    )
    # [END howto_operator_presto_to_slack]


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

Reply via email to