This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2226e64a22 Add `PrestoToSlackOperator` (#23979)
2226e64a22 is described below
commit 2226e64a2263a8166e47d816aa95d211f8fc1c17
Author: eladkal <[email protected]>
AuthorDate: Mon Jun 6 09:28:46 2022 +0300
Add `PrestoToSlackOperator` (#23979)
* Add `PrestoToSlackOperator`
Adding the functionality to run a single query against Presto and send the
result as a Slack message.
Similar to `SnowflakeToSlackOperator`
---
CONTRIBUTING.rst | 2 +-
airflow/providers/dependencies.json | 3 +-
airflow/providers/presto/provider.yaml | 5 +
.../providers/presto/transfers/presto_to_slack.py | 141 +++++++++++++++++++++
docs/apache-airflow-providers-presto/index.rst | 7 +
.../operators/transfer/presto_to_slack.rst | 38 ++++++
.../presto/transfers/test_presto_to_slack.py | 77 +++++++++++
.../providers/presto/example_presto_to_slack.py | 52 ++++++++
8 files changed, 323 insertions(+), 2 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 8bab153525..19df2a5f70 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -728,7 +728,7 @@ hashicorp google
microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
-presto google
+presto google,slack
salesforce tableau
sftp ssh
slack http
diff --git a/airflow/providers/dependencies.json
b/airflow/providers/dependencies.json
index 79a58e5bc2..dd58aac77d 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -74,7 +74,8 @@
"amazon"
],
"presto": [
- "google"
+ "google",
+ "slack"
],
"salesforce": [
"tableau"
diff --git a/airflow/providers/presto/provider.yaml
b/airflow/providers/presto/provider.yaml
index 66c493e3f6..e07a77c940 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/provider.yaml
@@ -53,6 +53,11 @@ transfers:
how-to-guide:
/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
python-module: airflow.providers.presto.transfers.gcs_to_presto
+ - source-integration-name: Presto
+ target-integration-name: Slack
+ how-to-guide:
/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
+ python-module: airflow.providers.presto.transfers.presto_to_slack
+
hook-class-names: # deprecated - to be removed after providers add dependency
on Airflow 2.2.0+
- airflow.providers.presto.hooks.presto.PrestoHook
diff --git a/airflow/providers/presto/transfers/presto_to_slack.py
b/airflow/providers/presto/transfers/presto_to_slack.py
new file mode 100644
index 0000000000..6dd0ecb3ab
--- /dev/null
+++ b/airflow/providers/presto/transfers/presto_to_slack.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+
+from pandas import DataFrame
+from tabulate import tabulate
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.presto.hooks.presto import PrestoHook
+from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
class PrestoToSlackOperator(BaseOperator):
    """
    Executes a single SQL statement in Presto and sends the results to Slack.

    The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe
    using a JINJA variable called '{{ results_df }}'. The 'results_df' variable name can be changed by
    specifying a different 'results_df_name' parameter. The Tabulate library is added to the JINJA
    environment as a filter to allow the dataframe to be rendered nicely. For example, set
    'slack_message' to {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the
    results to Slack as an ascii rendered table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PrestoToSlackOperator`

    :param sql: The SQL statement to execute on Presto (templated)
    :param slack_message: The templated Slack message to send with the data returned from Presto.
        You can use the default JINJA variable {{ results_df }} to access the pandas dataframe
        containing the SQL results
    :param presto_conn_id: The connection id of the Presto instance the query is run against
        (the source of the data)
    :param slack_conn_id: The connection id for Slack
    :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
    :param parameters: The parameters to pass to the SQL query
    :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the
        slack_conn_id
    :param slack_channel: The channel to send message. Override default from Slack connection.
    """

    template_fields: Sequence[str] = ('sql', 'slack_message', 'slack_channel')
    template_ext: Sequence[str] = ('.sql', '.jinja', '.j2')
    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
    # Number of render passes performed on this instance; the first pass (made by Airflow before
    # ``execute``) must skip ``slack_message`` because the query results do not exist yet.
    times_rendered = 0

    def __init__(
        self,
        *,
        sql: str,
        slack_message: str,
        presto_conn_id: str = 'presto_default',
        slack_conn_id: str = 'slack_default',
        results_df_name: str = 'results_df',
        parameters: Optional[Union[Iterable, Mapping]] = None,
        slack_token: Optional[str] = None,
        slack_channel: Optional[str] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.presto_conn_id = presto_conn_id
        self.sql = sql
        self.parameters = parameters
        self.slack_conn_id = slack_conn_id
        self.slack_token = slack_token
        self.slack_message = slack_message
        self.results_df_name = results_df_name
        self.slack_channel = slack_channel

    def _get_query_results(self) -> DataFrame:
        """Run ``self.sql`` on Presto and return the result as a pandas DataFrame."""
        presto_hook = self._get_presto_hook()

        self.log.info('Running SQL query: %s', self.sql)
        df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters)
        return df

    def _render_and_send_slack_message(self, context, df) -> None:
        """Render ``slack_message`` with the query results and post it through the webhook hook."""
        # Expose the dataframe to the template under the configured name.
        context[self.results_df_name] = df
        # Render only the message, unconditionally. The other fields were rendered before
        # ``execute`` (re-rendering their already-rendered text would re-evaluate any Jinja syntax
        # the first pass produced), and rendering explicitly here means the message is rendered
        # even if ``execute`` is called without a preceding ``render_template_fields`` pass.
        self._render_fields_with_tabulate(['slack_message'], context)
        self.times_rendered += 1

        slack_hook = self._get_slack_hook()
        self.log.info('Sending slack message: %s', self.slack_message)
        slack_hook.execute()

    def _get_presto_hook(self) -> PrestoHook:
        """Build the hook for the configured Presto connection."""
        return PrestoHook(presto_conn_id=self.presto_conn_id)

    def _get_slack_hook(self) -> SlackWebhookHook:
        """Build a webhook hook carrying the (already rendered) message and channel."""
        return SlackWebhookHook(
            http_conn_id=self.slack_conn_id,
            message=self.slack_message,
            webhook_token=self.slack_token,
            slack_channel=self.slack_channel,
        )

    def _render_fields_with_tabulate(self, fields: Iterable[str], context, jinja_env=None) -> None:
        """Render ``fields`` in place using a Jinja environment that has the ``tabulate`` filter."""
        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters['tabulate'] = tabulate

        self._do_render_template_fields(self, fields, context, jinja_env, set())

    def render_template_fields(self, context, jinja_env=None) -> None:
        """
        Render template fields, deferring ``slack_message`` until the Presto results exist.

        The first pass renders every template field except ``slack_message``, because the
        query has not run yet. Any later pass renders only ``slack_message``: the other fields
        already hold rendered text, and rendering them again would re-evaluate Jinja syntax that
        the first pass produced (for example the output of a ``{% raw %}`` block).
        """
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = [
                name for name in self.template_fields if name != 'slack_message'
            ]
        else:
            fields_to_render = ['slack_message']

        self._render_fields_with_tabulate(fields_to_render, context, jinja_env)
        self.times_rendered += 1

    def execute(self, context: 'Context') -> None:
        """
        Run the query on Presto and send the rendered message to Slack.

        :raises AirflowException: if ``sql`` or ``slack_message`` is empty or whitespace only.
        """
        if not self.sql.strip():
            raise AirflowException("Expected 'sql' parameter is missing.")
        if not self.slack_message.strip():
            raise AirflowException("Expected 'slack_message' parameter is missing.")

        df = self._get_query_results()

        self._render_and_send_slack_message(context, df)

        self.log.debug('Finished sending Presto data to Slack')
diff --git a/docs/apache-airflow-providers-presto/index.rst
b/docs/apache-airflow-providers-presto/index.rst
index 9fbd1b9347..b87b42d5c8 100644
--- a/docs/apache-airflow-providers-presto/index.rst
+++ b/docs/apache-airflow-providers-presto/index.rst
@@ -28,6 +28,12 @@ Content
PrestoTransferOperator types <operators/transfer/gcs_to_presto>
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ PrestoToSlackOperator types <operators/transfer/presto_to_slack>
+
.. toctree::
:maxdepth: 1
:caption: References
@@ -100,6 +106,7 @@ You can install such cross-provider dependencies when
installing from PyPI. For
Dependent package
Extra
====================================================================================================
==========
`apache-airflow-providers-google
<https://airflow.apache.org/docs/apache-airflow-providers-google>`_ ``google``
+`apache-airflow-providers-slack
<https://airflow.apache.org/docs/apache-airflow-providers-slack>`_ ``slack``
====================================================================================================
==========
Downloading official packages
diff --git
a/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
new file mode 100644
index 0000000000..5dded6bd0e
--- /dev/null
+++
b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/operator:PrestoToSlackOperator:
+
+PrestoToSlackOperator
+========================
+
+Use the
:class:`~airflow.providers.presto.transfers.presto_to_slack.PrestoToSlackOperator` to
post messages to predefined Slack
+channels.
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+This operator will execute a custom query in Presto and publish a Slack
message that can be formatted
+and contain the resulting dataset (e.g. ASCII formatted dataframe).
+
+An example usage of the PrestoToSlackOperator is as follows:
+
+.. exampleinclude::
/../../tests/system/providers/presto/example_presto_to_slack.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_presto_to_slack]
+ :end-before: [END howto_operator_presto_to_slack]
diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py
b/tests/providers/presto/transfers/test_presto_to_slack.py
new file mode 100644
index 0000000000..78aa2867ec
--- /dev/null
+++ b/tests/providers/presto/transfers/test_presto_to_slack.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.models import DAG
+from airflow.providers.presto.transfers.presto_to_slack import
PrestoToSlackOperator
+from airflow.utils import timezone
+from tests.test_utils.db import clear_db_runs
+
TEST_DAG_ID = 'presto_to_slack_unit_test'
DEFAULT_DATE = timezone.datetime(2022, 1, 1)


class TestPrestoToSlackOperator:
    """Unit tests for PrestoToSlackOperator with the Presto and Slack hooks mocked out."""

    def setup_class(self):
        # Clear any existing DAG runs before the tests in this class start.
        clear_db_runs()

    def setup_method(self):
        # Every test gets its own DAG instance to attach the operator to.
        self.example_dag = DAG('unit_test_dag_presto_to_slack', start_date=DEFAULT_DATE)

    def teardown_method(self):
        clear_db_runs()

    @staticmethod
    def _construct_operator(**kwargs):
        """Create the operator under test with a fixed task id."""
        return PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs)

    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook')
    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook')
    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class):
        """The SQL is rendered before execution and the query result is rendered into the message."""
        mocked_presto = mock_presto_hook_class.return_value
        mocked_presto.get_pandas_df.return_value = '1234'
        mocked_slack = mock_slack_hook_class.return_value

        task = self._construct_operator(
            presto_conn_id='presto_connection',
            slack_conn_id='slack_connection',
            sql="sql {{ ds }}",
            results_df_name='xxxx',
            parameters=['1', '2', '3'],
            slack_message='message: {{ ds }}, {{ xxxx }}',
            slack_token='test_token',
            slack_channel='my_channel',
            dag=self.example_dag,
        )
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        mock_presto_hook_class.assert_called_once_with(presto_conn_id='presto_connection')
        mocked_presto.get_pandas_df.assert_called_once_with(
            'sql 2022-01-01', parameters=['1', '2', '3']
        )

        expected_hook_kwargs = dict(
            http_conn_id='slack_connection',
            message='message: 2022-01-01, 1234',
            webhook_token='test_token',
            slack_channel='my_channel',
        )
        mock_slack_hook_class.assert_called_once_with(**expected_hook_kwargs)
        mocked_slack.execute.assert_called_once()
diff --git a/tests/system/providers/presto/example_presto_to_slack.py
b/tests/system/providers/presto/example_presto_to_slack.py
new file mode 100644
index 0000000000..91ab9c42e6
--- /dev/null
+++ b/tests/system/providers/presto/example_presto_to_slack.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using PrestoToSlackOperator.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.presto.transfers.presto_to_slack import
PrestoToSlackOperator
+
DAG_ID = "example_presto_to_slack"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
# Table queried by the example; can be overridden through the environment.
PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")

with models.DAG(
    dag_id=DAG_ID,
    start_date=datetime(2022, 1, 1),
    schedule_interval='@once',  # Override to match your needs
    catchup=False,
    tags=["example"],
) as dag:
    # [START howto_operator_presto_to_slack]
    presto_to_slack = PrestoToSlackOperator(
        task_id="presto_to_slack",
        sql=f"SELECT col FROM {PRESTO_TABLE}",
        slack_message="message: {{ ds }}, {{ results_df }}",
        slack_channel="my_channel",
    )
    # [END howto_operator_presto_to_slack]


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)