potiuk commented on code in PR #26452:
URL: https://github.com/apache/airflow/pull/26452#discussion_r975313534


##########
airflow/providers/slack/hooks/slack_webhook.py:
##########
@@ -19,144 +19,437 @@
 
 import json
 import warnings
+from functools import wraps
+from typing import TYPE_CHECKING, Any, Callable
+from urllib.parse import urlparse
 
+from slack_sdk import WebhookClient
+
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
-from airflow.providers.http.hooks.http import HttpHook
+from airflow.hooks.base import BaseHook
+from airflow.models import Connection
+from airflow.providers.slack.utils import ConnectionExtraConfig
+from airflow.utils.log.secrets_masker import mask_secret
+
+if TYPE_CHECKING:
+    from slack_sdk.http_retry import RetryHandler
+
+DEFAULT_SLACK_WEBHOOK_ENDPOINT = "https://hooks.slack.com/services";
+LEGACY_INTEGRATION_PARAMS = ("channel", "username", "icon_emoji", "icon_url")
+
 
+def check_webhook_response(func: Callable) -> Callable:
+    """Function decorator that check WebhookResponse and raise an error if 
status code != 200."""
 
-class SlackWebhookHook(HttpHook):
+    @wraps(func)
+    def wrapper(*args, **kwargs) -> Callable:
+        resp = func(*args, **kwargs)
+        if resp.status_code != 200:
+            raise AirflowException(
+                f"Response body: {resp.body!r}, Status Code: 
{resp.status_code}. "
+                "See: https://api.slack.com/messaging/webhooks#handling_errors";
+            )
+        return resp
+
+    return wrapper
+
+
+class SlackWebhookHook(BaseHook):
     """
-    This hook allows you to post messages to Slack using incoming webhooks.
-    Takes both Slack webhook token directly and connection that has Slack 
webhook token.
-    If both supplied, http_conn_id will be used as base_url,
-    and webhook_token will be taken as endpoint, the relative path of the url.
+    This class provide a thin wrapper around the ``slack_sdk.WebhookClient``.
+    This hook allows you to post messages to Slack by using Incoming Webhooks.
+
+    .. seealso::
+        - :ref:`Slack Incoming Webhook connection 
<howto/connection:slack-incoming-webhook>`
+        - https://api.slack.com/messaging/webhooks
+        - https://slack.dev/python-slack-sdk/webhook/index.html
+
+    .. note::
+        You cannot override the default channel (chosen by the user who 
installed your app),
+        username, or icon when you're using Incoming Webhooks to post messages.
+        Instead, these values will always inherit from the associated Slack 
App configuration
+        (`link 
<https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
+        It is possible to change this values only in `Legacy Slack Integration 
Incoming Webhook
+        
<https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.
 
     .. warning::
-        This hook intend to use `Slack Webhook` connection
+        This hook intend to use `Slack Incoming Webhook` connection
         and might not work correctly with `Slack API` connection.
 
-    Each Slack webhook token can be pre-configured to use a specific channel, 
username and
-    icon. You can override these defaults in this hook.
-
-    :param http_conn_id: connection that has Slack webhook token in the 
password field
-    :param webhook_token: Slack webhook token
-    :param message: The message you want to send on Slack
-    :param attachments: The attachments to send on Slack. Should be a list of
-        dictionaries representing Slack attachments.
-    :param blocks: The blocks to send on Slack. Should be a list of
-        dictionaries representing Slack blocks.
-    :param channel: The channel the message should be posted to
-    :param username: The username to post to slack with
-    :param icon_emoji: The emoji to use as icon for the user posting to Slack
-    :param icon_url: The icon image URL string to use in place of the default 
icon.
-    :param link_names: Whether or not to find and link channel and usernames 
in your
-        message
-    :param proxy: Proxy to use to make the Slack webhook call
+    Examples:
+     .. code-block:: python
+
+        # Create hook
+        hook = SlackWebhookHook(slack_webhook_conn_id="slack_default")
+
+        # Post message in Slack channel by JSON formatted message
+        # See: https://api.slack.com/messaging/webhooks#posting_with_webhooks
+        hook.send_dict({"text": "Hello world!"})
+
+        # Post simple message in Slack channel
+        hook.send_text("Hello world!")
+
+        # Use ``slack_sdk.WebhookClient``
+        hook.client.send(text="Hello world!")
+
+    :param slack_webhook_conn_id: Slack Incoming Webhook connection id
+        that has Incoming Webhook token in the password field.
+    :param timeout: The maximum number of seconds the client will wait to 
connect
+        and receive a response from Slack. If not set than default 
WebhookClient value will use.
+    :param proxy: Proxy to make the Slack Incoming Webhook call.
+    :param retry_handlers: List of handlers to customize retry logic in 
``slack_sdk.WebhookClient``.
+    :param webhook_token: (deprecated) Slack Incoming Webhook token.
+        Use instead Slack Incoming Webhook connection password field.
     """
 
