This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 850e1947a6 Reorganize SQL to Slack Operators (#35215)
850e1947a6 is described below

commit 850e1947a6691e3c0664e59bbb36debc3ab19f48
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Nov 5 02:23:36 2023 +0400

    Reorganize SQL to Slack Operators (#35215)
    
    * Reorganize SQL to Slack Operators
    
    * Fix examples and provider.yaml integrations
    
    * Changer TestSlackProviderProjectStructure
    
    * Mark SqlToSlackWebhookOperator as db_test
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/providers/slack/provider.yaml              |  28 +-
 .../providers/slack/transfers/base_sql_to_slack.py |  83 ++++++
 airflow/providers/slack/transfers/sql_to_slack.py  | 211 ++-------------
 .../slack/transfers/sql_to_slack_webhook.py        | 172 ++++++++++++
 .../operators/slack_operator_howto_guide.rst       |   6 +-
 .../operators/sql_to_slack.rst                     |  19 +-
 .../{sql_to_slack.rst => sql_to_slack_webhook.rst} |  18 +-
 tests/always/test_project_structure.py             |   6 +-
 tests/providers/slack/transfers/conftest.py        |  27 ++
 .../slack/transfers/test_base_sql_to_slack.py      |  97 +++++++
 .../providers/slack/transfers/test_sql_to_slack.py | 293 ++-------------------
 .../slack/transfers/test_sql_to_slack_webhook.py   | 271 +++++++++++++++++++
 .../system/providers/slack/example_sql_to_slack.py |  23 +-
 ...to_slack.py => example_sql_to_slack_webhook.py} |  10 +-
 14 files changed, 757 insertions(+), 507 deletions(-)

diff --git a/airflow/providers/slack/provider.yaml 
b/airflow/providers/slack/provider.yaml
index dbf82418d2..c1d67723ea 100644
--- a/airflow/providers/slack/provider.yaml
+++ b/airflow/providers/slack/provider.yaml
@@ -19,7 +19,10 @@
 package-name: apache-airflow-providers-slack
 name: Slack
 description: |
-    `Slack <https://slack.com/>`__
+    `Slack <https://slack.com/>`__ services integration including:
+
+      - `Slack API <https://api.slack.com/>`__
+      - `Slack Incoming Webhook <https://api.slack.com/messaging/webhooks>`__
 
 suspended: false
 versions:
@@ -56,28 +59,47 @@ dependencies:
 integrations:
   - integration-name: Slack
     external-doc-url: https://slack.com/
+    logo: /integration-logos/slack/Slack.png
+    tags: [service]
+  - integration-name: Slack API
+    external-doc-url: https://api.slack.com/
     how-to-guide:
       - 
/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
     logo: /integration-logos/slack/Slack.png
     tags: [service]
+  - integration-name: Slack Incoming Webhook
+    external-doc-url: https://api.slack.com/messaging/webhooks
+    logo: /integration-logos/slack/Slack.png
+    tags: [service]
 
 operators:
-  - integration-name: Slack
+  - integration-name: Slack API
     python-modules:
       - airflow.providers.slack.operators.slack
+  - integration-name: Slack Incoming Webhook
+    python-modules:
       - airflow.providers.slack.operators.slack_webhook
 
 hooks:
-  - integration-name: Slack
+  - integration-name: Slack API
     python-modules:
       - airflow.providers.slack.hooks.slack
+  - integration-name: Slack Incoming Webhook
+    python-modules:
       - airflow.providers.slack.hooks.slack_webhook
 
 transfers:
   - source-integration-name: Common SQL
     target-integration-name: Slack
+    python-module: airflow.providers.slack.transfers.base_sql_to_slack
+  - source-integration-name: Common SQL
+    target-integration-name: Slack API
     python-module: airflow.providers.slack.transfers.sql_to_slack
     how-to-guide: 
/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+  - source-integration-name: Common SQL
+    target-integration-name: Slack Incoming Webhook
+    python-module: airflow.providers.slack.transfers.sql_to_slack_webhook
+    how-to-guide: 
/docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
 
 connection-types:
   - hook-class-name: airflow.providers.slack.hooks.slack.SlackHook
diff --git a/airflow/providers/slack/transfers/base_sql_to_slack.py 
b/airflow/providers/slack/transfers/base_sql_to_slack.py
new file mode 100644
index 0000000000..cdc5b4cc7a
--- /dev/null
+++ b/airflow/providers/slack/transfers/base_sql_to_slack.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Iterable, Mapping
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+
+if TYPE_CHECKING:
+    import pandas as pd
+    from slack_sdk.http_retry import RetryHandler
+
+    from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+
+class BaseSqlToSlackOperator(BaseOperator):
+    """
+    Operator implements base sql methods for SQL to Slack Transfer operators.
+
+    :param sql: The SQL query to be executed
+    :param sql_conn_id: reference to a specific DB-API Connection.
+    :param sql_hook_params: Extra config params to be passed to the underlying 
hook.
+        Should match the desired hook constructor params.
+    :param parameters: The parameters to pass to the SQL query.
+    :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls. 
Optional
+    :param slack_timeout: The maximum number of seconds the client will wait 
to connect
+        and receive a response from Slack. Optional
+    :param slack_retry_handlers: List of handlers to customize retry logic. 
Optional
+    """
+
+    def __init__(
+        self,
+        *,
+        sql: str,
+        sql_conn_id: str,
+        sql_hook_params: dict | None = None,
+        parameters: Iterable | Mapping[str, Any] | None = None,
+        slack_proxy: str | None = None,
+        slack_timeout: int | None = None,
+        slack_retry_handlers: list[RetryHandler] | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.sql_conn_id = sql_conn_id
+        self.sql_hook_params = sql_hook_params
+        self.sql = sql
+        self.parameters = parameters
+        self.slack_proxy = slack_proxy
+        self.slack_timeout = slack_timeout
+        self.slack_retry_handlers = slack_retry_handlers
+
+    def _get_hook(self) -> DbApiHook:
+        self.log.debug("Get connection for %s", self.sql_conn_id)
+        conn = BaseHook.get_connection(self.sql_conn_id)
+        hook = conn.get_hook(hook_params=self.sql_hook_params)
+        if not callable(getattr(hook, "get_pandas_df", None)):
+            raise AirflowException(
+                "This hook is not supported. The hook class must have 
get_pandas_df method."
+            )
+        return hook
+
+    def _get_query_results(self) -> pd.DataFrame:
+        sql_hook = self._get_hook()
+
+        self.log.info("Running SQL query: %s", self.sql)
+        df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
+        return df
diff --git a/airflow/providers/slack/transfers/sql_to_slack.py 
b/airflow/providers/slack/transfers/sql_to_slack.py
index 9c127d77a1..2bc249c7c1 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -16,188 +16,27 @@
 # under the License.
 from __future__ import annotations
 
+import warnings
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING, Any, Iterable, Mapping, Sequence
 
-from tabulate import tabulate
-
-from airflow.exceptions import AirflowException
-from airflow.hooks.base import BaseHook
-from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.slack.hooks.slack import SlackHook
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+from airflow.providers.slack.transfers.base_sql_to_slack import 
BaseSqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack_webhook import 
SqlToSlackWebhookOperator
 from airflow.providers.slack.utils import parse_filename
 
 if TYPE_CHECKING:
-    import pandas as pd
-    from slack_sdk.http_retry import RetryHandler
-
-    from airflow.providers.common.sql.hooks.sql import DbApiHook
     from airflow.utils.context import Context
 
 
-class BaseSqlToSlackOperator(BaseOperator):
-    """
-    Operator implements base sql methods for SQL to Slack Transfer operators.
-
-    :param sql: The SQL query to be executed
-    :param sql_conn_id: reference to a specific DB-API Connection.
-    :param sql_hook_params: Extra config params to be passed to the underlying 
hook.
-        Should match the desired hook constructor params.
-    :param parameters: The parameters to pass to the SQL query.
-    :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls. 
Optional
-    :param slack_timeout: The maximum number of seconds the client will wait 
to connect
-        and receive a response from Slack. Optional
-    :param slack_retry_handlers: List of handlers to customize retry logic. 
Optional
-    """
-
-    def __init__(
-        self,
-        *,
-        sql: str,
-        sql_conn_id: str,
-        sql_hook_params: dict | None = None,
-        parameters: Iterable | Mapping[str, Any] | None = None,
-        slack_proxy: str | None = None,
-        slack_timeout: int | None = None,
-        slack_retry_handlers: list[RetryHandler] | None = None,
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.sql_conn_id = sql_conn_id
-        self.sql_hook_params = sql_hook_params
-        self.sql = sql
-        self.parameters = parameters
-        self.slack_proxy = slack_proxy
-        self.slack_timeout = slack_timeout
-        self.slack_retry_handlers = slack_retry_handlers
-
-    def _get_hook(self) -> DbApiHook:
-        self.log.debug("Get connection for %s", self.sql_conn_id)
-        conn = BaseHook.get_connection(self.sql_conn_id)
-        hook = conn.get_hook(hook_params=self.sql_hook_params)
-        if not callable(getattr(hook, "get_pandas_df", None)):
-            raise AirflowException(
-                "This hook is not supported. The hook class must have 
get_pandas_df method."
-            )
-        return hook
-
-    def _get_query_results(self) -> pd.DataFrame:
-        sql_hook = self._get_hook()
-
-        self.log.info("Running SQL query: %s", self.sql)
-        df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
-        return df
-
-
-class SqlToSlackOperator(BaseSqlToSlackOperator):
+class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
     """
-    Executes an SQL statement in a given SQL connection and sends the results 
to Slack.
-
-    The results of the query are rendered into the 'slack_message' parameter 
as a Pandas
-    dataframe using a JINJA variable called '{{ results_df }}'. The 
'results_df' variable
-    name can be changed by specifying a different 'results_df_name' parameter. 
The Tabulate
-    library is added to the JINJA environment as a filter to allow the 
dataframe to be
-    rendered nicely. For example, set 'slack_message' to {{ results_df |
-    tabulate(tablefmt="pretty", headers="keys") }} to send the results to 
Slack as an ascii
-    rendered table.
+    Executes an SQL statement in a given SQL connection and sends the results 
to Slack API as file.
 
     .. seealso::
         For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:SqlToSlackOperator`
-
-    :param sql: The SQL query to be executed (templated)
-    :param slack_message: The templated Slack message to send with the data 
returned from the SQL connection.
-        You can use the default JINJA variable {{ results_df }} to access the 
pandas dataframe containing the
-        SQL results
-    :param sql_conn_id: reference to a specific database.
-    :param sql_hook_params: Extra config params to be passed to the underlying 
hook.
-           Should match the desired hook constructor params.
-    :param slack_conn_id: The connection id for Slack.
-    :param slack_channel: The channel to send message. Override default from 
Slack connection.
-    :param results_df_name: The name of the JINJA template's dataframe 
variable, default is 'results_df'
-    :param parameters: The parameters to pass to the SQL query
-    """
-
-    template_fields: Sequence[str] = ("sql", "slack_message")
-    template_ext: Sequence[str] = (".sql", ".jinja", ".j2")
-    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
-    times_rendered = 0
-
-    def __init__(
-        self,
-        *,
-        sql: str,
-        sql_conn_id: str,
-        slack_conn_id: str,
-        sql_hook_params: dict | None = None,
-        slack_channel: str | None = None,
-        slack_message: str,
-        results_df_name: str = "results_df",
-        parameters: Iterable | Mapping[str, Any] | None = None,
-        **kwargs,
-    ) -> None:
-        super().__init__(
-            sql=sql, sql_conn_id=sql_conn_id, sql_hook_params=sql_hook_params, 
parameters=parameters, **kwargs
-        )
-
-        self.slack_conn_id = slack_conn_id
-        self.slack_channel = slack_channel
-        self.slack_message = slack_message
-        self.results_df_name = results_df_name
-        self.kwargs = kwargs
-
-    def _render_and_send_slack_message(self, context, df) -> None:
-        # Put the dataframe into the context and render the JINJA template 
fields
-        context[self.results_df_name] = df
-        self.render_template_fields(context)
-
-        slack_hook = self._get_slack_hook()
-        self.log.info("Sending slack message: %s", self.slack_message)
-        slack_hook.send(text=self.slack_message, channel=self.slack_channel)
-
-    def _get_slack_hook(self) -> SlackWebhookHook:
-        return SlackWebhookHook(
-            slack_webhook_conn_id=self.slack_conn_id,
-            proxy=self.slack_proxy,
-            timeout=self.slack_timeout,
-            retry_handlers=self.slack_retry_handlers,
-        )
-
-    def render_template_fields(self, context, jinja_env=None) -> None:
-        # If this is the first render of the template fields, exclude 
slack_message from rendering since
-        # the SQL results haven't been retrieved yet.
-        if self.times_rendered == 0:
-            fields_to_render: Iterable[str] = (x for x in self.template_fields 
if x != "slack_message")
-        else:
-            fields_to_render = self.template_fields
-
-        if not jinja_env:
-            jinja_env = self.get_template_env()
-
-        # Add the tabulate library into the JINJA environment
-        jinja_env.filters["tabulate"] = tabulate
-
-        self._do_render_template_fields(self, fields_to_render, context, 
jinja_env, set())
-        self.times_rendered += 1
-
-    def execute(self, context: Context) -> None:
-        if not isinstance(self.sql, str):
-            raise AirflowException("Expected 'sql' parameter should be a 
string.")
-        if self.sql is None or self.sql.strip() == "":
-            raise AirflowException("Expected 'sql' parameter is missing.")
-        if self.slack_message is None or self.slack_message.strip() == "":
-            raise AirflowException("Expected 'slack_message' parameter is 
missing.")
-
-        df = self._get_query_results()
-        self._render_and_send_slack_message(context, df)
-
-        self.log.debug("Finished sending SQL data to Slack")
-
-
-class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
-    """
-    Executes an SQL statement in a given SQL connection and sends the results 
to Slack API as file.
+        :ref:`howto/operator:SqlToSlackApiFileOperator`
 
     :param sql: The SQL query to be executed
     :param sql_conn_id: reference to a specific DB-API Connection.
@@ -215,24 +54,6 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
     :param slack_title: Title of file.
     :param slack_base_url: A string representing the Slack API base URL. 
Optional
     :param df_kwargs: Keyword arguments forwarded to 
``pandas.DataFrame.to_{format}()`` method.
-
-
-    Example:
-     .. code-block:: python
-
-        SqlToSlackApiFileOperator(
-            task_id="sql_to_slack",
-            sql="SELECT 1 a, 2 b, 3 c",
-            sql_conn_id="sql-connection",
-            slack_conn_id="slack-api-connection",
-            slack_filename="awesome.json.gz",
-            slack_channels="#random,#general",
-            slack_initial_comment="Awesome load to compressed multiline JSON.",
-            df_kwargs={
-                "orient": "records",
-                "lines": True,
-            },
-        )
     """
 
     template_fields: Sequence[str] = (
@@ -315,3 +136,21 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
                 initial_comment=self.slack_initial_comment,
                 title=self.slack_title,
             )
+
+
+class SqlToSlackOperator(SqlToSlackWebhookOperator):
+    """
+    Executes an SQL statement in a given SQL connection and sends the results 
to Slack Incoming Webhook.
+
+    Deprecated, use 
:class:`airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator`
+    """
+
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            
"`airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator` has been 
renamed "
+            "and moved 
`airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator`
 "
+            "this operator deprecated and will be removed in future",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(*args, **kwargs)
diff --git a/airflow/providers/slack/transfers/sql_to_slack_webhook.py 
b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
new file mode 100644
index 0000000000..31700ed8b0
--- /dev/null
+++ b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import warnings
+from typing import TYPE_CHECKING, Any, Iterable, Mapping, Sequence
+
+from tabulate import tabulate
+
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
+from airflow.providers.slack.transfers.base_sql_to_slack import 
BaseSqlToSlackOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
+    """
+    Executes an SQL statement in a given SQL connection and sends the results 
to Slack Incoming Webhook.
+
+    The results of the query are rendered into the 'slack_message' parameter 
as a Pandas
+    dataframe using a JINJA variable called '{{ results_df }}'. The 
'results_df' variable
+    name can be changed by specifying a different 'results_df_name' parameter. 
The Tabulate
+    library is added to the JINJA environment as a filter to allow the 
dataframe to be
+    rendered nicely. For example, set 'slack_message' to {{ results_df |
+    tabulate(tablefmt="pretty", headers="keys") }} to send the results to 
Slack as an ascii
+    rendered table.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:SqlToSlackWebhookOperator`
+
+    .. note::
+        You cannot override the default channel (chosen by the user who 
installed your app),
+        Instead, these values will always inherit from the associated Slack 
App configuration
+        (`link 
<https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
+        It is possible to change this values only in `Legacy Slack Integration 
Incoming Webhook
+        
<https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.
+
+    .. warning::
+        This hook intend to use `Slack Incoming Webhook` connection
+        and might not work correctly with `Slack API` connection.
+
+    :param sql: The SQL query to be executed (templated)
+    :param slack_message: The templated Slack message to send with the data 
returned from the SQL connection.
+        You can use the default JINJA variable {{ results_df }} to access the 
pandas dataframe containing the
+        SQL results
+    :param sql_conn_id: reference to a specific database.
+    :param sql_hook_params: Extra config params to be passed to the underlying 
hook.
+           Should match the desired hook constructor params.
+    :param slack_webhook_conn_id: :ref:`Slack Incoming Webhook 
<howto/connection:slack>`
+        connection id that has Incoming Webhook token in the password field.
+    :param slack_channel: The channel to send message.
+    :param results_df_name: The name of the JINJA template's dataframe 
variable, default is 'results_df'
+    :param parameters: The parameters to pass to the SQL query
+    """
+
+    template_fields: Sequence[str] = ("sql", "slack_message")
+    template_ext: Sequence[str] = (".sql", ".jinja", ".j2")
+    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
+    times_rendered = 0
+
+    def __init__(
+        self,
+        *,
+        sql: str,
+        sql_conn_id: str,
+        slack_webhook_conn_id: str | None = None,
+        sql_hook_params: dict | None = None,
+        slack_channel: str | None = None,
+        slack_message: str,
+        results_df_name: str = "results_df",
+        parameters: Iterable | Mapping[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        if slack_conn_id := kwargs.pop("slack_conn_id", None):
+            warnings.warn(
+                "Parameter `slack_conn_id` is deprecated because this 
attribute initially intend to use with "
+                "Slack API however this operator provided integration with 
Slack Incoming Webhook. "
+                "Please use `slack_webhook_conn_id` instead.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=3,
+            )
+            if slack_webhook_conn_id and slack_conn_id != 
slack_webhook_conn_id:
+                raise ValueError(
+                    "Conflicting Connection ids provided, "
+                    f"slack_webhook_conn_id={slack_webhook_conn_id!r}, 
slack_conn_id={slack_conn_id!r}."
+                )
+            slack_webhook_conn_id = slack_conn_id
+        if not slack_webhook_conn_id:
+            raise ValueError("Got an empty `slack_webhook_conn_id` value.")
+        super().__init__(
+            sql=sql, sql_conn_id=sql_conn_id, sql_hook_params=sql_hook_params, 
parameters=parameters, **kwargs
+        )
+
+        self.slack_webhook_conn_id = slack_webhook_conn_id
+        self.slack_channel = slack_channel
+        self.slack_message = slack_message
+        self.results_df_name = results_df_name
+        self.kwargs = kwargs
+
+    def _render_and_send_slack_message(self, context, df) -> None:
+        # Put the dataframe into the context and render the JINJA template 
fields
+        context[self.results_df_name] = df
+        self.render_template_fields(context)
+
+        slack_hook = self._get_slack_hook()
+        self.log.info("Sending slack message: %s", self.slack_message)
+        slack_hook.send(text=self.slack_message, channel=self.slack_channel)
+
+    def _get_slack_hook(self) -> SlackWebhookHook:
+        return SlackWebhookHook(
+            slack_webhook_conn_id=self.slack_webhook_conn_id,
+            proxy=self.slack_proxy,
+            timeout=self.slack_timeout,
+            retry_handlers=self.slack_retry_handlers,
+        )
+
+    def render_template_fields(self, context, jinja_env=None) -> None:
+        # If this is the first render of the template fields, exclude 
slack_message from rendering since
+        # the SQL results haven't been retrieved yet.
+        if self.times_rendered == 0:
+            fields_to_render: Iterable[str] = (x for x in self.template_fields 
if x != "slack_message")
+        else:
+            fields_to_render = self.template_fields
+
+        if not jinja_env:
+            jinja_env = self.get_template_env()
+
+        # Add the tabulate library into the JINJA environment
+        jinja_env.filters["tabulate"] = tabulate
+
+        self._do_render_template_fields(self, fields_to_render, context, 
jinja_env, set())
+        self.times_rendered += 1
+
+    def execute(self, context: Context) -> None:
+        if not isinstance(self.sql, str):
+            raise AirflowException("Expected 'sql' parameter should be a 
string.")
+        if self.sql is None or self.sql.strip() == "":
+            raise AirflowException("Expected 'sql' parameter is missing.")
+        if self.slack_message is None or self.slack_message.strip() == "":
+            raise AirflowException("Expected 'slack_message' parameter is 
missing.")
+
+        df = self._get_query_results()
+        self._render_and_send_slack_message(context, df)
+
+        self.log.debug("Finished sending SQL data to Slack")
+
+    @property
+    def slack_conn_id(self):
+        warnings.warn(
+            f"`{type(self).__name__}.slack_conn_id` property deprecated and 
will be removed in a future. "
+            "Please use `slack_webhook_conn_id` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
+        return self.slack_webhook_conn_id
diff --git 
a/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst 
b/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
index 66d57a46df..62464230c3 100644
--- 
a/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
+++ 
b/docs/apache-airflow-providers-slack/operators/slack_operator_howto_guide.rst
@@ -21,9 +21,9 @@ How-to Guide for Slack Operators
 Introduction
 ------------
 
-Slack operators can send text messages 
(:class:`~airflow.providers.slack.operators.slack.SlackAPIFileOperator`)
-or files 
(:class:`~airflow.providers.slack.operators.slack.SlackAPIPostOperator`) to 
specified Slack channels.
-Provide ``slack_conn_id`` for the connection, and specify ``channel`` (name or 
ID).
+Slack operators can send text messages 
(:class:`~airflow.providers.slack.operators.slack.SlackAPIPostOperator`)
+or files 
(:class:`~airflow.providers.slack.operators.slack.SlackAPIFileOperator`) to 
specified Slack channels.
+Provide ``slack_webhook_conn_id`` for the connection, and specify ``channel`` 
(name or ID).
 
 Example Code for Sending Files
 ------------------------------
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst 
b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
index cda5c28c6d..de9fe136d9 100644
--- a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
@@ -15,24 +15,23 @@
     specific language governing permissions and limitations
     under the License.
 
-.. _howto/operator:SqlToSlackOperator:
+.. _howto/operator:SqlToSlackApiFileOperator:
 
-SqlToSlackOperator
-==================
+SqlToSlackApiFileOperator
+=========================
 
-Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post 
messages to predefined Slack
-channel.
+Use the 
:class:`~airflow.providers.slack.transfers.sql_to_slack.SqlToSlackApiFileOperator`
 to post query result as a file
+to Slack channel(s) through `Slack API <https://api.slack.com/>`__.
 
 Using the Operator
 ^^^^^^^^^^^^^^^^^^
 
-This operator will execute a custom query in the provided SQL connection and 
publish a Slack message that can be formatted
-and contain the resulting dataset (e.g. ASCII formatted dataframe).
+This operator will execute a custom query in the provided SQL connection and 
publish a file to Slack channel(s).
 
-An example usage of the SqlToSlackOperator is as follows:
+An example usage of the SqlToSlackApiFileOperator is as follows:
 
 .. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_sql_to_slack]
-    :end-before: [END howto_operator_sql_to_slack]
+    :start-after: [START howto_operator_sql_to_slack_api_file]
+    :end-before: [END howto_operator_sql_to_slack_api_file]
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst 
b/docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
similarity index 68%
copy from docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
copy to docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
index cda5c28c6d..8bd1131566 100644
--- a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack_webhook.rst
@@ -15,13 +15,13 @@
     specific language governing permissions and limitations
     under the License.
 
-.. _howto/operator:SqlToSlackOperator:
+.. _howto/operator:SqlToSlackWebhookOperator:
 
-SqlToSlackOperator
-==================
+SqlToSlackWebhookOperator
+=========================
 
-Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post 
messages to predefined Slack
-channel.
+Use the 
:class:`~airflow.providers.slack.transfers.sql_to_slack_webhook.SqlToSlackWebhookOperator`
 to post messages
+to predefined Slack channel through `Incoming Webhook 
<https://api.slack.com/messaging/webhooks>`__.
 
 Using the Operator
 ^^^^^^^^^^^^^^^^^^
@@ -29,10 +29,10 @@ Using the Operator
 This operator will execute a custom query in the provided SQL connection and 
publish a Slack message that can be formatted
 and contain the resulting dataset (e.g. ASCII formatted dataframe).
 
-An example usage of the SqlToSlackOperator is as follows:
+An example usage of the SqlToSlackWebhookOperator is as follows:
 
-.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
+.. exampleinclude:: 
/../../tests/system/providers/slack/example_sql_to_slack_webhook.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_sql_to_slack]
-    :end-before: [END howto_operator_sql_to_slack]
+    :start-after: [START howto_operator_sql_to_slack_webhook]
+    :end-before: [END howto_operator_sql_to_slack_webhook]
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index 5f774f425f..f0e1a2e35a 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -611,16 +611,16 @@ class 
TestSlackProviderProjectStructure(ExampleCoverageTest):
     PROVIDER = "slack"
     CLASS_DIRS = ProjectStructureTest.CLASS_DIRS
     BASE_CLASSES = {
-        
"airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator",
+        
"airflow.providers.slack.transfers.base_sql_to_slack.BaseSqlToSlackOperator",
+        "airflow.providers.slack.operators.slack.SlackAPIOperator",
     }
     MISSING_EXAMPLES_FOR_CLASSES = {
-        "airflow.providers.slack.operators.slack.SlackAPIOperator",
         "airflow.providers.slack.operators.slack.SlackAPIPostOperator",
         "airflow.providers.slack.operators.slack_webhook.SlackWebhookOperator",
-        
"airflow.providers.slack.transfers.sql_to_slack.SqlToSlackApiFileOperator",
     }
     DEPRECATED_CLASSES = {
         "airflow.providers.slack.notifications.slack_notifier.py.",
+        "airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator",
     }
 
 
diff --git a/tests/providers/slack/transfers/conftest.py 
b/tests/providers/slack/transfers/conftest.py
new file mode 100644
index 0000000000..83944b5148
--- /dev/null
+++ b/tests/providers/slack/transfers/conftest.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+
[email protected]
+def mocked_get_connection():
+    with 
mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
 as m:
+        yield m
diff --git a/tests/providers/slack/transfers/test_base_sql_to_slack.py 
b/tests/providers/slack/transfers/test_base_sql_to_slack.py
new file mode 100644
index 0000000000..f12767b8d6
--- /dev/null
+++ b/tests/providers/slack/transfers/test_base_sql_to_slack.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.slack.transfers.base_sql_to_slack import 
BaseSqlToSlackOperator
+
+
+class TestBaseSqlToSlackOperator:
+    def setup_method(self):
+        self.default_op_kwargs = {
+            "sql": "SELECT 1",
+            "sql_conn_id": "test-sql-conn-id",
+            "sql_hook_params": None,
+            "parameters": None,
+        }
+
+    def test_execute_not_implemented(self):
+        """Test that no base implementation for 
``BaseSqlToSlackOperator.execute()``."""
+        op = BaseSqlToSlackOperator(task_id="test_base_not_implements", 
**self.default_op_kwargs)
+        with pytest.raises(NotImplementedError):
+            op.execute(mock.MagicMock())
+
+    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
+    @mock.patch("airflow.models.connection.Connection.get_hook")
+    @pytest.mark.parametrize("conn_type", ["postgres", "snowflake"])
+    @pytest.mark.parametrize("sql_hook_params", [None, {"foo": "bar"}])
+    def test_get_hook(self, mock_get_hook, mock_get_conn, conn_type, 
sql_hook_params):
+        class SomeDummyHook:
+            """Hook which implements ``get_pandas_df`` method"""
+
+            def get_pandas_df(self):
+                pass
+
+        expected_hook = SomeDummyHook()
+        mock_get_conn.return_value = 
Connection(conn_id=f"test_connection_{conn_type}", conn_type=conn_type)
+        mock_get_hook.return_value = expected_hook
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "sql_hook_params": sql_hook_params,
+        }
+        op = BaseSqlToSlackOperator(task_id="test_get_hook", **op_kwargs)
+        hook = op._get_hook()
+        mock_get_hook.assert_called_once_with(hook_params=sql_hook_params)
+        assert hook == expected_hook
+
+    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
+    @mock.patch("airflow.models.connection.Connection.get_hook")
+    def test_get_not_supported_hook(self, mock_get_hook, mock_get_conn):
+        class SomeDummyHook:
+            """Hook which not implemented ``get_pandas_df`` method"""
+
+        mock_get_conn.return_value = Connection(conn_id="test_connection", 
conn_type="test_connection")
+        mock_get_hook.return_value = SomeDummyHook()
+        op = BaseSqlToSlackOperator(task_id="test_get_not_supported_hook", 
**self.default_op_kwargs)
+        error_message = r"This hook is not supported. The hook class must have 
get_pandas_df method\."
+        with pytest.raises(AirflowException, match=error_message):
+            op._get_hook()
+
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_hook")
+    @pytest.mark.parametrize("sql", ["SELECT 42", "SELECT 1 FROM DUMMY WHERE 
col = ?"])
+    @pytest.mark.parametrize("parameters", [None, {"col": "spam-egg"}])
+    def test_get_query_results(self, mock_op_get_hook, sql, parameters):
+        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+        mock_get_pandas_df = mock.MagicMock(return_value=test_df)
+        mock_hook = mock.MagicMock()
+        mock_hook.get_pandas_df = mock_get_pandas_df
+        mock_op_get_hook.return_value = mock_hook
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "sql": sql,
+            "parameters": parameters,
+        }
+        op = BaseSqlToSlackOperator(task_id="test_get_query_results", 
**op_kwargs)
+        df = op._get_query_results()
+        mock_get_pandas_df.assert_called_once_with(sql, parameters=parameters)
+        assert df is test_df
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py 
b/tests/providers/slack/transfers/test_sql_to_slack.py
index cb33fff516..9d60f28b42 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -18,16 +18,10 @@ from __future__ import annotations
 
 from unittest import mock
 
-import pandas as pd
 import pytest
 
-from airflow.exceptions import AirflowException
-from airflow.models import DAG, Connection
-from airflow.providers.slack.transfers.sql_to_slack import (
-    BaseSqlToSlackOperator,
-    SqlToSlackApiFileOperator,
-    SqlToSlackOperator,
-)
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.slack.transfers.sql_to_slack import 
SqlToSlackApiFileOperator, SqlToSlackOperator
 from airflow.utils import timezone
 
 TEST_DAG_ID = "sql_to_slack_unit_test"
@@ -35,275 +29,6 @@ TEST_TASK_ID = "sql_to_slack_unit_test_task"
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
-class TestBaseSqlToSlackOperator:
-    def setup_method(self):
-        self.default_op_kwargs = {
-            "sql": "SELECT 1",
-            "sql_conn_id": "test-sql-conn-id",
-            "sql_hook_params": None,
-            "parameters": None,
-        }
-
-    def test_execute_not_implemented(self):
-        """Test that no base implementation for 
``BaseSqlToSlackOperator.execute()``."""
-        op = BaseSqlToSlackOperator(task_id="test_base_not_implements", 
**self.default_op_kwargs)
-        with pytest.raises(NotImplementedError):
-            op.execute(mock.MagicMock())
-
-    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
-    @mock.patch("airflow.models.connection.Connection.get_hook")
-    @pytest.mark.parametrize("conn_type", ["postgres", "snowflake"])
-    @pytest.mark.parametrize("sql_hook_params", [None, {"foo": "bar"}])
-    def test_get_hook(self, mock_get_hook, mock_get_conn, conn_type, 
sql_hook_params):
-        class SomeDummyHook:
-            """Hook which implements ``get_pandas_df`` method"""
-
-            def get_pandas_df(self):
-                pass
-
-        expected_hook = SomeDummyHook()
-        mock_get_conn.return_value = 
Connection(conn_id=f"test_connection_{conn_type}", conn_type=conn_type)
-        mock_get_hook.return_value = expected_hook
-        op_kwargs = {
-            **self.default_op_kwargs,
-            "sql_hook_params": sql_hook_params,
-        }
-        op = BaseSqlToSlackOperator(task_id="test_get_hook", **op_kwargs)
-        hook = op._get_hook()
-        mock_get_hook.assert_called_once_with(hook_params=sql_hook_params)
-        assert hook == expected_hook
-
-    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
-    @mock.patch("airflow.models.connection.Connection.get_hook")
-    def test_get_not_supported_hook(self, mock_get_hook, mock_get_conn):
-        class SomeDummyHook:
-            """Hook which not implemented ``get_pandas_df`` method"""
-
-        mock_get_conn.return_value = Connection(conn_id="test_connection", 
conn_type="test_connection")
-        mock_get_hook.return_value = SomeDummyHook()
-        op = BaseSqlToSlackOperator(task_id="test_get_not_supported_hook", 
**self.default_op_kwargs)
-        error_message = r"This hook is not supported. The hook class must have 
get_pandas_df method\."
-        with pytest.raises(AirflowException, match=error_message):
-            op._get_hook()
-
-    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_hook")
-    @pytest.mark.parametrize("sql", ["SELECT 42", "SELECT 1 FROM DUMMY WHERE 
col = ?"])
-    @pytest.mark.parametrize("parameters", [None, {"col": "spam-egg"}])
-    def test_get_query_results(self, mock_op_get_hook, sql, parameters):
-        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        mock_get_pandas_df = mock.MagicMock(return_value=test_df)
-        mock_hook = mock.MagicMock()
-        mock_hook.get_pandas_df = mock_get_pandas_df
-        mock_op_get_hook.return_value = mock_hook
-        op_kwargs = {
-            **self.default_op_kwargs,
-            "sql": sql,
-            "parameters": parameters,
-        }
-        op = BaseSqlToSlackOperator(task_id="test_get_query_results", 
**op_kwargs)
-        df = op._get_query_results()
-        mock_get_pandas_df.assert_called_once_with(sql, parameters=parameters)
-        assert df is test_df
-
-
[email protected]_test
-class TestSqlToSlackOperator:
-    def setup_method(self):
-        self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
-        self.default_hook_parameters = {"timeout": None, "proxy": None, 
"retry_handlers": None}
-
-    @staticmethod
-    def _construct_operator(**kwargs):
-        operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs)
-        return operator
-
-    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
-    @pytest.mark.parametrize(
-        "slack_op_kwargs, hook_extra_kwargs",
-        [
-            pytest.param(
-                {}, {"timeout": None, "proxy": None, "retry_handlers": None}, 
id="default-hook-parameters"
-            ),
-            pytest.param(
-                {"slack_timeout": 42, "slack_proxy": "http://spam.egg";, 
"slack_retry_handlers": []},
-                {"timeout": 42, "proxy": "http://spam.egg";, "retry_handlers": 
[]},
-                id="with-extra-hook-parameters",
-            ),
-        ],
-    )
-    def test_rendering_and_message_execution(self, mock_slack_hook_class, 
slack_op_kwargs, hook_extra_kwargs):
-        mock_dbapi_hook = mock.Mock()
-
-        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
-
-        operator_args = {
-            "sql_conn_id": "snowflake_connection",
-            "slack_conn_id": "slack_connection",
-            "slack_message": "message: {{ ds }}, {{ results_df }}",
-            "slack_channel": "#test",
-            "sql": "sql {{ ds }}",
-            "dag": self.example_dag,
-            **slack_op_kwargs,
-        }
-        sql_to_slack_operator = self._construct_operator(**operator_args)
-
-        slack_webhook_hook = mock_slack_hook_class.return_value
-        sql_to_slack_operator._get_hook = mock_dbapi_hook
-        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        # Test that the Slack hook is instantiated with the right parameters
-        mock_slack_hook_class.assert_called_once_with(
-            slack_webhook_conn_id="slack_connection", **hook_extra_kwargs
-        )
-
-        # Test that the `SlackWebhookHook.send` method gets run once
-        slack_webhook_hook.send.assert_called_once_with(
-            text=f"message: 2017-01-01, {test_df}",
-            channel="#test",
-        )
-
-    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
-    def test_rendering_and_message_execution_with_slack_hook(self, 
mock_slack_hook_class):
-        mock_dbapi_hook = mock.Mock()
-
-        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
-
-        operator_args = {
-            "sql_conn_id": "snowflake_connection",
-            "slack_conn_id": "slack_connection",
-            "slack_message": "message: {{ ds }}, {{ results_df }}",
-            "slack_channel": "#test",
-            "sql": "sql {{ ds }}",
-            "dag": self.example_dag,
-        }
-        sql_to_slack_operator = self._construct_operator(**operator_args)
-
-        slack_webhook_hook = mock_slack_hook_class.return_value
-        sql_to_slack_operator._get_hook = mock_dbapi_hook
-        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        # Test that the Slack hook is instantiated with the right parameters
-        mock_slack_hook_class.assert_called_once_with(
-            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
-        )
-
-        # Test that the `SlackWebhookHook.send` method gets run once
-        slack_webhook_hook.send.assert_called_once_with(
-            text=f"message: 2017-01-01, {test_df}",
-            channel="#test",
-        )
-
-    def test_non_existing_slack_parameters_provided_exception_thrown(self):
-        operator_args = {
-            "sql_conn_id": "snowflake_connection",
-            "slack_message": "message: {{ ds }}, {{ xxxx }}",
-            "sql": "sql {{ ds }}",
-        }
-        with pytest.raises(AirflowException):
-            self._construct_operator(**operator_args)
-
-    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
-    def test_rendering_custom_df_name_message_execution(self, 
mock_slack_hook_class):
-        mock_dbapi_hook = mock.Mock()
-
-        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
-
-        operator_args = {
-            "sql_conn_id": "snowflake_connection",
-            "slack_conn_id": "slack_connection",
-            "slack_message": "message: {{ ds }}, {{ testing }}",
-            "slack_channel": "#test",
-            "sql": "sql {{ ds }}",
-            "results_df_name": "testing",
-            "dag": self.example_dag,
-        }
-        sql_to_slack_operator = self._construct_operator(**operator_args)
-
-        slack_webhook_hook = mock_slack_hook_class.return_value
-        sql_to_slack_operator._get_hook = mock_dbapi_hook
-        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        # Test that the Slack hook is instantiated with the right parameters
-        mock_slack_hook_class.assert_called_once_with(
-            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
-        )
-
-        # Test that the `SlackWebhookHook.send` method gets run once
-        slack_webhook_hook.send.assert_called_once_with(
-            text=f"message: 2017-01-01, {test_df}",
-            channel="#test",
-        )
-
-    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
-    def test_hook_params_building(self, mock_get_conn):
-        mock_get_conn.return_value = 
Connection(conn_id="snowflake_connection", conn_type="snowflake")
-        hook_params = {
-            "schema": "test_schema",
-            "role": "test_role",
-            "database": "test_database",
-            "warehouse": "test_warehouse",
-        }
-        operator_args = {
-            "sql_conn_id": "dummy_connection",
-            "sql": "sql {{ ds }}",
-            "results_df_name": "xxxx",
-            "sql_hook_params": hook_params,
-            "slack_conn_id": "slack_connection",
-            "parameters": ["1", "2", "3"],
-            "slack_message": "message: {{ ds }}, {{ xxxx }}",
-            "dag": self.example_dag,
-        }
-        sql_to_slack_operator = SqlToSlackOperator(task_id=TEST_TASK_ID, 
**operator_args)
-
-        assert sql_to_slack_operator.sql_hook_params == hook_params
-
-    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
-    def test_hook_params(self, mock_get_conn):
-        mock_get_conn.return_value = Connection(conn_id="postgres_test", 
conn_type="postgres")
-        op = SqlToSlackOperator(
-            task_id="sql_hook_params",
-            sql_conn_id="postgres_test",
-            slack_conn_id="slack_connection",
-            sql="SELECT 1",
-            slack_message="message: {{ ds }}, {{ xxxx }}",
-            sql_hook_params={
-                "log_sql": False,
-            },
-        )
-        hook = op._get_hook()
-        assert hook.log_sql == op.sql_hook_params["log_sql"]
-
-    
@mock.patch("airflow.providers.common.sql.operators.sql.BaseHook.get_connection")
-    def test_hook_params_snowflake(self, mock_get_conn):
-        mock_get_conn.return_value = Connection(conn_id="snowflake_default", 
conn_type="snowflake")
-        op = SqlToSlackOperator(
-            task_id="snowflake_hook_params",
-            sql_conn_id="snowflake_default",
-            slack_conn_id="slack_default",
-            results_df_name="xxxx",
-            sql="SELECT 1",
-            slack_message="message: {{ ds }}, {{ xxxx }}",
-            sql_hook_params={
-                "warehouse": "warehouse",
-                "database": "database",
-                "role": "role",
-                "schema": "schema",
-            },
-        )
-        hook = op._get_hook()
-
-        assert hook.warehouse == "warehouse"
-        assert hook.database == "database"
-        assert hook.role == "role"
-        assert hook.schema == "schema"
-
-
 class TestSqlToSlackApiFileOperator:
     def setup_method(self):
         self.default_op_kwargs = {
@@ -419,3 +144,17 @@ class TestSqlToSlackApiFileOperator:
         )
         with pytest.raises(ValueError):
             op.execute(mock.MagicMock())
+
+
+def test_deprecated_sql_to_slack_operator():
+    warning_pattern = "SqlToSlackOperator` has been renamed and moved"
+    with pytest.warns(AirflowProviderDeprecationWarning, 
match=warning_pattern):
+        SqlToSlackOperator(
+            task_id="deprecated-sql-to-slack",
+            sql="SELECT 1",
+            sql_conn_id="test-sql-conn-id",
+            slack_webhook_conn_id="test-slack-conn-id",
+            sql_hook_params=None,
+            parameters=None,
+            slack_message="foo-bar",
+        )
diff --git a/tests/providers/slack/transfers/test_sql_to_slack_webhook.py 
b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
new file mode 100644
index 0000000000..aa71ec0b35
--- /dev/null
+++ b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
@@ -0,0 +1,271 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from contextlib import nullcontext
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.models import Connection
+from airflow.providers.slack.transfers.sql_to_slack_webhook import 
SqlToSlackWebhookOperator
+from airflow.utils import timezone
+
+TEST_DAG_ID = "sql_to_slack_unit_test"
+TEST_TASK_ID = "sql_to_slack_unit_test_task"
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
[email protected]
+def mocked_hook():
+    with 
mock.patch("airflow.providers.slack.transfers.sql_to_slack_webhook.SlackWebhookHook")
 as m:
+        yield m
+
+
[email protected]_test
+class TestSqlToSlackWebhookOperator:
+    def setup_method(self):
+        self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
+        self.default_hook_parameters = {"timeout": None, "proxy": None, 
"retry_handlers": None}
+
+    @staticmethod
+    def _construct_operator(**kwargs):
+        operator = SqlToSlackWebhookOperator(task_id=TEST_TASK_ID, **kwargs)
+        return operator
+
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param(
+                {}, {"timeout": None, "proxy": None, "retry_handlers": None}, 
id="default-hook-parameters"
+            ),
+            pytest.param(
+                {"slack_timeout": 42, "slack_proxy": "http://spam.egg";, 
"slack_retry_handlers": []},
+                {"timeout": 42, "proxy": "http://spam.egg";, "retry_handlers": 
[]},
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_rendering_and_message_execution(self, slack_op_kwargs, 
hook_extra_kwargs, mocked_hook):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_webhook_conn_id": "slack_connection",
+            "slack_message": "message: {{ ds }}, {{ results_df }}",
+            "slack_channel": "#test",
+            "sql": "sql {{ ds }}",
+            "dag": self.example_dag,
+            **slack_op_kwargs,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mocked_hook.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        
mocked_hook.assert_called_once_with(slack_webhook_conn_id="slack_connection", 
**hook_extra_kwargs)
+
+        # Test that the `SlackWebhookHook.send` method gets run once
+        slack_webhook_hook.send.assert_called_once_with(
+            text=f"message: 2017-01-01, {test_df}",
+            channel="#test",
+        )
+
+    def test_rendering_and_message_execution_with_slack_hook(self, 
mocked_hook):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_webhook_conn_id": "slack_connection",
+            "slack_message": "message: {{ ds }}, {{ results_df }}",
+            "slack_channel": "#test",
+            "sql": "sql {{ ds }}",
+            "dag": self.example_dag,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mocked_hook.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        mocked_hook.assert_called_once_with(
+            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
+        )
+
+        # Test that the `SlackWebhookHook.send` method gets run once
+        slack_webhook_hook.send.assert_called_once_with(
+            text=f"message: 2017-01-01, {test_df}",
+            channel="#test",
+        )
+
+    @pytest.mark.parametrize(
+        "slack_webhook_conn_id, slack_conn_id, warning_expected, 
expected_conn_id",
+        [
+            pytest.param("foo", None, False, "foo", 
id="slack-webhook-conn-id"),
+            pytest.param(None, "bar", True, "bar", id="slack-conn-id"),
+            pytest.param("spam", "spam", True, "spam", id="mixin-conn-ids"),
+        ],
+    )
+    def test_resolve_conn_ids(self, slack_webhook_conn_id, slack_conn_id, 
warning_expected, expected_conn_id):
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_message": "message: {{ ds }}, {{ xxxx }}",
+            "sql": "sql {{ ds }}",
+        }
+        if slack_webhook_conn_id:
+            operator_args["slack_webhook_conn_id"] = slack_webhook_conn_id
+        if slack_conn_id:
+            operator_args["slack_conn_id"] = slack_conn_id
+        ctx = (
+            pytest.warns(AirflowProviderDeprecationWarning, match="Parameter 
`slack_conn_id` is deprecated")
+            if warning_expected
+            else nullcontext()
+        )
+
+        with ctx:
+            op = self._construct_operator(**operator_args)
+
+        assert op.slack_webhook_conn_id == expected_conn_id
+        with pytest.warns(AirflowProviderDeprecationWarning, 
match="slack_conn_id` property deprecated"):
+            assert op.slack_conn_id == expected_conn_id
+
+    def test_conflicting_conn_id(self):
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_message": "message: {{ ds }}, {{ xxxx }}",
+            "sql": "sql {{ ds }}",
+        }
+        with pytest.raises(ValueError, match="Conflicting Connection ids 
provided"), pytest.warns(
+            AirflowProviderDeprecationWarning, match="Parameter 
`slack_conn_id` is deprecated"
+        ):
+            self._construct_operator(**operator_args, 
slack_webhook_conn_id="foo", slack_conn_id="bar")
+
+    def test_non_existing_slack_webhook_conn_id(self):
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_message": "message: {{ ds }}, {{ xxxx }}",
+            "sql": "sql {{ ds }}",
+        }
+        with pytest.raises(ValueError, match="Got an empty 
`slack_webhook_conn_id` value"):
+            self._construct_operator(**operator_args)
+
+    def test_rendering_custom_df_name_message_execution(self, mocked_hook):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            "sql_conn_id": "snowflake_connection",
+            "slack_webhook_conn_id": "slack_connection",
+            "slack_message": "message: {{ ds }}, {{ testing }}",
+            "slack_channel": "#test",
+            "sql": "sql {{ ds }}",
+            "results_df_name": "testing",
+            "dag": self.example_dag,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mocked_hook.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        mocked_hook.assert_called_once_with(
+            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
+        )
+
+        # Test that the `SlackWebhookHook.send` method gets run once
+        slack_webhook_hook.send.assert_called_once_with(
+            text=f"message: 2017-01-01, {test_df}",
+            channel="#test",
+        )
+
+    def test_hook_params_building(self, mocked_get_connection):
+        mocked_get_connection.return_value = 
Connection(conn_id="snowflake_connection", conn_type="snowflake")
+        hook_params = {
+            "schema": "test_schema",
+            "role": "test_role",
+            "database": "test_database",
+            "warehouse": "test_warehouse",
+        }
+        operator_args = {
+            "sql_conn_id": "dummy_connection",
+            "sql": "sql {{ ds }}",
+            "results_df_name": "xxxx",
+            "sql_hook_params": hook_params,
+            "slack_webhook_conn_id": "slack_connection",
+            "parameters": ["1", "2", "3"],
+            "slack_message": "message: {{ ds }}, {{ xxxx }}",
+            "dag": self.example_dag,
+        }
+        sql_to_slack_operator = 
SqlToSlackWebhookOperator(task_id=TEST_TASK_ID, **operator_args)
+
+        assert sql_to_slack_operator.sql_hook_params == hook_params
+
+    def test_hook_params(self, mocked_get_connection):
+        mocked_get_connection.return_value = 
Connection(conn_id="postgres_test", conn_type="postgres")
+        op = SqlToSlackWebhookOperator(
+            task_id="sql_hook_params",
+            sql_conn_id="postgres_test",
+            slack_webhook_conn_id="slack_connection",
+            sql="SELECT 1",
+            slack_message="message: {{ ds }}, {{ xxxx }}",
+            sql_hook_params={
+                "log_sql": False,
+            },
+        )
+        hook = op._get_hook()
+        assert hook.log_sql == op.sql_hook_params["log_sql"]
+
+    def test_hook_params_snowflake(self, mocked_get_connection):
+        mocked_get_connection.return_value = 
Connection(conn_id="snowflake_default", conn_type="snowflake")
+        op = SqlToSlackWebhookOperator(
+            task_id="snowflake_hook_params",
+            sql_conn_id="snowflake_default",
+            slack_webhook_conn_id="slack_default",
+            results_df_name="xxxx",
+            sql="SELECT 1",
+            slack_message="message: {{ ds }}, {{ xxxx }}",
+            sql_hook_params={
+                "warehouse": "warehouse",
+                "database": "database",
+                "role": "role",
+                "schema": "schema",
+            },
+        )
+        hook = op._get_hook()
+
+        assert hook.warehouse == "warehouse"
+        assert hook.database == "database"
+        assert hook.role == "role"
+        assert hook.schema == "schema"
diff --git a/tests/system/providers/slack/example_sql_to_slack.py 
b/tests/system/providers/slack/example_sql_to_slack.py
index 0a45f27ccf..288bac418b 100644
--- a/tests/system/providers/slack/example_sql_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack.py
@@ -24,10 +24,9 @@ import os
 from datetime import datetime
 
 from airflow import models
-from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack import 
SqlToSlackApiFileOperator
 
-SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
-SQL_CONN_ID = "presto_default"
+SQL_CONN_ID = os.environ.get("SQL_CONN_ID", "postgres_default")
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_sql_to_slack"
 
@@ -38,16 +37,18 @@ with models.DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
-    # [START howto_operator_sql_to_slack]
-    SqlToSlackOperator(
-        task_id="presto_to_slack",
+    # [START howto_operator_sql_to_slack_api_file]
+    SqlToSlackApiFileOperator(
+        task_id="sql_to_slack_api_file",
         sql_conn_id=SQL_CONN_ID,
-        sql=f"SELECT col FROM {SQL_TABLE}",
-        slack_channel="my_channel",
-        slack_conn_id="slack_default",
-        slack_message="message: {{ ds }}, {{ results_df }}",
+        sql="SELECT 6 as multiplier, 9 as multiplicand, 42 as answer",
+        slack_channels="C123456",
+        slack_conn_id="slack_api_default",
+        slack_filename="awesome.json.gz",
+        slack_initial_comment="Awesome compressed multiline JSON.",
+        df_kwargs={"orient": "records", "lines": True},
     )
-    # [END howto_operator_sql_to_slack]
+    # [END howto_operator_sql_to_slack_api_file]
 
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/slack/example_sql_to_slack.py 
b/tests/system/providers/slack/example_sql_to_slack_webhook.py
similarity index 86%
copy from tests/system/providers/slack/example_sql_to_slack.py
copy to tests/system/providers/slack/example_sql_to_slack_webhook.py
index 0a45f27ccf..fc2e6ee2a9 100644
--- a/tests/system/providers/slack/example_sql_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack_webhook.py
@@ -24,7 +24,7 @@ import os
 from datetime import datetime
 
 from airflow import models
-from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack_webhook import 
SqlToSlackWebhookOperator
 
 SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
 SQL_CONN_ID = "presto_default"
@@ -38,16 +38,16 @@ with models.DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
-    # [START howto_operator_sql_to_slack]
-    SqlToSlackOperator(
+    # [START howto_operator_sql_to_slack_webhook]
+    SqlToSlackWebhookOperator(
         task_id="presto_to_slack",
         sql_conn_id=SQL_CONN_ID,
         sql=f"SELECT col FROM {SQL_TABLE}",
         slack_channel="my_channel",
-        slack_conn_id="slack_default",
+        slack_webhook_conn_id="slack_default",
         slack_message="message: {{ ds }}, {{ results_df }}",
     )
-    # [END howto_operator_sql_to_slack]
+    # [END howto_operator_sql_to_slack_webhook]
 
 
 from tests.system.utils import get_test_run  # noqa: E402

Reply via email to