This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 72c0911ede Fix deprecated apache.hive operators arguments in
`MappedOperator` (#38351)
72c0911ede is described below
commit 72c0911eded10af5a97dd37d62c762e10e435894
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Mar 22 12:56:43 2024 +0400
Fix deprecated apache.hive operators arguments in `MappedOperator` (#38351)
---
.../providers/apache/hive/operators/hive_stats.py | 10 +++---
.../apache/hive/operators/test_hive_stats.py | 42 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py
index 327e8a676f..643a5a4571 100644
--- a/airflow/providers/apache/hive/operators/hive_stats.py
+++ b/airflow/providers/apache/hive/operators/hive_stats.py
@@ -21,11 +21,12 @@ import json
import warnings
from typing import TYPE_CHECKING, Any, Callable, Sequence
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.presto.hooks.presto import PrestoHook
+from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -78,17 +79,18 @@ class HiveStatsCollectionOperator(BaseOperator):
mysql_conn_id: str = "airflow_db",
ds: str = "{{ ds }}",
dttm: str = "{{ logical_date.isoformat() }}",
+ col_blacklist: list[str] | None | ArgNotSet = NOTSET,
**kwargs: Any,
) -> None:
- if "col_blacklist" in kwargs:
+ if col_blacklist is not NOTSET:
warnings.warn(
f"col_blacklist kwarg passed to {self.__class__.__name__} "
f"(task_id: {kwargs.get('task_id')}) is deprecated, "
f"please rename it to excluded_columns instead",
- category=FutureWarning,
+ category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
- excluded_columns = kwargs.pop("col_blacklist")
+ excluded_columns = col_blacklist # type: ignore[assignment]
super().__init__(**kwargs)
self.table = table
self.partition = partition
diff --git a/tests/providers/apache/hive/operators/test_hive_stats.py b/tests/providers/apache/hive/operators/test_hive_stats.py
index e419d2da00..9d2157ffc3 100644
--- a/tests/providers/apache/hive/operators/test_hive_stats.py
+++ b/tests/providers/apache/hive/operators/test_hive_stats.py
@@ -23,9 +23,11 @@ from unittest.mock import MagicMock, patch
import pytest
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator
from airflow.providers.presto.hooks.presto import PrestoHook
+from airflow.utils import timezone
+from airflow.utils.task_instance_session import set_current_task_instance_session
from tests.providers.apache.hive import (
DEFAULT_DATE,
DEFAULT_DATE_DS,
@@ -370,3 +372,41 @@ class TestHiveStatsCollectionOperator(TestHiveEnvironment):
"value",
],
)
+
+ def test_col_blacklist_deprecation(self):
+ warn_message = "col_blacklist kwarg passed to.*task_id: fake-task-id.*is deprecated"
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warn_message):
+ HiveStatsCollectionOperator(
+ task_id="fake-task-id",
+ table="airflow.static_babynames_partitioned",
+ partition={"ds": DEFAULT_DATE_DS},
+ col_blacklist=["foo", "bar"],
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.parametrize(
+ "col_blacklist",
+ [pytest.param(None, id="none"), pytest.param(["foo", "bar"], id="list")],
+ )
+ def test_partial_col_blacklist_deprecation(self, col_blacklist, dag_maker, session):
+ with dag_maker(
+ dag_id="test_partial_col_blacklist_deprecation",
+ start_date=timezone.datetime(2024, 1, 1),
+ session=session,
+ ):
+ HiveStatsCollectionOperator.partial(
+ task_id="fake-task-id",
+ partition={"ds": DEFAULT_DATE_DS},
+ col_blacklist=col_blacklist,
+ excluded_columns=["spam", "egg"],
+ ).expand(table=["airflow.table1", "airflow.table2"])
+
+ dr = dag_maker.create_dagrun(execution_date=None)
+ tis = dr.get_task_instances(session=session)
+ with set_current_task_instance_session(session=session):
+ warn_message = "col_blacklist kwarg passed to.*task_id: fake-task-id.*is deprecated"
+ for ti in tis:
+ with pytest.warns(AirflowProviderDeprecationWarning, match=warn_message):
+ ti.render_templates()
+ expected = col_blacklist or []
+ assert ti.task.excluded_columns == expected