This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0aee6813db Fix set deprecated slack operators arguments in `MappedOperator` (#38345)
0aee6813db is described below
commit 0aee6813db30f72c4e09610c6212f50ff2250cac
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Mar 22 12:55:02 2024 +0400
Fix set deprecated slack operators arguments in `MappedOperator` (#38345)
---
airflow/providers/slack/operators/slack.py | 6 ++-
.../slack/transfers/sql_to_slack_webhook.py | 6 ++-
tests/providers/slack/operators/test_slack.py | 53 ++++++++++++++++++++++
.../slack/transfers/test_sql_to_slack_webhook.py | 47 +++++++++++++++++++
4 files changed, 108 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/slack/operators/slack.py b/airflow/providers/slack/operators/slack.py
index 2fdafe3689..03c5f01ab7 100644
--- a/airflow/providers/slack/operators/slack.py
+++ b/airflow/providers/slack/operators/slack.py
@@ -27,6 +27,7 @@ from typing_extensions import Literal
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack import SlackHook
+from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
from slack_sdk.http_retry import RetryHandler
@@ -226,9 +227,10 @@ class SlackAPIFileOperator(SlackAPIOperator):
content: str | None = None,
title: str | None = None,
method_version: Literal["v1", "v2"] = "v1",
+ channel: str | Sequence[str] | None | ArgNotSet = NOTSET,
**kwargs,
) -> None:
- if channel := kwargs.pop("channel", None):
+ if channel is not NOTSET:
warnings.warn(
"Argument `channel` is deprecated and will removed in a future releases. "
"Please use `channels` instead.",
@@ -237,7 +239,7 @@ class SlackAPIFileOperator(SlackAPIOperator):
)
if channels:
raise ValueError(f"Cannot set both arguments: channel={channel!r} and channels={channels!r}.")
- channels = channel
+ channels = channel # type: ignore[assignment]
super().__init__(method="files.upload", **kwargs)
self.channels = channels
diff --git a/airflow/providers/slack/transfers/sql_to_slack_webhook.py b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
index 5252661316..82cbd2a494 100644
--- a/airflow/providers/slack/transfers/sql_to_slack_webhook.py
+++ b/airflow/providers/slack/transfers/sql_to_slack_webhook.py
@@ -25,6 +25,7 @@ from tabulate import tabulate
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.slack.transfers.base_sql_to_slack import BaseSqlToSlackOperator
+from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -87,9 +88,10 @@ class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
slack_message: str,
results_df_name: str = "results_df",
parameters: list | tuple | Mapping[str, Any] | None = None,
+ slack_conn_id: str | ArgNotSet = NOTSET,
**kwargs,
) -> None:
- if slack_conn_id := kwargs.pop("slack_conn_id", None):
+ if slack_conn_id is not NOTSET:
warnings.warn(
"Parameter `slack_conn_id` is deprecated because this attribute initially intend to use with "
"Slack API however this operator provided integration with Slack Incoming Webhook. "
@@ -102,7 +104,7 @@ class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
"Conflicting Connection ids provided, "
f"slack_webhook_conn_id={slack_webhook_conn_id!r}, slack_conn_id={slack_conn_id!r}."
)
- slack_webhook_conn_id = slack_conn_id
+ slack_webhook_conn_id = slack_conn_id # type: ignore[assignment]
if not slack_webhook_conn_id:
raise ValueError("Got an empty `slack_webhook_conn_id` value.")
super().__init__(
diff --git a/tests/providers/slack/operators/test_slack.py b/tests/providers/slack/operators/test_slack.py
index b389f12609..ce9a96a066 100644
--- a/tests/providers/slack/operators/test_slack.py
+++ b/tests/providers/slack/operators/test_slack.py
@@ -28,6 +28,7 @@ from airflow.providers.slack.operators.slack import (
SlackAPIOperator,
SlackAPIPostOperator,
)
+from airflow.utils.task_instance_session import set_current_task_instance_session
SLACK_API_TEST_CONNECTION_ID = "test_slack_conn_id"
DEFAULT_HOOKS_PARAMETERS = {"base_url": None, "timeout": None, "proxy": None, "retry_handlers": None}
@@ -308,3 +309,55 @@ class TestSlackAPIFileOperator:
channel="#random",
channels="#general",
)
+
+ @pytest.mark.db_test
+ @pytest.mark.parametrize(
+ "channel",
+ [
+ pytest.param("#contributors", id="single-channel"),
+ pytest.param(["#random", "#general"], id="multiple-channels"),
+ ],
+ )
+ def test_partial_deprecated_channel(self, channel, dag_maker, session):
+ with dag_maker(dag_id="test_partial_deprecated_channel", session=session):
+ SlackAPIFileOperator.partial(
+ task_id="fake-task-id",
+ slack_conn_id="fake-conn-id",
+ channel=channel,
+ ).expand(filename=["/dev/zero", "/dev/urandom"])
+
+ dr = dag_maker.create_dagrun()
+ tis = dr.get_task_instances(session=session)
+ with set_current_task_instance_session(session=session):
+ warning_match = r"Argument `channel` is deprecated.*use `channels` instead"
+ for ti in tis:
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warning_match):
+ ti.render_templates()
+ assert ti.task.channels == channel
+
+ @pytest.mark.db_test
+ @pytest.mark.parametrize(
+ "channel, channels",
+ [
+ pytest.param("#contributors", "#user-troubleshooting", id="ambiguous-channel-params"),
+ pytest.param(["#random", "#general"], ["#random", "#general"], id="non-ambiguous-channel-params"),
+ ],
+ )
+ def test_partial_both_channel_parameters(self, channel, channels, dag_maker, session):
+ with dag_maker("test_partial_both_channel_parameters", session=session):
+ SlackAPIFileOperator.partial(
+ task_id="fake-task-id",
+ slack_conn_id="fake-conn-id",
+ channel=channel,
+ channels=channels,
+ ).expand(filename=["/dev/zero", "/dev/urandom"])
+
+ dr = dag_maker.create_dagrun(session=session)
+ tis = dr.get_task_instances(session=session)
+ with set_current_task_instance_session(session=session):
+ warning_match = r"Argument `channel` is deprecated.*use `channels` instead"
+ for ti in tis:
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warning_match), pytest.raises(
+ ValueError, match="Cannot set both arguments"
+ ):
+ ti.render_templates()
diff --git a/tests/providers/slack/transfers/test_sql_to_slack_webhook.py b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
index aa71ec0b35..512e3175b7 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack_webhook.py
@@ -27,6 +27,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import Connection
from airflow.providers.slack.transfers.sql_to_slack_webhook import SqlToSlackWebhookOperator
from airflow.utils import timezone
+from airflow.utils.task_instance_session import set_current_task_instance_session
TEST_DAG_ID = "sql_to_slack_unit_test"
TEST_TASK_ID = "sql_to_slack_unit_test_task"
@@ -269,3 +270,49 @@ class TestSqlToSlackWebhookOperator:
assert hook.database == "database"
assert hook.role == "role"
assert hook.schema == "schema"
+
+ @pytest.mark.parametrize(
+ "slack_conn_id, slack_webhook_conn_id",
+ [
+ pytest.param("slack_conn_id", None, id="slack-conn-id-only"),
+ pytest.param("slack_conn_id", "slack_conn_id", id="non-ambiguous-params"),
+ ],
+ )
+ def test_partial_deprecated_slack_conn_id(self, slack_conn_id, slack_webhook_conn_id, dag_maker, session):
+ with dag_maker(dag_id="test_partial_deprecated_slack_conn_id", session=session):
+ SqlToSlackWebhookOperator.partial(
+ task_id="fake-task-id",
+ slack_conn_id=slack_conn_id,
+ slack_webhook_conn_id=slack_webhook_conn_id,
+ sql_conn_id="fake-sql-conn-id",
+ slack_message="<https://github.com/apache/airflow|Apache Airflow™>",
+ ).expand(sql=["SELECT 1", "SELECT 2"])
+
+ dr = dag_maker.create_dagrun()
+ tis = dr.get_task_instances(session=session)
+ with set_current_task_instance_session(session=session):
+ warning_match = r"Parameter `slack_conn_id` is deprecated"
+ for ti in tis:
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warning_match):
+ ti.render_templates()
+ assert ti.task.slack_webhook_conn_id == slack_conn_id
+
+ def test_partial_ambiguous_slack_connections(self, dag_maker, session):
+ with dag_maker("test_partial_ambiguous_slack_connections", session=session):
+ SqlToSlackWebhookOperator.partial(
+ task_id="fake-task-id",
+ slack_conn_id="slack_conn_id",
+ slack_webhook_conn_id="slack_webhook_conn_id",
+ sql_conn_id="fake-sql-conn-id",
+ slack_message="<https://github.com/apache/airflow|Apache Airflow™>",
+ ).expand(sql=["SELECT 1", "SELECT 2"])
+
+ dr = dag_maker.create_dagrun(session=session)
+ tis = dr.get_task_instances(session=session)
+ with set_current_task_instance_session(session=session):
+ warning_match = r"Parameter `slack_conn_id` is deprecated"
+ for ti in tis:
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warning_match), pytest.raises(
+ ValueError, match="Conflicting Connection ids provided"
+ ):
+ ti.render_templates()