This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e6f445129a Pass additional arguments from Slack's Operators/Notifiers 
to Hooks (#35039)
e6f445129a is described below

commit e6f445129a998eab62d71bd91b4a5f46cd77c1de
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Oct 19 15:58:48 2023 +0400

    Pass additional arguments from Slack's Operators/Notifiers to Hooks (#35039)
---
 airflow/providers/slack/notifications/slack.py     | 27 ++++++++--
 .../providers/slack/notifications/slack_webhook.py | 14 ++++-
 airflow/providers/slack/operators/slack.py         | 27 ++++++++--
 airflow/providers/slack/operators/slack_webhook.py | 18 ++++++-
 airflow/providers/slack/transfers/sql_to_slack.py  | 30 ++++++++++-
 tests/providers/slack/notifications/test_slack.py  | 30 ++++++++++-
 .../slack/notifications/test_slack_webhook.py      | 26 ++++++++-
 tests/providers/slack/operators/test_slack.py      | 32 ++++++++++--
 .../slack/operators/test_slack_webhook.py          | 24 +++++++--
 .../providers/slack/transfers/test_sql_to_slack.py | 61 ++++++++++++++++++++--
 10 files changed, 261 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/slack/notifications/slack.py 
b/airflow/providers/slack/notifications/slack.py
index f2359b6fcb..5ff63b3628 100644
--- a/airflow/providers/slack/notifications/slack.py
+++ b/airflow/providers/slack/notifications/slack.py
@@ -19,9 +19,10 @@ from __future__ import annotations
 
 import json
 from functools import cached_property
-from typing import Sequence
+from typing import TYPE_CHECKING, Sequence
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException
+from airflow.providers.slack.hooks.slack import SlackHook
 
 try:
     from airflow.notifications.basenotifier import BaseNotifier
@@ -30,7 +31,8 @@ except ImportError:
         "Failed to import BaseNotifier. This feature is only available in 
Airflow versions >= 2.6.0"
     )
 
-from airflow.providers.slack.hooks.slack import SlackHook
+if TYPE_CHECKING:
+    from slack_sdk.http_retry import RetryHandler
 
 ICON_URL: str = "https://raw.githubusercontent.com/apache/airflow/2.5.0/airflow/www/static/pin_100.png"
 
@@ -46,6 +48,11 @@ class SlackNotifier(BaseNotifier):
     :param icon_url: The icon to use for the message. Optional
     :param attachments: A list of attachments to send with the message. 
Optional
     :param blocks: A list of blocks to send with the message. Optional
+    :param timeout: The maximum number of seconds the client will wait to 
connect
+        and receive a response from Slack. Optional
+    :param base_url: A string representing the Slack API base URL. Optional
+    :param proxy: Proxy to make the Slack API call. Optional
+    :param retry_handlers: List of handlers to customize retry logic in 
``slack_sdk.WebClient``. Optional
     """
 
     template_fields = ("text", "channel", "username", "attachments", "blocks")
@@ -60,6 +67,10 @@ class SlackNotifier(BaseNotifier):
         icon_url: str = ICON_URL,
         attachments: Sequence = (),
         blocks: Sequence = (),
+        base_url: str | None = None,
+        proxy: str | None = None,
+        timeout: int | None = None,
+        retry_handlers: list[RetryHandler] | None = None,
     ):
         super().__init__()
         self.slack_conn_id = slack_conn_id
@@ -69,11 +80,21 @@ class SlackNotifier(BaseNotifier):
         self.icon_url = icon_url
         self.attachments = attachments
         self.blocks = blocks
+        self.base_url = base_url
+        self.timeout = timeout
+        self.proxy = proxy
+        self.retry_handlers = retry_handlers
 
     @cached_property
     def hook(self) -> SlackHook:
         """Slack Hook."""
-        return SlackHook(slack_conn_id=self.slack_conn_id)
+        return SlackHook(
+            slack_conn_id=self.slack_conn_id,
+            base_url=self.base_url,
+            timeout=self.timeout,
+            proxy=self.proxy,
+            retry_handlers=self.retry_handlers,
+        )
 
     def notify(self, context):
         """Send a message to a Slack Channel."""
diff --git a/airflow/providers/slack/notifications/slack_webhook.py 
b/airflow/providers/slack/notifications/slack_webhook.py
index 67616e5a51..0fab447b6e 100644
--- a/airflow/providers/slack/notifications/slack_webhook.py
+++ b/airflow/providers/slack/notifications/slack_webhook.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 from functools import cached_property
+from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException
 from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
@@ -29,6 +30,9 @@ except ImportError:
         "Failed to import BaseNotifier. This feature is only available in 
Airflow versions >= 2.6.0"
     )
 
+if TYPE_CHECKING:
+    from slack_sdk.http_retry import RetryHandler
+
 
 class SlackWebhookNotifier(BaseNotifier):
     """
