This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c56b84ce201 Migrate ADLSListOperator from ADLS Gen1 to Gen2 (#61188)
c56b84ce201 is described below
commit c56b84ce20112c300c991333a6081f17e8412359
Author: Aaron Chen <[email protected]>
AuthorDate: Mon Feb 9 23:28:12 2026 -0800
Migrate ADLSListOperator from ADLS Gen1 to Gen2 (#61188)
---
providers/google/pyproject.toml | 2 +-
.../google/cloud/transfers/adls_to_gcs.py | 20 +++++++++++++++++++-
.../google/cloud/transfers/test_adls_to_gcs.py | 14 +++++++++-----
.../providers/microsoft/azure/operators/adls.py | 22 +++++++++++++++++-----
.../system/microsoft/azure/example_adls_list.py | 3 ++-
.../microsoft/azure/operators/test_adls_list.py | 14 +++++++++-----
6 files changed, 57 insertions(+), 18 deletions(-)
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 7b7b69bcdf2..b18c1f65aa4 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -172,7 +172,7 @@ dependencies = [
"apache-airflow-providers-apache-cassandra"
]
"microsoft.azure" = [
- "apache-airflow-providers-microsoft-azure"
+ "apache-airflow-providers-microsoft-azure", # use next version
]
"microsoft.mssql" = [
"apache-airflow-providers-microsoft-mssql"
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
index 9bd75b14ac3..7c9c8ff77ee 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
@@ -45,6 +45,8 @@ class ADLSToGCSOperator(ADLSListOperator):
:param src_adls: The Azure Data Lake path to find the objects (templated)
:param dest_gcs: The Google Cloud Storage bucket and prefix to
store the objects. (templated)
+ :param file_system_name: Name of the file system (container) in ADLS Gen2.
+ This is passed via ``**kwargs`` to the parent ``ADLSListOperator``.
:param replace: If true, replaces same-named files in GCS
:param gzip: Option to compress file for upload
:param azure_data_lake_conn_id: The connection ID to use when
@@ -117,7 +119,11 @@ class ADLSToGCSOperator(ADLSListOperator):
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
- super().__init__(path=src_adls, azure_data_lake_conn_id=azure_data_lake_conn_id, **kwargs)
+ super().__init__(
+ path=src_adls,
+ azure_data_lake_conn_id=azure_data_lake_conn_id,
+ **self._validate_kwargs(kwargs),
+ )
self.src_adls = src_adls
self.dest_gcs = dest_gcs
@@ -126,6 +132,18 @@ class ADLSToGCSOperator(ADLSListOperator):
self.gzip = gzip
self.google_impersonation_chain = google_impersonation_chain
+ @staticmethod
+ def _validate_kwargs(kwargs: dict) -> dict:
+ file_system_name = kwargs.pop("file_system_name", None)
+ if file_system_name is None:
+ raise TypeError(
+ "The 'file_system_name' parameter is required. "
+ "ADLSListOperator has been migrated from Azure Data Lake
Storage Gen1 (retired) "
+ "to Gen2, which requires specifying a file system name. "
+ "Please add file_system_name='your-container-name' to your
operator instantiation."
+ )
+ return {"file_system_name": file_system_name, **kwargs}
+
def execute(self, context: Context):
# use the super to list all files in an Azure Data Lake path
files = super().execute(context)
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
index 914cbdc6723..1dfdf0ffc0a 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
@@ -24,6 +24,7 @@ from airflow.providers.google.cloud.transfers.adls_to_gcs import ADLSToGCSOperat
TASK_ID = "test-adls-gcs-operator"
ADLS_PATH_1 = "*"
GCS_PATH = "gs://test/"
+TEST_FILE_SYSTEM_NAME = "test-container"
MOCK_FILES = [
"test/TEST1.csv",
"test/TEST2.csv",
@@ -44,6 +45,7 @@ class TestAdlsToGoogleCloudStorageOperator:
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
+ file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
@@ -57,7 +59,7 @@ class TestAdlsToGoogleCloudStorageOperator:
assert operator.azure_data_lake_conn_id == AZURE_CONN_ID
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
- @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+ @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
"""Test the execute function when the run is successful."""
@@ -66,13 +68,14 @@ class TestAdlsToGoogleCloudStorageOperator:
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
+ file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
google_impersonation_chain=IMPERSONATION_CHAIN,
)
- adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
+ adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
adls_two_mock_hook.return_value.list.return_value = MOCK_FILES
# gcs_mock_hook.return_value.upload.side_effect = _assert_upload
@@ -92,7 +95,7 @@ class TestAdlsToGoogleCloudStorageOperator:
any_order=True,
)
- adls_one_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
+ adls_one_mock_hook.assert_called_once_with(adls_conn_id=AZURE_CONN_ID)
adls_two_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
gcs_mock_hook.assert_called_once_with(
gcp_conn_id=GCS_CONN_ID,
@@ -103,7 +106,7 @@ class TestAdlsToGoogleCloudStorageOperator:
assert sorted(MOCK_FILES) == sorted(uploaded_files)
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
- @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+ @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
"""Test the execute function when the run is successful."""
@@ -112,13 +115,14 @@ class TestAdlsToGoogleCloudStorageOperator:
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
+ file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
gzip=True,
)
- adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
+ adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
adls_two_mock_hook.return_value.list.return_value = MOCK_FILES
# gcs_mock_hook.return_value.upload.side_effect = _assert_upload
diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
index 5e6e5e68b32..1beaa290549 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
@@ -126,7 +126,8 @@ class ADLSListOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:ADLSListOperator`
- :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated)
+ :param file_system_name: Name of the file system (container) in ADLS Gen2.
+ :param path: The directory path within the file system to list files from (templated).
:param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection<howto/connection:adl>`.
"""
@@ -134,13 +135,24 @@ class ADLSListOperator(BaseOperator):
ui_color = "#901dd2"
def __init__(
- self, *, path: str, azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs
+ self,
+ *,
+ file_system_name: str,
+ path: str,
+ azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID,
+ **kwargs,
) -> None:
super().__init__(**kwargs)
+ self.file_system_name = file_system_name
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
def execute(self, context: Context) -> list:
- hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
- self.log.info("Getting list of ADLS files in path: %s", self.path)
- return hook.list(path=self.path)
+ hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id)
+ self.log.info(
+ "Getting list of ADLS files in file system %s, path: %s",
self.file_system_name, self.path
+ )
+ return hook.list_files_directory(
+ file_system_name=self.file_system_name,
+ directory_name=self.path,
+ )
diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
index 26e65e0cf8d..9c71a1e4bba 100644
--- a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
+++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
@@ -37,7 +37,8 @@ with models.DAG(
# [START howto_operator_adls_list]
adls_files = ADLSListOperator(
task_id="adls_files",
- path="folder/output/*.parquet",
+ file_system_name="mycontainer",
+ path="folder/output",
azure_data_lake_conn_id="azure_data_lake_default",
)
# [END howto_operator_adls_list]
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
index e73b1e4200e..e292b6c8da0 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
@@ -22,7 +22,8 @@ from unittest import mock
from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator
TASK_ID = "test-adls-list-operator"
-TEST_PATH = "test/*"
+TEST_FILE_SYSTEM_NAME = "test-container"
+TEST_PATH = "test/path"
MOCK_FILES = [
"test/TEST1.csv",
"test/TEST2.csv",
@@ -33,12 +34,15 @@ MOCK_FILES = [
class TestAzureDataLakeStorageListOperator:
- @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+ @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
def test_execute(self, mock_hook):
- mock_hook.return_value.list.return_value = MOCK_FILES
+ mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
- operator = ADLSListOperator(task_id=TASK_ID, path=TEST_PATH)
+ operator = ADLSListOperator(task_id=TASK_ID, file_system_name=TEST_FILE_SYSTEM_NAME, path=TEST_PATH)
files = operator.execute(None)
- mock_hook.return_value.list.assert_called_once_with(path=TEST_PATH)
+ mock_hook.return_value.list_files_directory.assert_called_once_with(
+ file_system_name=TEST_FILE_SYSTEM_NAME,
+ directory_name=TEST_PATH,
+ )
assert sorted(files) == sorted(MOCK_FILES)
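For DAG authors picking up this change, a minimal before/after sketch of an ADLSListOperator call site; the container name "my-container" is illustrative and not taken from this commit:

    from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator

    # Before (ADLS Gen1, retired): a single glob path identified the objects.
    # adls_files = ADLSListOperator(
    #     task_id="adls_files",
    #     path="folder/output/*.parquet",
    #     azure_data_lake_conn_id="azure_data_lake_default",
    # )

    # After (ADLS Gen2): the file system (container) is a required keyword argument,
    # and path is a directory inside that container rather than a glob.
    adls_files = ADLSListOperator(
        task_id="adls_files",
        file_system_name="my-container",  # illustrative container name
        path="folder/output",
        azure_data_lake_conn_id="azure_data_lake_default",
    )

ADLSToGCSOperator callers pass the same file_system_name keyword; it is forwarded to the parent ADLSListOperator via _validate_kwargs, which raises a TypeError when it is missing.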