-    conn_name_attr = 'http_conn_id'
+    conn_name_attr = 'slack_webhook_conn_id'
     default_conn_name = 'slack_default'
     conn_type = 'slackwebhook'
-    hook_name = 'Slack Webhook'
+    hook_name = 'Slack Incoming Webhook'
 
     def __init__(
         self,
-        http_conn_id=None,
-        webhook_token=None,
-        message="",
-        attachments=None,
-        blocks=None,
-        channel=None,
-        username=None,
-        icon_emoji=None,
-        icon_url=None,
-        link_names=False,
-        proxy=None,
-        *args,
+        slack_webhook_conn_id: str | None = None,
+        webhook_token: str | None = None,
+        timeout: int | None = None,
+        proxy: str | None = None,
+        retry_handlers: list[RetryHandler] | None = None,
         **kwargs,
     ):
-        super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
-        self.webhook_token = self._get_token(webhook_token, http_conn_id)
-        self.message = message
-        self.attachments = attachments
-        self.blocks = blocks
-        self.channel = channel
-        self.username = username
-        self.icon_emoji = icon_emoji
-        self.icon_url = icon_url
-        self.link_names = link_names
+        super().__init__()
+
+        http_conn_id = kwargs.pop("http_conn_id", None)
+        if http_conn_id:
+            warnings.warn(
+                'Parameter `http_conn_id` is deprecated. Please use 
`slack_webhook_conn_id` instead.',
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if slack_webhook_conn_id:
+                raise AirflowException("You cannot provide both 
`slack_webhook_conn_id` and `http_conn_id`.")
+            slack_webhook_conn_id = http_conn_id
+
+        if not slack_webhook_conn_id and not webhook_token:
+            raise AirflowException("Either `slack_webhook_conn_id` or 
`webhook_token` should be provided.")
+        if webhook_token:
+            mask_secret(webhook_token)
+            warnings.warn(
+                "Provide `webhook_token` as hook argument deprecated by 
security reason and will be removed "
+                "in a future releases. Please specify it in `Slack Webhook` 
connection.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+        if not slack_webhook_conn_id:
+            warnings.warn(
+                "You have not set parameter `slack_webhook_conn_id`. Currently 
`Slack Incoming Webhook` "
+                "connection id optional but in a future release it will 
mandatory.",
+                FutureWarning,
+                stacklevel=2,
+            )
+
+        self.slack_webhook_conn_id = slack_webhook_conn_id
+        self.timeout = timeout
         self.proxy = proxy
+        self.retry_handlers = retry_handlers
+        self._webhook_token = webhook_token
 
-    def _get_token(self, token: str, http_conn_id: str | None) -> str:
-        """
-        Given either a manually set token or a conn_id, return the 
webhook_token to use.
+        # Compatibility with previous version of SlackWebhookHook
+        deprecated_class_attrs = []
+        for deprecated_attr in (
+            "message",
+            "attachments",
+            "blocks",
+            "channel",
+            "username",
+            "icon_emoji",
+            "icon_url",
+            "link_names",
+        ):
+            if deprecated_attr in kwargs:
+                deprecated_class_attrs.append(deprecated_attr)
+                setattr(self, deprecated_attr, kwargs.pop(deprecated_attr))
+                if deprecated_attr == "message":
+                    # Slack WebHook Post Request not expected `message` as 
field,
+                    # so we also set "text" attribute which will check by 
SlackWebhookHook._resolve_argument
+                    self.text = getattr(self, deprecated_attr)
 
-        :param token: The manually provided token
-        :param http_conn_id: The conn_id provided
-        :return: webhook_token to use
-        :rtype: str
-        """
-        if token:
-            return token
-        elif http_conn_id:
-            conn = self.get_connection(http_conn_id)
+        if deprecated_class_attrs:
+            warnings.warn(
+                f"Provide {','.join(repr(a) for a in deprecated_class_attrs)} 
as hook argument(s) "
+                f"is deprecated and will be removed in a future releases. "
+                f"Please specify attributes in 
`{self.__class__.__name__}.send` method instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
 
-            if getattr(conn, 'password', None):
-                return conn.password
-            else:
-                extra = conn.extra_dejson
-                web_token = extra.get('webhook_token', '')
+        self.extra_client_args = kwargs
+
+    @cached_property
+    def client(self) -> WebhookClient:
+        """Get the underlying slack_sdk.webhook.WebhookClient (cached)."""
+        return WebhookClient(**self._get_conn_params())
 
-                if web_token:
+    def get_conn(self) -> WebhookClient:
+        """Get the underlying slack_sdk.webhook.WebhookClient (cached)."""
+        return self.client
+
+    @cached_property
+    def webhook_token(self) -> str:
+        """Return Slack Webhook Token URL."""
+        warnings.warn(
+            "`SlackHook.webhook_token` property deprecated and will be removed 
in a future releases.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self._get_conn_params()["url"]
+
+    def _get_conn_params(self) -> dict[str, Any]:
+        """Fetch connection params as a dict and merge it with hook 
parameters."""
+        default_schema, _, default_host = 
DEFAULT_SLACK_WEBHOOK_ENDPOINT.partition("://")
+        if self.slack_webhook_conn_id:
+            conn = self.get_connection(self.slack_webhook_conn_id)
+        else:
+            # If slack_webhook_conn_id not specified, then use connection with 
default schema and host
+            conn = Connection(
+                conn_id=None, conn_type=self.conn_type, host=default_schema, 
password=default_host
+            )
+        extra_config = ConnectionExtraConfig(
+            conn_type=self.conn_type,
+            conn_id=conn.conn_id,
+            extra=conn.extra_dejson,
+        )
+        conn_params: dict[str, Any] = {"retry_handlers": self.retry_handlers}
+
+        webhook_token = None
+        if self._webhook_token:
+            self.log.debug("Retrieving Slack Webhook Token from hook 
attribute.")
+            webhook_token = self._webhook_token
+        elif conn.conn_id:
+            if conn.password:
+                self.log.debug(
+                    "Retrieving Slack Webhook Token from Connection ID %r 
password.",
+                    self.slack_webhook_conn_id,
+                )
+                webhook_token = conn.password
+            else:
+                webhook_token = extra_config.get("webhook_token", None)
+                if webhook_token:
                     warnings.warn(
-                        "'webhook_token' in 'extra' is deprecated. Please use 
'password' field",
+                        f"Found 'webhook_token' in Connection {conn.conn_id!r} 
Extra, this option is "
+                        "deprecated and will be removed in a future releases. 
Please use 'password' field.",
                         DeprecationWarning,
                         stacklevel=2,
                     )
+                    mask_secret(webhook_token)
 
-                return web_token
+        webhook_token = webhook_token or ""
+        if not webhook_token and not conn.host:
+            raise AirflowException("Cannot get token: No valid Slack token nor 
valid Connection ID supplied.")
+        elif webhook_token and "://" in webhook_token:
+            self.log.debug("Retrieving Slack Webhook Token URL from webhook 
token.")
+            url = webhook_token
         else:
-            raise AirflowException('Cannot get token: No valid Slack webhook 
token nor conn_id supplied')
+            self.log.debug("Constructing Slack Webhook Token URL.")
+            if conn.host and "://" in conn.host:
+                base_url = conn.host
+            else:
+                schema = conn.schema if conn.schema else default_schema
+                host = conn.host if conn.host else default_host
+                base_url = f"{schema}://{host}"
 
-    def _build_slack_message(self) -> str:
+            base_url = base_url.rstrip("/")
+            if not webhook_token:
+                parsed_token = (urlparse(base_url).path or "").strip("/")
+                if base_url == DEFAULT_SLACK_WEBHOOK_ENDPOINT or not 
parsed_token:
+                    # Raise an error in case of password not specified and
+                    # 1. Result of constructing base_url equal 
https://hooks.slack.com/services
+                    # 2. Empty url path, e.g. if base_url = 
https://hooks.slack.com
+                    raise AirflowException(
+                        "Cannot get token: No valid Slack token nor valid 
Connection ID supplied."
+                    )
+                mask_secret(parsed_token)

Review Comment:
   good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to