This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 850e1947a6 Reorganize SQL to Slack Operators (#35215)
850e1947a6 is described below
commit 850e1947a6691e3c0664e59bbb36debc3ab19f48
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Nov 5 02:23:36 2023 +0400
Reorganize SQL to Slack Operators (#35215)
* Reorganize SQL to Slack Operators
* Fix examples and provider.yaml integrations
* Change TestSlackProviderProjectStructure
* Mark SqlToSlackWebhookOperator as db_test
---------
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/providers/slack/provider.yaml | 28 +-
.../providers/slack/transfers/base_sql_to_slack.py | 83 ++++++
airflow/providers/slack/transfers/sql_to_slack.py | 211 ++-------------
.../slack/transfers/sql_to_slack_webhook.py | 172 ++++++++++++
.../operators/slack_operator_howto_guide.rst | 6 +-
.../operators/sql_to_slack.rst | 19 +-
.../{sql_to_slack.rst => sql_to_slack_webhook.rst} | 18 +-
tests/always/test_project_structure.py | 6 +-
tests/providers/slack/transfers/conftest.py | 27 ++
.../slack/transfers/test_base_sql_to_slack.py | 97 +++++++
.../providers/slack/transfers/test_sql_to_slack.py | 293 ++-------------------
.../slack/transfers/test_sql_to_slack_webhook.py | 271 +++++++++++++++++++
.../system/providers/slack/example_sql_to_slack.py | 23 +-
...to_slack.py => example_sql_to_slack_webhook.py} | 10 +-
14 files changed, 757 insertions(+), 507 deletions(-)
diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml
index dbf82418d2..c1d67723ea 100644
--- a/airflow/providers/slack/provider.yaml
+++ b/airflow/providers/slack/provider.yaml
@@ -19,7 +19,10 @@
package-name: apache-airflow-providers-slack
name: Slack
description: |
- `Slack <https://slack.com/>`__
+ `Slack <https://slack.com/>`__ services integration including:
+
+ - `Slack API <https://api.slack.com/>`__
+ - `Slack Incoming Webhook <https://api.slack.com/messaging/webhooks>`__
suspended: false
versions:
@@ -56,28 +59,47 @@ dependencies:
integrations:
- integration-name: Slack
external-doc-url: https://slack.com/
+ logo: /integration-logos/slack/Slack.png
+ tags: [service]
+ - integration-name: Slack API
+ external-doc-url: https://api.slack.com/
how-to-guide:
- /docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
logo: /integration-logos/slack/Slack.png
tags: [service]
+ - integration-name: Slack Incoming Webhook
+ external-doc-url: https://api.slack.com/messaging/webhooks
+ logo: /integration-logos/slack/Slack.png
+ tags: [service]
operators:
- - integration-name: Slack
+ - integration-name: Slack API
python-modules:
- airflow.providers.slack.operators.slack
+ - integration-name: Slack Incoming Webhook
+ python-modules:
- airflow.providers.slack.operators.slack_webhook
hooks:
- - integration-name: Slack
+ - integration-name: Slack API
python-modules:
- airflow.providers.slack.hooks.slack
+ - integration-name: Slack Incoming Webhook
+ python-modules:
- airflow.providers.slack.hooks.slack_webhook
transfers:
- source-integration-name: Common SQL
target-integration-name: Slack
+ python-module: airflow.providers.slack.transfers.base_sql_to_slack
+ - source-integration-name: Common SQL
+ target-integration-name: Slack API
python-module: airflow.providers.slack.transfers.sql_to_slack
how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+ - source-integration-name: Common SQL
+ target-integration-name: Slack Incoming Webhook
+ python-module: airflow.providers.slack.transfers.sql_to_slack_webhook
+ how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
connection-types:
- hook-class-name: airflow.providers.slack.hooks.slack.SlackHook
diff --git a/airflow/providers/slack/transfers/base_sql_to_slack.py b/airflow/providers/slack/transfers/base_sql_to_slack.py
new file mode 100644
index 0000000000..cdc5b4cc7a
--- /dev/null
+++ b/airflow/providers/slack/transfers/base_sql_to_slack.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Iterable, Mapping
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+
+if TYPE_CHECKING:
+ import pandas as pd
+ from slack_sdk.http_retry import RetryHandler
+
+ from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+
+class BaseSqlToSlackOperator(BaseOperator):
+ """
+ Operator implements base sql methods for SQL to Slack Transfer operators.
+
+ :param sql: The SQL query to be executed
+ :param sql_conn_id: reference to a specific DB-API Connection.
+ :param sql_hook_params: Extra config params to be passed to the underlying
hook.
+ Should match the desired hook constructor params.
+ :param parameters: The parameters to pass to the SQL query.
+ :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls.
Optional
+ :param slack_timeout: The maximum number of seconds the client will wait
to connect
+ and receive a response from Slack. Optional
+ :param slack_retry_handlers: List of handlers to customize retry logic.
Optional
+ """
+
+ def __init__(
+ self,
+ *,
+ sql: str,
+ sql_conn_id: str,
+ sql_hook_params: dict | None = None,
+ parameters: Iterable | Mapping[str, Any] | None = None,
+ slack_proxy: str | None = None,
+ slack_timeout: int | None = None,
+ slack_retry_handlers: list[RetryHandler] | None = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.sql_conn_id = sql_conn_id
+ self.sql_hook_params = sql_hook_params
+ self.sql = sql
+ self.parameters = parameters
+ self.slack_proxy = slack_proxy
+ self.slack_timeout = slack_timeout
+ self.slack_retry_handlers = slack_retry_handlers
+
+ def _get_hook(self) -> DbApiHook:
+ self.log.debug("Get connection for %s", self.sql_conn_id)
+ conn = BaseHook.get_connection(self.sql_conn_id)
+ hook = conn.get_hook(hook_params=self.sql_hook_params)
+ if not callable(getattr(hook, "get_pandas_df", None)):
+ raise AirflowException(
+ "This hook is not supported. The hook class must have
get_pandas_df method."
+ )
+ return hook
+
+ def _get_query_results(self) -> pd.DataFrame:
+ sql_hook = self._get_hook()
+
+ self.log.info("Running SQL query: %s", self.sql)
+ df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
+ return df
diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py
index 9c127d77a1..2bc249c7c1 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -16,188 +16,27 @@
# under the License.
from __future__ import annotations
+import warnings
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Iterable, Mapping, Sequence
-from tabulate import tabulate
-
-from airflow.exceptions import AirflowException
-from airflow.hooks.base import BaseHook
-from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.slack.hooks.slack import SlackHook
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+from airflow.providers.slack.transfers.base_sql_to_slack import
BaseSqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack_webhook import
SqlToSlackWebhookOperator
from airflow.providers.slack.utils import parse_filename
if TYPE_CHECKING:
- import pandas as pd
- from slack_sdk.http_retry import RetryHandler
-
- from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.utils.context import Context
-class BaseSqlToSlackOperator(BaseOperator):
- """
- Operator implements base sql methods for SQL to Slack Transfer operators.
-
- :param sql: The SQL query to be executed
- :param sql_conn_id: reference to a specific DB-API Connection.
- :param sql_hook_params: Extra config params to be passed to the underlying
hook.
- Should match the desired hook constructor params.
- :param parameters: The parameters to pass to the SQL query.
- :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls.
Optional
- :param slack_timeout: The maximum number of seconds the client will wait
to connect
- and receive a response from Slack. Optional
- :param slack_retry_handlers: List of handlers to customize retry logic.
Optional
- """
-
- def __init__(
- self,
- *,
- sql: str,
- sql_conn_id: str,
- sql_hook_params: dict | None = None,
- parameters: Iterable | Mapping[str, Any] | None = None,
- slack_proxy: str | None = None,
- slack_timeout: int | None = None,
- slack_retry_handlers: list[RetryHandler] | None = None,
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.sql_conn_id = sql_conn_id
- self.sql_hook_params = sql_hook_params
- self.sql = sql
- self.parameters = parameters
- self.slack_proxy = slack_proxy
- self.slack_timeout = slack_timeout
- self.slack_retry_handlers = slack_retry_handlers
-
- def _get_hook(self) -> DbApiHook:
- self.log.debug("Get connection for %s", self.sql_conn_id)
- conn = BaseHook.get_connection(self.sql_conn_id)
- hook = conn.get_hook(hook_params=self.sql_hook_params)
- if not callable(getattr(hook, "get_pandas_df", None)):
- raise AirflowException(
- "This hook is not supported. The hook class must have
get_pandas_df method."
- )
- return hook
-
- def _get_query_results(self) -> pd.DataFrame:
- sql_hook = self._get_hook()
-
- self.log.info("Running SQL query: %s", self.sql)
- df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
- return df
-
-
-class SqlToSlackOperator(BaseSqlToSlackOperator):
+class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
"""
- Executes an SQL statement in a given SQL connection and sends the results
to Slack.
-
- The results of the query are rendered into the 'slack_message' parameter
as a Pandas
- dataframe using a JINJA variable called '{{ results_df }}'. The
'results_df' variable
- name can be changed by specifying a different 'results_df_name' parameter.
The Tabulate
- library is added to the JINJA environment as a filter to allow the
dataframe to be
- rendered nicely. For example, set 'slack_message' to {{ results_df |
- tabulate(tablefmt="pretty", headers="keys") }} to send the results to
Slack as an ascii
- rendered table.
+ Executes an SQL statement in a given SQL connection and sends the results
to Slack API as file.
.. seealso::
For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:SqlToSlackOperator`
-
- :param sql: The SQL query to be executed (templated)
- :param slack_message: The templated Slack message to send with the data
returned from the SQL connection.
- You can use the default JINJA variable {{ results_df }} to access the
pandas dataframe containing the
- SQL results
- :param sql_conn_id: reference to a specific database.
- :param sql_hook_params: Extra config params to be passed to the underlying
hook.
- Should match the desired hook constructor params.
- :param slack_conn_id: The connection id for Slack.
- :param slack_channel: The channel to send message. Override default from
Slack connection.
- :param results_df_name: The name of the JINJA template's dataframe
variable, default is 'results_df'
- :param parameters: The parameters to pass to the SQL query
- """
-
- template_fields: Sequence[str] = ("sql", "slack_message")
- template_ext: Sequence[str] = (".sql", ".jinja", ".j2")
- template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
- times_rendered = 0
-
- def __init__(
- self,
- *,
- sql: str,
- sql_conn_id: str,
- slack_conn_id: str,
- sql_hook_params: dict | None = None,
- slack_channel: str | None = None,
- slack_message: str,
- results_df_name: str = "results_df",
- parameters: Iterable | Mapping[str, Any] | None = None,
- **kwargs,
- ) -> None:
- super().__init__(
- sql=sql, sql_conn_id=sql_conn_id, sql_hook_params=sql_hook_params,
parameters=parameters, **kwargs
- )
-
- self.slack_conn_id = slack_conn_id
- self.slack_channel = slack_channel
- self.slack_message = slack_message
- self.results_df_name = results_df_name
- self.kwargs = kwargs
-
- def _render_and_send_slack_message(self, context, df) -> None:
- # Put the dataframe into the context and render the JINJA template
fields
- context[self.results_df_name] = df
- self.render_template_fields(context)
-
- slack_hook = self._get_slack_hook()
- self.log.info("Sending slack message: %s", self.slack_message)
- slack_hook.send(text=self.slack_message, channel=self.slack_channel)
-
- def _get_slack_hook(self) -> SlackWebhookHook:
- return SlackWebhookHook(
- slack_webhook_conn_id=self.slack_conn_id,
- proxy=self.slack_proxy,
- timeout=self.slack_timeout,
- retry_handlers=self.slack_retry_handlers,
- )
-
- def render_template_fields(self, context, jinja_env=None) -> None:
- # If this is the first render of the template fields, exclude
slack_message from rendering since
- # the SQL results haven't been retrieved yet.
- if self.times_rendered == 0:
- fields_to_render: Iterable[str] = (x for x in self.template_fields
if x != "slack_message")
- else:
- fields_to_render = self.template_fields
-
- if not jinja_env:
- jinja_env = self.get_template_env()
-
- # Add the tabulate library into the JINJA environment
- jinja_env.filters["tabulate"] = tabulate
-
- self._do_render_template_fields(self, fields_to_render, context,
jinja_env, set())
- self.times_rendered += 1
-
- def execute(self, context: Context) -> None:
- if not isinstance(self.sql, str):
- raise AirflowException("Expected 'sql' parameter should be a
string.")
- if self.sql is None or self.sql.strip() == "":
- raise AirflowException("Expected 'sql' parameter is missing.")
- if self.slack_message is None or self.slack_message.strip() == "":
- raise AirflowException("Expected 'slack_message' parameter is
missing.")
-
- df = self._get_query_results()
- self._render_and_send_slack_message(context, df)
-
- self.log.debug("Finished sending SQL data to Slack")
-
-
-class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
- """
- Executes an SQL statement in a given SQL connection and sends the results
to Slack API as file.
+ :ref:`howto/operator:SqlToSlackApiFileOperator`
:param sql: The SQL query to be executed
:param sql_conn_id: reference to a specific DB-API Connection.
@@ -215,24 +54,6 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
:param slack_title: Title of file.
:param slack_base_url: A string representing the Slack API base URL.
Optional
:param df_kwargs: Keyword arguments forwarded to
``pandas.DataFrame.to_{format}()`` method.
-
-
- Example:
- .. code-block:: python
-
- SqlToSlackApiFileOperator(
- task_id="sql_to_slack",
- sql="SELECT 1 a, 2 b, 3 c",
- sql_conn_id="sql-connection",
- slack_conn_id="slack-api-connection",
- slack_filename="awesome.json.gz",
- slack_channels="#random,#general",
- slack_initial_comment="Awesome load to compressed multiline JSON.",
- df_kwargs={
- "orient": "records",
- "lines": True,
- },
- )
"""
template_fields: Sequence[str] = (
@@ -315,3 +136,21 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
initial_comment=self.slack_initial_comment,
title=self.slack_title,
)
+
+
+class SqlToSlackOperator(SqlToSlackWebhookOperator):
+ """
+ Executes an SQL statement in a given SQL connection and sends the results
to Slack Incoming Webhook.
+
+ Deprecated, use
:class:`airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator`
+ """
+
+ def __init__(self, *args, **kwargs):
+ warnings.warn(
+
"`airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator` has been
renamed "
+ "and moved
`airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator`
"
+ "this operator deprecated and will be removed in future",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ super().__init__(*args, **kwargs)
diff --git a/airflow/providers/slack/transfers/sql_to_slack_webhook.py b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
new file mode 100644
index 0000000000..31700ed8b0
--- /dev/null
+++ b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import warnings
+from typing import TYPE_CHECKING, Any, Iterable, Mapping, Sequence
+
+from tabulate import tabulate
+
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+from airflow.providers.slack.transfers.base_sql_to_slack import
BaseSqlToSlackOperator
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
+ """
+ Executes an SQL statement in a given SQL connection and sends the results
to Slack Incoming Webhook.
+
+ The results of the query are rendered into the 'slack_message' parameter
as a Pandas
+ dataframe using a JINJA variable called '{{ results_df }}'. The
'results_df' variable
+ name can be changed by specifying a different 'results_df_name' parameter.
The Tabulate
+ library is added to the JINJA environment as a filter to allow the
dataframe to be
+ rendered nicely. For example, set 'slack_message' to {{ results_df |
+ tabulate(tablefmt="pretty", headers="keys") }} to send the results to
Slack as an ascii
+ rendered table.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:SqlToSlackWebhookOperator`
+
+ .. note::
+ You cannot override the default channel (chosen by the user who
installed your app),
+ Instead, these values will always inherit from the associated Slack
App configuration
+ (`link
<https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
+ It is possible to change this values only in `Legacy Slack Integration
Incoming Webhook
+
<https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.
+
+ .. warning::
+ This hook intend to use `Slack Incoming Webhook` connection
+ and might not work correctly with `Slack API` connection.
+
+ :param sql: The SQL query to be executed (templated)
+ :param slack_message: The templated Slack message to send with the data
returned from the SQL connection.
+ You can use the default JINJA variable {{ results_df }} to access the
pandas dataframe containing the
+ SQL results
+ :param sql_conn_id: reference to a specific database.
+ :param sql_hook_params: Extra config params to be passed to the underlying
hook.
+ Should match the desired hook constructor params.
+ :param slack_webhook_conn_id: :ref:`Slack Incoming Webhook
<howto/connection:slack>`
+ connection id that has Incoming Webhook token in the password field.
+ :param slack_channel: The channel to send message.
+ :param results_df_name: The name of the JINJA template's dataframe
variable, default is 'results_df'
+ :param parameters: The parameters to pass to the SQL query
+ """
+
+ template_fields: Sequence[str] = ("sql", "slack_message")
+ template_ext: Sequence[str] = (".sql", ".jinja", ".j2")
+ template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
+ times_rendered = 0
+
+ def __init__(
+ self,
+ *,
+ sql: str,
+ sql_conn_id: str,
+ slack_webhook_conn_id: str | None = None,
+ sql_hook_params: dict | None = None,
+ slack_channel: str | None = None,
+ slack_message: str,
+ results_df_name: str = "results_df",
+ parameters: Iterable | Mapping[str, Any] | None = None,
+ **kwargs,
+ ) -> None:
+ if slack_conn_id := kwargs.pop("slack_conn_id", None):
+ warnings.warn(
+ "Parameter `slack_conn_id` is deprecated because this
attribute initially intend to use with "
+ "Slack API however this operator provided integration with
Slack Incoming Webhook. "
+ "Please use `slack_webhook_conn_id` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=3,
+ )
+ if slack_webhook_conn_id and slack_conn_id !=
slack_webhook_conn_id:
+ raise ValueError(
+ "Conflicting Connection ids provided, "
+ f"slack_webhook_conn_id={slack_webhook_conn_id!r},
slack_conn_id={slack_conn_id!r}."
+ )
+ slack_webhook_conn_id = slack_conn_id
+ if not slack_webhook_conn_id:
+ raise ValueError("Got an empty `slack_webhook_conn_id` value.")
+ super().__init__(
+ sql=sql, sql_conn_id=sql_conn_id, sql_hook_params=sql_hook_params,
parameters=parameters, **kwargs
+ )
+
+ self.slack_webhook_conn_id = slack_webhook_conn_id
+ self.slack_channel = slack_channel
+ self.slack_message = slack_message
+ self.results_df_name = results_df_name
+ self.kwargs = kwargs
+
+ def _render_and_send_slack_message(self, context, df) -> None:
+ # Put the dataframe into the context and render the JINJA template
fields
+ context[self.results_df_name] = df
+ self.render_template_fields(context)
+
+ slack_hook = self._get_slack_hook()
+ self.log.info("Sending slack message: %s", self.slack_message)
+ slack_hook.send(text=self.slack_message, channel=self.slack_channel)
+
+ def _get_slack_hook(self) -> SlackWebhookHook:
+ return SlackWebhookHook(
+ slack_webhook_conn_id=self.slack_webhook_conn_id,
+ proxy=self.slack_proxy,
+ timeout=self.slack_timeout,
+ retry_handlers=self.slack_retry_handlers,
+ )
+
+ def render_template_fields(self, context, jinja_env=None) -> None:
+ # If this is the first render of the template fields, exclude
slack_message from rendering since
+ # the SQL results haven't been retrieved yet.
+ if self.times_rendered == 0:
+ fields_to_render: Iterable[str] = (x for x in self.template_fields
if x != "slack_message")
+ else:
+ fields_to_render = self.template_fields
+
+ if not jinja_env:
+ jinja_env = self.get_template_env()
+
+ # Add the tabulate library into the JINJA environment
+ jinja_env.filters["tabulate"] = tabulate
+
+ self._do_render_template_fields(self, fields_to_render, context,
jinja_env, set())
+ self.times_rendered += 1
+
+ def execute(self, context: Context) -> None:
+ if not isinstance(self.sql, str):
+ raise AirflowException("Expected 'sql' parameter should be a
string.")
+ if self.sql is None or self.sql.strip() == "":
+ raise AirflowException("Expected 'sql' parameter is missing.")
+ if self.slack_message is None or self.slack_message.strip() == "":
+ raise AirflowException("Expected 'slack_message' parameter is
missing.")
+
+ df = self._get_query_results()
+ self._render_and_send_slack_message(context, df)
+
+ self.log.debug("Finished sending SQL data to Slack")
+
+ @property
+ def slack_conn_id(self):
+ warnings.warn(
+ f"`{type(self).__name__}.slack_conn_id` property deprecated and
will be removed in a future. "
+ "Please use `slack_webhook_conn_id` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ return self.slack_webhook_conn_id
diff --git a/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst b/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
index 66d57a46df..62464230c3 100644
--- a/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
+++ b/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
@@ -21,9 +21,9 @@ How-to Guide for Slack Operators
Introduction
------------
-Slack operators can send text messages (:class:`~airflow.providers.slack.operators.slack.SlackAPIFileOperator`)
-or files (:class:`~airflow.providers.slack.operators.slack.SlackAPIPostOperator`) to specified Slack channels.
-Provide ``slack_conn_id`` for the connection, and specify ``channel`` (name or ID).
+Slack operators can send text messages (:class:`~airflow.providers.slack.operators.slack.SlackAPIPostOperator`)
+or files (:class:`~airflow.providers.slack.operators.slack.SlackAPIFileOperator`) to specified Slack channels.
+Provide ``slack_webhook_conn_id`` for the connection, and specify ``channel`` (name or ID).
Example Code for Sending Files
------------------------------
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
index cda5c28c6d..de9fe136d9 100644
--- a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
@@ -15,24 +15,23 @@
specific language governing permissions and limitations
under the License.
-.. _howto/operator:SqlToSlackOperator:
+.. _howto/operator:SqlToSlackApiFileOperator:
-SqlToSlackOperator
-==================
+SqlToSlackApiFileOperator
+=========================
-Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post messages to predefined Slack
-channel.
+Use the :class:`~airflow.providers.slack.transfers.sql_to_slack.SqlToSlackApiFileOperator` to post query result as a file
+to Slack channel(s) through `Slack API <https://api.slack.com/>`__.
Using the Operator
^^^^^^^^^^^^^^^^^^
-This operator will execute a custom query in the provided SQL connection and publish a Slack message that can be formatted
-and contain the resulting dataset (e.g. ASCII formatted dataframe).
+This operator will execute a custom query in the provided SQL connection and publish a file to Slack channel(s).
-An example usage of the SqlToSlackOperator is as follows:
+An example usage of the SqlToSlackApiFileOperator is as follows:
.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_sql_to_slack]
- :end-before: [END howto_operator_sql_to_slack]
+ :start-after: [START howto_operator_sql_to_slack_api_file]
+ :end-before: [END howto_operator_sql_to_slack_api_file]
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
similarity index 68%
copy from docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
copy to docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
index cda5c28c6d..8bd1131566 100644
--- a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
@@ -15,13 +15,13 @@
specific language governing permissions and limitations
under the License.
-.. _howto/operator:SqlToSlackOperator:
+.. _howto/operator:SqlToSlackWebhookOperator:
-SqlToSlackOperator
-==================
+SqlToSlackWebhookOperator
+=========================
-Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post messages to predefined Slack
-channel.
+Use the :class:`~airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator` to post messages
+to predefined Slack channel through `Incoming Webhook <https://api.slack.com/messaging/webhooks>`__.
Using the Operator
^^^^^^^^^^^^^^^^^^
@@ -29,10 +29,10 @@ Using the Operator
This operator will execute a custom query in the provided SQL connection and publish a Slack message that can be formatted
and contain the resulting dataset (e.g. ASCII formatted dataframe).
-An example usage of the SqlToSlackOperator is as follows:
+An example usage of the SqlToSlackWebhookOperator is as follows:
-.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
+.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack_webhook.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_sql_to_slack]
- :end-before: [END howto_operator_sql_to_slack]
+ :start-after: [START howto_operator_sql_to_slack_webhook]
+ :end-before: [END howto_operator_sql_to_slack_webhook]
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 5f774f425f..f0e1a2e35a 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -611,16 +611,16 @@ class TestSlackProviderProjectStructure(ExampleCoverageTest):
PROVIDER = "slack"
CLASS_DIRS = ProjectStructureTest.CLASS_DIRS
BASE_CLASSES = {
-
"airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator",
+
"airflow.providers.slack.transfers.base_sql_to_slack.BaseSqlToSlackOperator",
+ "airflow.providers.slack.operators.slack.SlackAPIOperator",
}
MISSING_EXAMPLES_FOR_CLASSES = {
- "airflow.providers.slack.operators.slack.SlackAPIOperator",
"airflow.providers.slack.operators.slack.SlackAPIPostOperator",
"airflow.providers.slack.operators.slack_webhook.SlackWebhookOperator",
-
"airflow.providers.slack.transfers.sql_to_slack.SqlToSlackApiFileOperator",
}
DEPRECATED_CLASSES = {
"airflow.providers.slack.notifications.slack_notifier.py.",
+ "airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator",
}
diff --git a/tests/providers/slack/transfers/conftest.py b/tests/providers/slack/transfers/conftest.py
new file mode 100644
index 0000000000..83944b5148
--- /dev/null
+++ b/tests/providers/slack/transfers/conftest.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+
[email protected]
+def mocked_get_connection():
+ with
mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
as m:
+ yield m
diff --git a/tests/providers/slack/transfers/test_base_sql_to_slack.py b/tests/providers/slack/transfers/test_base_sql_to_slack.py
new file mode 100644
index 0000000000..f12767b8d6
--- /dev/null
+++ b/tests/providers/slack/transfers/test_base_sql_to_slack.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.slack.transfers.base_sql_to_slack import
BaseSqlToSlackOperator
+
+
+class TestBaseSqlToSlackOperator:
+ def setup_method(self):
+ self.default_op_kwargs = {
+ "sql": "SELECT 1",
+ "sql_conn_id": "test-sql-conn-id",
+ "sql_hook_params": None,
+ "parameters": None,
+ }
+
+ def test_execute_not_implemented(self):
+ """Test that no base implementation for
``BaseSqlToSlackOperator.execute()``."""
+ op = BaseSqlToSlackOperator(task_id="test_base_not_implements",
**self.default_op_kwargs)
+ with pytest.raises(NotImplementedError):
+ op.execute(mock.MagicMock())
+
+
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
+ @mock.patch("airflow.models.connection.Connection.get_hook")
+ @pytest.mark.parametrize("conn_type", ["postgres", "snowflake"])
+ @pytest.mark.parametrize("sql_hook_params", [None, {"foo": "bar"}])
+ def test_get_hook(self, mock_get_hook, mock_get_conn, conn_type,
sql_hook_params):
+ class SomeDummyHook:
+ """Hook which implements ``get_pandas_df`` method"""
+
+ def get_pandas_df(self):
+ pass
+
+ expected_hook = SomeDummyHook()
+ mock_get_conn.return_value =
Connection(conn_id=f"test_connection_{conn_type}", conn_type=conn_type)
+ mock_get_hook.return_value = expected_hook
+ op_kwargs = {
+ **self.default_op_kwargs,
+ "sql_hook_params": sql_hook_params,
+ }
+ op = BaseSqlToSlackOperator(task_id="test_get_hook", **op_kwargs)
+ hook = op._get_hook()
+ mock_get_hook.assert_called_once_with(hook_params=sql_hook_params)
+ assert hook == expected_hook
+
+
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
+ @mock.patch("airflow.models.connection.Connection.get_hook")
+ def test_get_not_supported_hook(self, mock_get_hook, mock_get_conn):
+ class SomeDummyHook:
+ """Hook which not implemented ``get_pandas_df`` method"""
+
+ mock_get_conn.return_value = Connection(conn_id="test_connection",
conn_type="test_connection")
+ mock_get_hook.return_value = SomeDummyHook()
+ op = BaseSqlToSlackOperator(task_id="test_get_not_supported_hook",
**self.default_op_kwargs)
+ error_message = r"This hook is not supported. The hook class must have
get_pandas_df method\."
+ with pytest.raises(AirflowException, match=error_message):
+ op._get_hook()
+
+
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_hook")
+ @pytest.mark.parametrize("sql", ["SELECT 42", "SELECT 1 FROM DUMMY WHERE
col = ?"])
+ @pytest.mark.parametrize("parameters", [None, {"col": "spam-egg"}])
+ def test_get_query_results(self, mock_op_get_hook, sql, parameters):
+ test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+ mock_get_pandas_df = mock.MagicMock(return_value=test_df)
+ mock_hook = mock.MagicMock()
+ mock_hook.get_pandas_df = mock_get_pandas_df
+ mock_op_get_hook.return_value = mock_hook
+ op_kwargs = {
+ **self.default_op_kwargs,
+ "sql": sql,
+ "parameters": parameters,
+ }
+ op = BaseSqlToSlackOperator(task_id="test_get_query_results",
**op_kwargs)
+ df = op._get_query_results()
+ mock_get_pandas_df.assert_called_once_with(sql, parameters=parameters)
+ assert df is test_df
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py
b/tests/providers/slack/transfers/test_sql_to_slack.py
index cb33fff516..9d60f28b42 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -18,16 +18,10 @@ from __future__ import annotations
from unittest import mock
-import pandas as pd
import pytest
-from airflow.exceptions import AirflowException
-from airflow.models import DAG, Connection
-from airflow.providers.slack.transfers.sql_to_slack import (
- BaseSqlToSlackOperator,
- SqlToSlackApiFileOperator,
- SqlToSlackOperator,
-)
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.slack.transfers.sql_to_slack import
SqlToSlackApiFileOperator, SqlToSlackOperator
from airflow.utils import timezone
TEST_DAG_ID = "sql_to_slack_unit_test"
@@ -35,275 +29,6 @@ TEST_TASK_ID = "sql_to_slack_unit_test_task"
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
-class TestBaseSqlToSlackOperator:
- def setup_method(self):
- self.default_op_kwargs = {
- "sql": "SELECT 1",
- "sql_conn_id": "test-sql-conn-id",
- "sql_hook_params": None,
- "parameters": None,
- }
-
- def test_execute_not_implemented(self):
- """Test that no base implementation for
``BaseSqlToSlackOperator.execute()``."""
- op = BaseSqlToSlackOperator(task_id="test_base_not_implements",
**self.default_op_kwargs)
- with pytest.raises(NotImplementedError):
- op.execute(mock.MagicMock())
-
-
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
- @mock.patch("airflow.models.connection.Connection.get_hook")
- @pytest.mark.parametrize("conn_type", ["postgres", "snowflake"])
- @pytest.mark.parametrize("sql_hook_params", [None, {"foo": "bar"}])
- def test_get_hook(self, mock_get_hook, mock_get_conn, conn_type,
sql_hook_params):
- class SomeDummyHook:
- """Hook which implements ``get_pandas_df`` method"""
-
- def get_pandas_df(self):
- pass
-
- expected_hook = SomeDummyHook()
- mock_get_conn.return_value =
Connection(conn_id=f"test_connection_{conn_type}", conn_type=conn_type)
- mock_get_hook.return_value = expected_hook
- op_kwargs = {
- **self.default_op_kwargs,
- "sql_hook_params": sql_hook_params,
- }
- op = BaseSqlToSlackOperator(task_id="test_get_hook", **op_kwargs)
- hook = op._get_hook()
- mock_get_hook.assert_called_once_with(hook_params=sql_hook_params)
- assert hook == expected_hook
-
-
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
- @mock.patch("airflow.models.connection.Connection.get_hook")
- def test_get_not_supported_hook(self, mock_get_hook, mock_get_conn):
- class SomeDummyHook:
- """Hook which not implemented ``get_pandas_df`` method"""
-
- mock_get_conn.return_value = Connection(conn_id="test_connection",
conn_type="test_connection")
- mock_get_hook.return_value = SomeDummyHook()
- op = BaseSqlToSlackOperator(task_id="test_get_not_supported_hook",
**self.default_op_kwargs)
- error_message = r"This hook is not supported. The hook class must have
get_pandas_df method\."
- with pytest.raises(AirflowException, match=error_message):
- op._get_hook()
-
-
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_hook")
- @pytest.mark.parametrize("sql", ["SELECT 42", "SELECT 1 FROM DUMMY WHERE
col = ?"])
- @pytest.mark.parametrize("parameters", [None, {"col": "spam-egg"}])
- def test_get_query_results(self, mock_op_get_hook, sql, parameters):
- test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
- mock_get_pandas_df = mock.MagicMock(return_value=test_df)
- mock_hook = mock.MagicMock()
- mock_hook.get_pandas_df = mock_get_pandas_df
- mock_op_get_hook.return_value = mock_hook
- op_kwargs = {
- **self.default_op_kwargs,
- "sql": sql,
- "parameters": parameters,
- }
- op = BaseSqlToSlackOperator(task_id="test_get_query_results",
**op_kwargs)
- df = op._get_query_results()
- mock_get_pandas_df.assert_called_once_with(sql, parameters=parameters)
- assert df is test_df
-
-
[email protected]_test
-class TestSqlToSlackOperator:
- def setup_method(self):
- self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
- self.default_hook_parameters = {"timeout": None, "proxy": None,
"retry_handlers": None}
-
- @staticmethod
- def _construct_operator(**kwargs):
- operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs)
- return operator
-
-
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
- @pytest.mark.parametrize(
- "slack_op_kwargs, hook_extra_kwargs",
- [
- pytest.param(
- {}, {"timeout": None, "proxy": None, "retry_handlers": None},
id="default-hook-parameters"
- ),
- pytest.param(
- {"slack_timeout": 42, "slack_proxy": "http://spam.egg",
"slack_retry_handlers": []},
- {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers":
[]},
- id="with-extra-hook-parameters",
- ),
- ],
- )
- def test_rendering_and_message_execution(self, mock_slack_hook_class,
slack_op_kwargs, hook_extra_kwargs):
- mock_dbapi_hook = mock.Mock()
-
- test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
- get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
- get_pandas_df_mock.return_value = test_df
-
- operator_args = {
- "sql_conn_id": "snowflake_connection",
- "slack_conn_id": "slack_connection",
- "slack_message": "message: {{ ds }}, {{ results_df }}",
- "slack_channel": "#test",
- "sql": "sql {{ ds }}",
- "dag": self.example_dag,
- **slack_op_kwargs,
- }
- sql_to_slack_operator = self._construct_operator(**operator_args)
-
- slack_webhook_hook = mock_slack_hook_class.return_value
- sql_to_slack_operator._get_hook = mock_dbapi_hook
- sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- # Test that the Slack hook is instantiated with the right parameters
- mock_slack_hook_class.assert_called_once_with(
- slack_webhook_conn_id="slack_connection", **hook_extra_kwargs
- )
-
- # Test that the `SlackWebhookHook.send` method gets run once
- slack_webhook_hook.send.assert_called_once_with(
- text=f"message: 2017-01-01, {test_df}",
- channel="#test",
- )
-
-
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
- def test_rendering_and_message_execution_with_slack_hook(self,
mock_slack_hook_class):
- mock_dbapi_hook = mock.Mock()
-
- test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
- get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
- get_pandas_df_mock.return_value = test_df
-
- operator_args = {
- "sql_conn_id": "snowflake_connection",
- "slack_conn_id": "slack_connection",
- "slack_message": "message: {{ ds }}, {{ results_df }}",
- "slack_channel": "#test",
- "sql": "sql {{ ds }}",
- "dag": self.example_dag,
- }
- sql_to_slack_operator = self._construct_operator(**operator_args)
-
- slack_webhook_hook = mock_slack_hook_class.return_value
- sql_to_slack_operator._get_hook = mock_dbapi_hook
- sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- # Test that the Slack hook is instantiated with the right parameters
- mock_slack_hook_class.assert_called_once_with(
- slack_webhook_conn_id="slack_connection",
**self.default_hook_parameters
- )
-
- # Test that the `SlackWebhookHook.send` method gets run once
- slack_webhook_hook.send.assert_called_once_with(
- text=f"message: 2017-01-01, {test_df}",
- channel="#test",
- )
-
- def test_non_existing_slack_parameters_provided_exception_thrown(self):
- operator_args = {
- "sql_conn_id": "snowflake_connection",
- "slack_message": "message: {{ ds }}, {{ xxxx }}",
- "sql": "sql {{ ds }}",
- }
- with pytest.raises(AirflowException):
- self._construct_operator(**operator_args)
-
-
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
- def test_rendering_custom_df_name_message_execution(self,
mock_slack_hook_class):
- mock_dbapi_hook = mock.Mock()
-
- test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
- get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
- get_pandas_df_mock.return_value = test_df
-
- operator_args = {
- "sql_conn_id": "snowflake_connection",
- "slack_conn_id": "slack_connection",
- "slack_message": "message: {{ ds }}, {{ testing }}",
- "slack_channel": "#test",
- "sql": "sql {{ ds }}",
- "results_df_name": "testing",
- "dag": self.example_dag,
- }
- sql_to_slack_operator = self._construct_operator(**operator_args)
-
- slack_webhook_hook = mock_slack_hook_class.return_value
- sql_to_slack_operator._get_hook = mock_dbapi_hook
- sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- # Test that the Slack hook is instantiated with the right parameters
- mock_slack_hook_class.assert_called_once_with(
- slack_webhook_conn_id="slack_connection",
**self.default_hook_parameters
- )
-
- # Test that the `SlackWebhookHook.send` method gets run once
- slack_webhook_hook.send.assert_called_once_with(
- text=f"message: 2017-01-01, {test_df}",
- channel="#test",
- )
-
-
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
- def test_hook_params_building(self, mock_get_conn):
- mock_get_conn.return_value =
Connection(conn_id="snowflake_connection", conn_type="snowflake")
- hook_params = {
- "schema": "test_schema",
- "role": "test_role",
- "database": "test_database",
- "warehouse": "test_warehouse",
- }
- operator_args = {
- "sql_conn_id": "dummy_connection",
- "sql": "sql {{ ds }}",
- "results_df_name": "xxxx",
- "sql_hook_params": hook_params,
- "slack_conn_id": "slack_connection",
- "parameters": ["1", "2", "3"],
- "slack_message": "message: {{ ds }}, {{ xxxx }}",
- "dag": self.example_dag,
- }
- sql_to_slack_operator = SqlToSlackOperator(task_id=TEST_TASK_ID,
**operator_args)
-
- assert sql_to_slack_operator.sql_hook_params == hook_params
-
-
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
- def test_hook_params(self, mock_get_conn):
- mock_get_conn.return_value = Connection(conn_id="postgres_test",
conn_type="postgres")
- op = SqlToSlackOperator(
- task_id="sql_hook_params",
- sql_conn_id="postgres_test",
- slack_conn_id="slack_connection",
- sql="SELECT 1",
- slack_message="message: {{ ds }}, {{ xxxx }}",
- sql_hook_params={
- "log_sql": False,
- },
- )
- hook = op._get_hook()
- assert hook.log_sql == op.sql_hook_params["log_sql"]
-
-
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
- def test_hook_params_snowflake(self, mock_get_conn):
- mock_get_conn.return_value = Connection(conn_id="snowflake_default",
conn_type="snowflake")
- op = SqlToSlackOperator(
- task_id="snowflake_hook_params",
- sql_conn_id="snowflake_default",
- slack_conn_id="slack_default",
- results_df_name="xxxx",
- sql="SELECT 1",
- slack_message="message: {{ ds }}, {{ xxxx }}",
- sql_hook_params={
- "warehouse": "warehouse",
- "database": "database",
- "role": "role",
- "schema": "schema",
- },
- )
- hook = op._get_hook()
-
- assert hook.warehouse == "warehouse"
- assert hook.database == "database"
- assert hook.role == "role"
- assert hook.schema == "schema"
-
-
class TestSqlToSlackApiFileOperator:
def setup_method(self):
self.default_op_kwargs = {
@@ -419,3 +144,17 @@ class TestSqlToSlackApiFileOperator:
)
with pytest.raises(ValueError):
op.execute(mock.MagicMock())
+
+
+def test_deprecated_sql_to_slack_operator():
+ warning_pattern = "SqlToSlackOperator` has been renamed and moved"
+ with pytest.warns(AirflowProviderDeprecationWarning,
match=warning_pattern):
+ SqlToSlackOperator(
+ task_id="deprecated-sql-to-slack",
+ sql="SELECT 1",
+ sql_conn_id="test-sql-conn-id",
+ slack_webhook_conn_id="test-slack-conn-id",
+ sql_hook_params=None,
+ parameters=None,
+ slack_message="foo-bar",
+ )
diff --git a/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
new file mode 100644
index 0000000000..aa71ec0b35
--- /dev/null
+++ b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
@@ -0,0 +1,271 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from contextlib import nullcontext
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.models import Connection
+from airflow.providers.slack.transfers.sql_to_slack_webhook import
SqlToSlackWebhookOperator
+from airflow.utils import timezone
+
+TEST_DAG_ID = "sql_to_slack_unit_test"
+TEST_TASK_ID = "sql_to_slack_unit_test_task"
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
[email protected]
+def mocked_hook():
+ with
mock.patch("airflow.providers.slack.transfers.sql_to_slack_webhook.SlackWebhookHook")
as m:
+ yield m
+
+
[email protected]_test
+class TestSqlToSlackWebhookOperator:
+ def setup_method(self):
+ self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
+ self.default_hook_parameters = {"timeout": None, "proxy": None,
"retry_handlers": None}
+
+ @staticmethod
+ def _construct_operator(**kwargs):
+ operator = SqlToSlackWebhookOperator(task_id=TEST_TASK_ID, **kwargs)
+ return operator
+
+ @pytest.mark.parametrize(
+ "slack_op_kwargs, hook_extra_kwargs",
+ [
+ pytest.param(
+ {}, {"timeout": None, "proxy": None, "retry_handlers": None},
id="default-hook-parameters"
+ ),
+ pytest.param(
+ {"slack_timeout": 42, "slack_proxy": "http://spam.egg",
"slack_retry_handlers": []},
+ {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers":
[]},
+ id="with-extra-hook-parameters",
+ ),
+ ],
+ )
+ def test_rendering_and_message_execution(self, slack_op_kwargs,
hook_extra_kwargs, mocked_hook):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_webhook_conn_id": "slack_connection",
+ "slack_message": "message: {{ ds }}, {{ results_df }}",
+ "slack_channel": "#test",
+ "sql": "sql {{ ds }}",
+ "dag": self.example_dag,
+ **slack_op_kwargs,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mocked_hook.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+
mocked_hook.assert_called_once_with(slack_webhook_conn_id="slack_connection",
**hook_extra_kwargs)
+
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f"message: 2017-01-01, {test_df}",
+ channel="#test",
+ )
+
+ def test_rendering_and_message_execution_with_slack_hook(self,
mocked_hook):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_webhook_conn_id": "slack_connection",
+ "slack_message": "message: {{ ds }}, {{ results_df }}",
+ "slack_channel": "#test",
+ "sql": "sql {{ ds }}",
+ "dag": self.example_dag,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mocked_hook.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+ mocked_hook.assert_called_once_with(
+ slack_webhook_conn_id="slack_connection",
**self.default_hook_parameters
+ )
+
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f"message: 2017-01-01, {test_df}",
+ channel="#test",
+ )
+
+ @pytest.mark.parametrize(
+ "slack_webhook_conn_id, slack_conn_id, warning_expected,
expected_conn_id",
+ [
+ pytest.param("foo", None, False, "foo",
id="slack-webhook-conn-id"),
+ pytest.param(None, "bar", True, "bar", id="slack-conn-id"),
+ pytest.param("spam", "spam", True, "spam", id="mixin-conn-ids"),
+ ],
+ )
+ def test_resolve_conn_ids(self, slack_webhook_conn_id, slack_conn_id,
warning_expected, expected_conn_id):
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_message": "message: {{ ds }}, {{ xxxx }}",
+ "sql": "sql {{ ds }}",
+ }
+ if slack_webhook_conn_id:
+ operator_args["slack_webhook_conn_id"] = slack_webhook_conn_id
+ if slack_conn_id:
+ operator_args["slack_conn_id"] = slack_conn_id
+ ctx = (
+ pytest.warns(AirflowProviderDeprecationWarning, match="Parameter
`slack_conn_id` is deprecated")
+ if warning_expected
+ else nullcontext()
+ )
+
+ with ctx:
+ op = self._construct_operator(**operator_args)
+
+ assert op.slack_webhook_conn_id == expected_conn_id
+ with pytest.warns(AirflowProviderDeprecationWarning,
match="slack_conn_id` property deprecated"):
+ assert op.slack_conn_id == expected_conn_id
+
+ def test_conflicting_conn_id(self):
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_message": "message: {{ ds }}, {{ xxxx }}",
+ "sql": "sql {{ ds }}",
+ }
+ with pytest.raises(ValueError, match="Conflicting Connection ids
provided"), pytest.warns(
+ AirflowProviderDeprecationWarning, match="Parameter
`slack_conn_id` is deprecated"
+ ):
+ self._construct_operator(**operator_args,
slack_webhook_conn_id="foo", slack_conn_id="bar")
+
+ def test_non_existing_slack_webhook_conn_id(self):
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_message": "message: {{ ds }}, {{ xxxx }}",
+ "sql": "sql {{ ds }}",
+ }
+ with pytest.raises(ValueError, match="Got an empty
`slack_webhook_conn_id` value"):
+ self._construct_operator(**operator_args)
+
+ def test_rendering_custom_df_name_message_execution(self, mocked_hook):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ "sql_conn_id": "snowflake_connection",
+ "slack_webhook_conn_id": "slack_connection",
+ "slack_message": "message: {{ ds }}, {{ testing }}",
+ "slack_channel": "#test",
+ "sql": "sql {{ ds }}",
+ "results_df_name": "testing",
+ "dag": self.example_dag,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mocked_hook.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+ mocked_hook.assert_called_once_with(
+ slack_webhook_conn_id="slack_connection",
**self.default_hook_parameters
+ )
+
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f"message: 2017-01-01, {test_df}",
+ channel="#test",
+ )
+
+ def test_hook_params_building(self, mocked_get_connection):
+ mocked_get_connection.return_value =
Connection(conn_id="snowflake_connection", conn_type="snowflake")
+ hook_params = {
+ "schema": "test_schema",
+ "role": "test_role",
+ "database": "test_database",
+ "warehouse": "test_warehouse",
+ }
+ operator_args = {
+ "sql_conn_id": "dummy_connection",
+ "sql": "sql {{ ds }}",
+ "results_df_name": "xxxx",
+ "sql_hook_params": hook_params,
+ "slack_webhook_conn_id": "slack_connection",
+ "parameters": ["1", "2", "3"],
+ "slack_message": "message: {{ ds }}, {{ xxxx }}",
+ "dag": self.example_dag,
+ }
+ sql_to_slack_operator =
SqlToSlackWebhookOperator(task_id=TEST_TASK_ID, **operator_args)
+
+ assert sql_to_slack_operator.sql_hook_params == hook_params
+
+ def test_hook_params(self, mocked_get_connection):
+ mocked_get_connection.return_value =
Connection(conn_id="postgres_test", conn_type="postgres")
+ op = SqlToSlackWebhookOperator(
+ task_id="sql_hook_params",
+ sql_conn_id="postgres_test",
+ slack_webhook_conn_id="slack_connection",
+ sql="SELECT 1",
+ slack_message="message: {{ ds }}, {{ xxxx }}",
+ sql_hook_params={
+ "log_sql": False,
+ },
+ )
+ hook = op._get_hook()
+ assert hook.log_sql == op.sql_hook_params["log_sql"]
+
+ def test_hook_params_snowflake(self, mocked_get_connection):
+ mocked_get_connection.return_value =
Connection(conn_id="snowflake_default", conn_type="snowflake")
+ op = SqlToSlackWebhookOperator(
+ task_id="snowflake_hook_params",
+ sql_conn_id="snowflake_default",
+ slack_webhook_conn_id="slack_default",
+ results_df_name="xxxx",
+ sql="SELECT 1",
+ slack_message="message: {{ ds }}, {{ xxxx }}",
+ sql_hook_params={
+ "warehouse": "warehouse",
+ "database": "database",
+ "role": "role",
+ "schema": "schema",
+ },
+ )
+ hook = op._get_hook()
+
+ assert hook.warehouse == "warehouse"
+ assert hook.database == "database"
+ assert hook.role == "role"
+ assert hook.schema == "schema"
diff --git a/tests/system/providers/slack/example_sql_to_slack.py
b/tests/system/providers/slack/example_sql_to_slack.py
index 0a45f27ccf..288bac418b 100644
--- a/tests/system/providers/slack/example_sql_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack.py
@@ -24,10 +24,9 @@ import os
from datetime import datetime
from airflow import models
-from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack import
SqlToSlackApiFileOperator
-SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
-SQL_CONN_ID = "presto_default"
+SQL_CONN_ID = os.environ.get("SQL_CONN_ID", "postgres_default")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_sql_to_slack"
@@ -38,16 +37,18 @@ with models.DAG(
catchup=False,
tags=["example"],
) as dag:
- # [START howto_operator_sql_to_slack]
- SqlToSlackOperator(
- task_id="presto_to_slack",
+ # [START howto_operator_sql_to_slack_api_file]
+ SqlToSlackApiFileOperator(
+ task_id="sql_to_slack_api_file",
sql_conn_id=SQL_CONN_ID,
- sql=f"SELECT col FROM {SQL_TABLE}",
- slack_channel="my_channel",
- slack_conn_id="slack_default",
- slack_message="message: {{ ds }}, {{ results_df }}",
+ sql="SELECT 6 as multiplier, 9 as multiplicand, 42 as answer",
+ slack_channels="C123456",
+ slack_conn_id="slack_api_default",
+ slack_filename="awesome.json.gz",
+ slack_initial_comment="Awesome compressed multiline JSON.",
+ df_kwargs={"orient": "records", "lines": True},
)
- # [END howto_operator_sql_to_slack]
+ # [END howto_operator_sql_to_slack_api_file]
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/slack/example_sql_to_slack.py
b/tests/system/providers/slack/example_sql_to_slack_webhook.py
similarity index 86%
copy from tests/system/providers/slack/example_sql_to_slack.py
copy to tests/system/providers/slack/example_sql_to_slack_webhook.py
index 0a45f27ccf..fc2e6ee2a9 100644
--- a/tests/system/providers/slack/example_sql_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack_webhook.py
@@ -24,7 +24,7 @@ import os
from datetime import datetime
from airflow import models
-from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack_webhook import
SqlToSlackWebhookOperator
SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
SQL_CONN_ID = "presto_default"
@@ -38,16 +38,16 @@ with models.DAG(
catchup=False,
tags=["example"],
) as dag:
- # [START howto_operator_sql_to_slack]
- SqlToSlackOperator(
+ # [START howto_operator_sql_to_slack_webhook]
+ SqlToSlackWebhookOperator(
task_id="presto_to_slack",
sql_conn_id=SQL_CONN_ID,
sql=f"SELECT col FROM {SQL_TABLE}",
slack_channel="my_channel",
- slack_conn_id="slack_default",
+ slack_webhook_conn_id="slack_default",
slack_message="message: {{ ds }}, {{ results_df }}",
)
- # [END howto_operator_sql_to_slack]
+ # [END howto_operator_sql_to_slack_webhook]
from tests.system.utils import get_test_run # noqa: E402