@@ -45,9 +49,10 @@ class SlackWebhookNotifier(BaseNotifier):
     :param unfurl_links: Option to indicate whether text url should unfurl. 
Optional
     :param unfurl_media: Option to indicate whether media url should unfurl. 
Optional
     :param timeout: The maximum number of seconds the client will wait to 
connect. Optional
-        and receive a response from Slack. If not set than default 
WebhookClient value will use.
+        and receive a response from Slack. Optional
     :param proxy: Proxy to make the Slack Incoming Webhook call. Optional
     :param attachments: A list of attachments to send with the message. 
Optional
+    :param retry_handlers: List of handlers to customize retry logic in 
``slack_sdk.WebhookClient``. Optional
     """
 
     template_fields = ("slack_webhook_conn_id", "text", "attachments", 
"blocks", "proxy", "timeout")
@@ -63,6 +68,7 @@ class SlackWebhookNotifier(BaseNotifier):
         proxy: str | None = None,
         timeout: int | None = None,
         attachments: list | None = None,
+        retry_handlers: list[RetryHandler] | None = None,
     ):
         super().__init__()
         self.slack_webhook_conn_id = slack_webhook_conn_id
@@ -73,12 +79,16 @@ class SlackWebhookNotifier(BaseNotifier):
         self.unfurl_media = unfurl_media
         self.timeout = timeout
         self.proxy = proxy
+        self.retry_handlers = retry_handlers
 
     @cached_property
     def hook(self) -> SlackWebhookHook:
         """Slack Incoming Webhook Hook."""
         return SlackWebhookHook(
-            slack_webhook_conn_id=self.slack_webhook_conn_id, 
proxy=self.proxy, timeout=self.timeout
+            slack_webhook_conn_id=self.slack_webhook_conn_id,
+            proxy=self.proxy,
+            timeout=self.timeout,
+            retry_handlers=self.retry_handlers,
         )
 
     def notify(self, context):
diff --git a/airflow/providers/slack/operators/slack.py 
b/airflow/providers/slack/operators/slack.py
index adf21a9372..ffefbbac3a 100644
--- a/airflow/providers/slack/operators/slack.py
+++ b/airflow/providers/slack/operators/slack.py
@@ -20,12 +20,15 @@ from __future__ import annotations
 import json
 import warnings
 from functools import cached_property
-from typing import Any, Sequence
+from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.slack.hooks.slack import SlackHook
 
+if TYPE_CHECKING:
+    from slack_sdk.http_retry import RetryHandler
+
 
 class SlackAPIOperator(BaseOperator):
     """Base Slack Operator class.
@@ -34,7 +37,11 @@ class SlackAPIOperator(BaseOperator):
         which its password is Slack API token.
     :param method: The Slack API Method to Call 
(https://api.slack.com/methods). Optional
     :param api_params: API Method call parameters 
(https://api.slack.com/methods). Optional
-    :param client_args: Slack Hook parameters. Optional. Check 
airflow.providers.slack.hooks.SlackHook
+    :param timeout: The maximum number of seconds the client will wait to 
connect
+        and receive a response from Slack. Optional
+    :param base_url: A string representing the Slack API base URL. Optional
+    :param proxy: Proxy to make the Slack API call. Optional
+    :param retry_handlers: List of handlers to customize retry logic in 
``slack_sdk.WebClient``. Optional
     """
 
     def __init__(
@@ -43,17 +50,31 @@ class SlackAPIOperator(BaseOperator):
         slack_conn_id: str = SlackHook.default_conn_name,
         method: str | None = None,
         api_params: dict | None = None,
+        base_url: str | None = None,
+        proxy: str | None = None,
+        timeout: int | None = None,
+        retry_handlers: list[RetryHandler] | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.slack_conn_id = slack_conn_id
         self.method = method
         self.api_params = api_params
+        self.base_url = base_url
+        self.timeout = timeout
+        self.proxy = proxy
+        self.retry_handlers = retry_handlers
 
     @cached_property
     def hook(self) -> SlackHook:
         """Slack Hook."""
-        return SlackHook(slack_conn_id=self.slack_conn_id)
+        return SlackHook(
+            slack_conn_id=self.slack_conn_id,
+            base_url=self.base_url,
+            timeout=self.timeout,
+            proxy=self.proxy,
+            retry_handlers=self.retry_handlers,
+        )
 
     def construct_api_call_params(self) -> Any:
         """API call parameters used by the execute function.
diff --git a/airflow/providers/slack/operators/slack_webhook.py 
b/airflow/providers/slack/operators/slack_webhook.py
index f386adf6b8..9a36a806e9 100644
--- a/airflow/providers/slack/operators/slack_webhook.py
+++ b/airflow/providers/slack/operators/slack_webhook.py
@@ -24,6 +24,8 @@ from airflow.models import BaseOperator
 from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
 
 if TYPE_CHECKING:
+    from slack_sdk.http_retry import RetryHandler
+
     from airflow.utils.context import Context
 
 
@@ -51,7 +53,10 @@ class SlackWebhookOperator(BaseOperator):
     :param username: The username to post to slack with
     :param icon_emoji: The emoji to use as icon for the user posting to Slack
     :param icon_url: The icon image URL string to use in place of the default 
icon.
-    :param proxy: Proxy to use to make the Slack webhook call
+    :param proxy: Proxy to make the Slack Incoming Webhook call. Optional
+    :param timeout: The maximum number of seconds the client will wait to 
connect
+        and receive a response from Slack. Optional
+    :param retry_handlers: List of handlers to customize retry logic in 
``slack_sdk.WebhookClient``. Optional
     """
 
     template_fields: Sequence[str] = (
@@ -75,6 +80,8 @@ class SlackWebhookOperator(BaseOperator):
         icon_emoji: str | None = None,
         icon_url: str | None = None,
         proxy: str | None = None,
+        timeout: int | None = None,
+        retry_handlers: list[RetryHandler] | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -87,11 +94,18 @@ class SlackWebhookOperator(BaseOperator):
         self.username = username
         self.icon_emoji = icon_emoji
         self.icon_url = icon_url
+        self.timeout = timeout
+        self.retry_handlers = retry_handlers
 
     @cached_property
     def hook(self) -> SlackWebhookHook:
         """Create and return an SlackWebhookHook (cached)."""
-        return 
SlackWebhookHook(slack_webhook_conn_id=self.slack_webhook_conn_id, 
proxy=self.proxy)
+        return SlackWebhookHook(
+            slack_webhook_conn_id=self.slack_webhook_conn_id,
+            proxy=self.proxy,
+            timeout=self.timeout,
+            retry_handlers=self.retry_handlers,
+        )
 
     def execute(self, context: Context) -> None:
         """Call the SlackWebhookHook to post the provided Slack message."""
diff --git a/airflow/providers/slack/transfers/sql_to_slack.py 
b/airflow/providers/slack/transfers/sql_to_slack.py
index 537abe3b9b..9c127d77a1 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -30,6 +30,7 @@ from airflow.providers.slack.utils import parse_filename
 
 if TYPE_CHECKING:
     import pandas as pd
+    from slack_sdk.http_retry import RetryHandler
 
     from airflow.providers.common.sql.hooks.sql import DbApiHook
     from airflow.utils.context import Context
@@ -44,6 +45,10 @@ class BaseSqlToSlackOperator(BaseOperator):
     :param sql_hook_params: Extra config params to be passed to the underlying 
hook.
         Should match the desired hook constructor params.
     :param parameters: The parameters to pass to the SQL query.
+    :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls. 
Optional
+    :param slack_timeout: The maximum number of seconds the client will wait 
to connect
+        and receive a response from Slack. Optional
+    :param slack_retry_handlers: List of handlers to customize retry logic. 
Optional
     """
 
     def __init__(
@@ -53,6 +58,9 @@ class BaseSqlToSlackOperator(BaseOperator):
         sql_conn_id: str,
         sql_hook_params: dict | None = None,
         parameters: Iterable | Mapping[str, Any] | None = None,
+        slack_proxy: str | None = None,
+        slack_timeout: int | None = None,
+        slack_retry_handlers: list[RetryHandler] | None = None,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -60,6 +68,9 @@ class BaseSqlToSlackOperator(BaseOperator):
         self.sql_hook_params = sql_hook_params
         self.sql = sql
         self.parameters = parameters
+        self.slack_proxy = slack_proxy
+        self.slack_timeout = slack_timeout
+        self.slack_retry_handlers = slack_retry_handlers
 
     def _get_hook(self) -> DbApiHook:
         self.log.debug("Get connection for %s", self.sql_conn_id)
@@ -146,7 +157,12 @@ class SqlToSlackOperator(BaseSqlToSlackOperator):
         slack_hook.send(text=self.slack_message, channel=self.slack_channel)
 
     def _get_slack_hook(self) -> SlackWebhookHook:
-        return SlackWebhookHook(slack_webhook_conn_id=self.slack_conn_id)
+        return SlackWebhookHook(
+            slack_webhook_conn_id=self.slack_conn_id,
+            proxy=self.slack_proxy,
+            timeout=self.slack_timeout,
+            retry_handlers=self.slack_retry_handlers,
+        )
 
     def render_template_fields(self, context, jinja_env=None) -> None:
         # If this is the first render of the template fields, exclude 
slack_message from rendering since
@@ -197,8 +213,10 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
          If omitting this parameter, then file will send to workspace.
     :param slack_initial_comment: The message text introducing the file in 
specified ``slack_channels``.
     :param slack_title: Title of file.
+    :param slack_base_url: A string representing the Slack API base URL. 
Optional
     :param df_kwargs: Keyword arguments forwarded to 
``pandas.DataFrame.to_{format}()`` method.
 
+
     Example:
      .. code-block:: python
 
@@ -241,6 +259,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
         slack_channels: str | Sequence[str] | None = None,
         slack_initial_comment: str | None = None,
         slack_title: str | None = None,
+        slack_base_url: str | None = None,
         df_kwargs: dict | None = None,
         **kwargs,
     ):
@@ -252,6 +271,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
         self.slack_channels = slack_channels
         self.slack_initial_comment = slack_initial_comment
         self.slack_title = slack_title
+        self.slack_base_url = slack_base_url
         self.df_kwargs = df_kwargs or {}
 
     def execute(self, context: Context) -> None:
@@ -261,7 +281,13 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
             supported_file_formats=self.SUPPORTED_FILE_FORMATS,
         )
 
-        slack_hook = SlackHook(slack_conn_id=self.slack_conn_id)
+        slack_hook = SlackHook(
+            slack_conn_id=self.slack_conn_id,
+            base_url=self.slack_base_url,
+            timeout=self.slack_timeout,
+            proxy=self.slack_proxy,
+            retry_handlers=self.slack_retry_handlers,
+        )
         with NamedTemporaryFile(mode="w+", suffix=f"_{self.slack_filename}") 
as fp:
             # tempfile.NamedTemporaryFile used only for create and remove 
temporary file,
             # pandas will open file in correct mode itself depend on file type.
diff --git a/tests/providers/slack/notifications/test_slack.py 
b/tests/providers/slack/notifications/test_slack.py
index 295a01d092..0e10bacd30 100644
--- a/tests/providers/slack/notifications/test_slack.py
+++ b/tests/providers/slack/notifications/test_slack.py
@@ -19,17 +19,42 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.slack.notifications.slack import SlackNotifier, 
send_slack_notification
 
+DEFAULT_HOOKS_PARAMETERS = {"base_url": None, "timeout": None, "proxy": None, 
"retry_handlers": None}
+
 
 class TestSlackNotifier:
     @mock.patch("airflow.providers.slack.notifications.slack.SlackHook")
-    def test_slack_notifier(self, mock_slack_hook, dag_maker):
+    @pytest.mark.parametrize(
+        "extra_kwargs, hook_extra_kwargs",
+        [
+            pytest.param({}, DEFAULT_HOOKS_PARAMETERS, 
id="default-hook-parameters"),
+            pytest.param(
+                {
+                    "base_url": "https://foo.bar",
+                    "timeout": 42,
+                    "proxy": "http://spam.egg",
+                    "retry_handlers": [],
+                },
+                {
+                    "base_url": "https://foo.bar",
+                    "timeout": 42,
+                    "proxy": "http://spam.egg",
+                    "retry_handlers": [],
+                },
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_slack_notifier(self, mock_slack_hook, dag_maker, extra_kwargs, 
hook_extra_kwargs):
         with dag_maker("test_slack_notifier") as dag:
             EmptyOperator(task_id="task1")
 
-        notifier = send_slack_notification(text="test")
+        notifier = send_slack_notification(slack_conn_id="test_conn_id", 
text="test", **extra_kwargs)
         notifier({"dag": dag})
         mock_slack_hook.return_value.call.assert_called_once_with(
             "chat.postMessage",
@@ -43,6 +68,7 @@ class TestSlackNotifier:
                 "blocks": "[]",
             },
         )
+        mock_slack_hook.assert_called_once_with(slack_conn_id="test_conn_id", 
**hook_extra_kwargs)
 
     @mock.patch("airflow.providers.slack.notifications.slack.SlackHook")
     def test_slack_notifier_with_notifier_class(self, mock_slack_hook, 
dag_maker):
diff --git a/tests/providers/slack/notifications/test_slack_webhook.py 
b/tests/providers/slack/notifications/test_slack_webhook.py
index e595766bfc..12f27c4b23 100644
--- a/tests/providers/slack/notifications/test_slack_webhook.py
+++ b/tests/providers/slack/notifications/test_slack_webhook.py
@@ -19,21 +19,42 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.slack.notifications.slack_webhook import (
     SlackWebhookNotifier,
     send_slack_webhook_notification,
 )
 
+DEFAULT_HOOKS_PARAMETERS = {"timeout": None, "proxy": None, "retry_handlers": 
None}
+
 
 class TestSlackNotifier:
     def test_class_and_notifier_are_same(self):
         assert send_slack_webhook_notification is SlackWebhookNotifier
 
     
@mock.patch("airflow.providers.slack.notifications.slack_webhook.SlackWebhookHook")
-    def test_slack_webhook_notifier(self, mock_slack_hook):
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param({}, DEFAULT_HOOKS_PARAMETERS, 
id="default-hook-parameters"),
+            pytest.param(
+                {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers": []},
+                {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers": []},
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_slack_webhook_notifier(self, mock_slack_hook, slack_op_kwargs, 
hook_extra_kwargs):
         notifier = send_slack_webhook_notification(
-            text="foo-bar", blocks="spam-egg", attachments="baz-qux", 
unfurl_links=True, unfurl_media=False
+            slack_webhook_conn_id="test_conn_id",
+            text="foo-bar",
+            blocks="spam-egg",
+            attachments="baz-qux",
+            unfurl_links=True,
+            unfurl_media=False,
+            **slack_op_kwargs,
         )
         notifier.notify({})
         mock_slack_hook.return_value.send.assert_called_once_with(
@@ -43,6 +64,7 @@ class TestSlackNotifier:
             unfurl_media=False,
             attachments="baz-qux",
         )
+        
mock_slack_hook.assert_called_once_with(slack_webhook_conn_id="test_conn_id", 
**hook_extra_kwargs)
 
     
@mock.patch("airflow.providers.slack.notifications.slack_webhook.SlackWebhookHook")
     def test_slack_webhook_templated(self, mock_slack_hook, dag_maker):
diff --git a/tests/providers/slack/operators/test_slack.py 
b/tests/providers/slack/operators/test_slack.py
index 8dfebc9b2d..52a28ef4d8 100644
--- a/tests/providers/slack/operators/test_slack.py
+++ b/tests/providers/slack/operators/test_slack.py
@@ -31,17 +31,43 @@ from airflow.providers.slack.operators.slack import (
 )
 
 SLACK_API_TEST_CONNECTION_ID = "test_slack_conn_id"
+DEFAULT_HOOKS_PARAMETERS = {"base_url": None, "timeout": None, "proxy": None, 
"retry_handlers": None}
 
 
 class TestSlackAPIOperator:
     @mock.patch("airflow.providers.slack.operators.slack.SlackHook")
-    def test_hook(self, mock_slack_hook_cls):
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param({}, DEFAULT_HOOKS_PARAMETERS, 
id="default-hook-parameters"),
+            pytest.param(
+                {
+                    "base_url": "https://foo.bar",
+                    "timeout": 42,
+                    "proxy": "http://spam.egg",
+                    "retry_handlers": [],
+                },
+                {
+                    "base_url": "https://foo.bar",
+                    "timeout": 42,
+                    "proxy": "http://spam.egg",
+                    "retry_handlers": [],
+                },
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_hook(self, mock_slack_hook_cls, slack_op_kwargs, 
hook_extra_kwargs):
         mock_slack_hook = mock_slack_hook_cls.return_value
-        op = SlackAPIOperator(task_id="test-mask-token", 
slack_conn_id=SLACK_API_TEST_CONNECTION_ID)
+        op = SlackAPIOperator(
+            task_id="test-mask-token", 
slack_conn_id=SLACK_API_TEST_CONNECTION_ID, **slack_op_kwargs
+        )
         hook = op.hook
         assert hook == mock_slack_hook
         assert hook is op.hook
-        
mock_slack_hook_cls.assert_called_once_with(slack_conn_id=SLACK_API_TEST_CONNECTION_ID)
+        mock_slack_hook_cls.assert_called_once_with(
+            slack_conn_id=SLACK_API_TEST_CONNECTION_ID, **hook_extra_kwargs
+        )
 
 
 class TestSlackAPIPostOperator:
diff --git a/tests/providers/slack/operators/test_slack_webhook.py 
b/tests/providers/slack/operators/test_slack_webhook.py
index abc8900c82..abb9bf70bc 100644
--- a/tests/providers/slack/operators/test_slack_webhook.py
+++ b/tests/providers/slack/operators/test_slack_webhook.py
@@ -23,6 +23,8 @@ import pytest
 
 from airflow.providers.slack.operators.slack_webhook import 
SlackWebhookOperator
 
+DEFAULT_HOOKS_PARAMETERS = {"timeout": None, "proxy": None, "retry_handlers": 
None}
+
 
 class TestSlackWebhookOperator:
     def setup_method(self):
@@ -34,14 +36,28 @@ class TestSlackWebhookOperator:
             "icon_url": None,
         }
 
-    @pytest.mark.parametrize("proxy", [None, "https://localhost:9999"])
     
@mock.patch("airflow.providers.slack.operators.slack_webhook.SlackWebhookHook")
-    def test_hook(self, mock_slackwebhook_cls, proxy):
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param({}, DEFAULT_HOOKS_PARAMETERS, 
id="default-hook-parameters"),
+            pytest.param(
+                {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers": []},
+                {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers": []},
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_hook(self, mock_slackwebhook_cls, slack_op_kwargs, 
hook_extra_kwargs):
         """Test get cached ``SlackWebhookHook`` hook."""
-        op = SlackWebhookOperator(task_id="test_hook", 
slack_webhook_conn_id="test_conn_id", proxy=proxy)
+        op = SlackWebhookOperator(
+            task_id="test_hook", slack_webhook_conn_id="test_conn_id", 
**slack_op_kwargs
+        )
         hook = op.hook
         assert hook is op.hook, "Expected cached hook"
-        
mock_slackwebhook_cls.assert_called_once_with(slack_webhook_conn_id="test_conn_id",
 proxy=proxy)
+        mock_slackwebhook_cls.assert_called_once_with(
+            slack_webhook_conn_id="test_conn_id", **hook_extra_kwargs
+        )
 
     def test_assert_templated_fields(self):
         """Test expected templated fields."""
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py 
b/tests/providers/slack/transfers/test_sql_to_slack.py
index 07e6e87aba..c2d698719f 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -109,6 +109,7 @@ class TestBaseSqlToSlackOperator:
 class TestSqlToSlackOperator:
     def setup_method(self):
         self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
+        self.default_hook_parameters = {"timeout": None, "proxy": None, 
"retry_handlers": None}
 
     @staticmethod
     def _construct_operator(**kwargs):
@@ -116,7 +117,20 @@ class TestSqlToSlackOperator:
         return operator
 
     
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook")
-    def test_rendering_and_message_execution(self, mock_slack_hook_class):
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param(
+                {}, {"timeout": None, "proxy": None, "retry_handlers": None}, 
id="default-hook-parameters"
+            ),
+            pytest.param(
+                {"slack_timeout": 42, "slack_proxy": "http://spam.egg", "slack_retry_handlers": []},
+                {"timeout": 42, "proxy": "http://spam.egg", "retry_handlers": []},
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
+    def test_rendering_and_message_execution(self, mock_slack_hook_class, 
slack_op_kwargs, hook_extra_kwargs):
         mock_dbapi_hook = mock.Mock()
 
         test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
@@ -130,6 +144,7 @@ class TestSqlToSlackOperator:
             "slack_channel": "#test",
             "sql": "sql {{ ds }}",
             "dag": self.example_dag,
+            **slack_op_kwargs,
         }
         sql_to_slack_operator = self._construct_operator(**operator_args)
 
@@ -138,7 +153,9 @@ class TestSqlToSlackOperator:
         sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         # Test that the Slack hook is instantiated with the right parameters
-        
mock_slack_hook_class.assert_called_once_with(slack_webhook_conn_id="slack_connection")
+        mock_slack_hook_class.assert_called_once_with(
+            slack_webhook_conn_id="slack_connection", **hook_extra_kwargs
+        )
 
         # Test that the `SlackWebhookHook.send` method gets run once
         slack_webhook_hook.send.assert_called_once_with(
@@ -169,7 +186,9 @@ class TestSqlToSlackOperator:
         sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         # Test that the Slack hook is instantiated with the right parameters
-        
mock_slack_hook_class.assert_called_once_with(slack_webhook_conn_id="slack_connection")
+        mock_slack_hook_class.assert_called_once_with(
+            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
+        )
 
         # Test that the `SlackWebhookHook.send` method gets run once
         slack_webhook_hook.send.assert_called_once_with(
@@ -210,7 +229,9 @@ class TestSqlToSlackOperator:
         sql_to_slack_operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         # Test that the Slack hook is instantiated with the right parameters
-        
mock_slack_hook_class.assert_called_once_with(slack_webhook_conn_id="slack_connection")
+        mock_slack_hook_class.assert_called_once_with(
+            slack_webhook_conn_id="slack_connection", 
**self.default_hook_parameters
+        )
 
         # Test that the `SlackWebhookHook.send` method gets run once
         slack_webhook_hook.send.assert_called_once_with(
@@ -308,6 +329,31 @@ class TestSqlToSlackApiFileOperator:
     @pytest.mark.parametrize("channels", ["#random", "#random,#general", None])
     @pytest.mark.parametrize("initial_comment", [None, "Test Comment"])
     @pytest.mark.parametrize("title", [None, "Test File Title"])
+    @pytest.mark.parametrize(
+        "slack_op_kwargs, hook_extra_kwargs",
+        [
+            pytest.param(
+                {},
+                {"base_url": None, "timeout": None, "proxy": None, 
"retry_handlers": None},
+                id="default-hook-parameters",
+            ),
+            pytest.param(
+                {
+                    "slack_base_url": "https://foo.bar",
+                    "slack_timeout": 42,
+                    "slack_proxy": "http://spam.egg",
+                    "slack_retry_handlers": [],
+                },
+                {
+                    "base_url": "https://foo.bar",
+                    "timeout": 42,
+                    "proxy": "http://spam.egg",
+                    "retry_handlers": [],
+                },
+                id="with-extra-hook-parameters",
+            ),
+        ],
+    )
     def test_send_file(
         self,
         mock_slack_hook_cls,
@@ -318,6 +364,8 @@ class TestSqlToSlackApiFileOperator:
         channels,
         initial_comment,
         title,
+        slack_op_kwargs: dict,
+        hook_extra_kwargs: dict,
     ):
         # Mock Hook
         mock_send_file = mock.MagicMock()
@@ -337,11 +385,14 @@ class TestSqlToSlackApiFileOperator:
             "slack_initial_comment": initial_comment,
             "slack_title": title,
             "df_kwargs": df_kwargs,
+            **slack_op_kwargs,
         }
         op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
         op.execute(mock.MagicMock())
 
-        
mock_slack_hook_cls.assert_called_once_with(slack_conn_id="expected-test-slack-conn-id")
+        mock_slack_hook_cls.assert_called_once_with(
+            slack_conn_id="expected-test-slack-conn-id", **hook_extra_kwargs
+        )
         mock_get_query_results.assert_called_once_with()
         mock_df_output_method.assert_called_once_with(mock.ANY, **(df_kwargs 
or {}))
         mock_send_file.assert_called_once_with(

Reply via email to