This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c56b84ce201 Migrate ADLSListOperator from ADLS Gen1 to Gen2 (#61188)
c56b84ce201 is described below

commit c56b84ce20112c300c991333a6081f17e8412359
Author: Aaron Chen <[email protected]>
AuthorDate: Mon Feb 9 23:28:12 2026 -0800

    Migrate ADLSListOperator from ADLS Gen1 to Gen2 (#61188)
---
 providers/google/pyproject.toml                    |  2 +-
 .../google/cloud/transfers/adls_to_gcs.py          | 20 +++++++++++++++++++-
 .../google/cloud/transfers/test_adls_to_gcs.py     | 14 +++++++++-----
 .../providers/microsoft/azure/operators/adls.py    | 22 +++++++++++++++++-----
 .../system/microsoft/azure/example_adls_list.py    |  3 ++-
 .../microsoft/azure/operators/test_adls_list.py    | 14 +++++++++-----
 6 files changed, 57 insertions(+), 18 deletions(-)

diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 7b7b69bcdf2..b18c1f65aa4 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -172,7 +172,7 @@ dependencies = [
     "apache-airflow-providers-apache-cassandra"
 ]
 "microsoft.azure" = [
-    "apache-airflow-providers-microsoft-azure"
+    "apache-airflow-providers-microsoft-azure",  # use next version
 ]
 "microsoft.mssql" = [
     "apache-airflow-providers-microsoft-mssql"
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
index 9bd75b14ac3..7c9c8ff77ee 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/adls_to_gcs.py
@@ -45,6 +45,8 @@ class ADLSToGCSOperator(ADLSListOperator):
     :param src_adls: The Azure Data Lake path to find the objects (templated)
     :param dest_gcs: The Google Cloud Storage bucket and prefix to
         store the objects. (templated)
+    :param file_system_name: Name of the file system (container) in ADLS Gen2.
+        This is passed via ``**kwargs`` to the parent ``ADLSListOperator``.
     :param replace: If true, replaces same-named files in GCS
     :param gzip: Option to compress file for upload
     :param azure_data_lake_conn_id: The connection ID to use when
@@ -117,7 +119,11 @@ class ADLSToGCSOperator(ADLSListOperator):
         google_impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(path=src_adls, azure_data_lake_conn_id=azure_data_lake_conn_id, **kwargs)
+        super().__init__(
+            path=src_adls,
+            azure_data_lake_conn_id=azure_data_lake_conn_id,
+            **self._validate_kwargs(kwargs),
+        )
 
         self.src_adls = src_adls
         self.dest_gcs = dest_gcs
@@ -126,6 +132,18 @@ class ADLSToGCSOperator(ADLSListOperator):
         self.gzip = gzip
         self.google_impersonation_chain = google_impersonation_chain
 
+    @staticmethod
+    def _validate_kwargs(kwargs: dict) -> dict:
+        file_system_name = kwargs.pop("file_system_name", None)
+        if file_system_name is None:
+            raise TypeError(
+                "The 'file_system_name' parameter is required. "
+                "ADLSListOperator has been migrated from Azure Data Lake 
Storage Gen1 (retired) "
+                "to Gen2, which requires specifying a file system name. "
+                "Please add file_system_name='your-container-name' to your 
operator instantiation."
+            )
+        return {"file_system_name": file_system_name, **kwargs}
+
     def execute(self, context: Context):
         # use the super to list all files in an Azure Data Lake path
         files = super().execute(context)
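
    For illustration only, a minimal sketch of instantiating the migrated
    transfer operator after this change; the task ID, paths, and connection
    IDs below are placeholder values, not part of this commit:

        from airflow.providers.google.cloud.transfers.adls_to_gcs import ADLSToGCSOperator

        # file_system_name is now required; omitting it raises the TypeError
        # added in _validate_kwargs above.
        copy_task = ADLSToGCSOperator(
            task_id="adls_to_gcs",                 # placeholder task ID
            src_adls="folder/output",              # directory path within the file system
            dest_gcs="gs://my-bucket/",            # placeholder GCS destination
            file_system_name="my-container",       # ADLS Gen2 file system (container)
            azure_data_lake_conn_id="azure_data_lake_default",
            gcp_conn_id="google_cloud_default",
            replace=False,
        )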
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
index 914cbdc6723..1dfdf0ffc0a 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_adls_to_gcs.py
@@ -24,6 +24,7 @@ from airflow.providers.google.cloud.transfers.adls_to_gcs import ADLSToGCSOperat
 TASK_ID = "test-adls-gcs-operator"
 ADLS_PATH_1 = "*"
 GCS_PATH = "gs://test/"
+TEST_FILE_SYSTEM_NAME = "test-container"
 MOCK_FILES = [
     "test/TEST1.csv",
     "test/TEST2.csv",
@@ -44,6 +45,7 @@ class TestAdlsToGoogleCloudStorageOperator:
             task_id=TASK_ID,
             src_adls=ADLS_PATH_1,
             dest_gcs=GCS_PATH,
+            file_system_name=TEST_FILE_SYSTEM_NAME,
             replace=False,
             azure_data_lake_conn_id=AZURE_CONN_ID,
             gcp_conn_id=GCS_CONN_ID,
@@ -57,7 +59,7 @@ class TestAdlsToGoogleCloudStorageOperator:
         assert operator.azure_data_lake_conn_id == AZURE_CONN_ID
 
     @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
-    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
     @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
     def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
         """Test the execute function when the run is successful."""
@@ -66,13 +68,14 @@ class TestAdlsToGoogleCloudStorageOperator:
             task_id=TASK_ID,
             src_adls=ADLS_PATH_1,
             dest_gcs=GCS_PATH,
+            file_system_name=TEST_FILE_SYSTEM_NAME,
             replace=False,
             azure_data_lake_conn_id=AZURE_CONN_ID,
             gcp_conn_id=GCS_CONN_ID,
             google_impersonation_chain=IMPERSONATION_CHAIN,
         )
 
-        adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
+        adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
         adls_two_mock_hook.return_value.list.return_value = MOCK_FILES
 
         # gcs_mock_hook.return_value.upload.side_effect = _assert_upload
@@ -92,7 +95,7 @@ class TestAdlsToGoogleCloudStorageOperator:
             any_order=True,
         )
 
-        adls_one_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
+        adls_one_mock_hook.assert_called_once_with(adls_conn_id=AZURE_CONN_ID)
         adls_two_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
         gcs_mock_hook.assert_called_once_with(
             gcp_conn_id=GCS_CONN_ID,
@@ -103,7 +106,7 @@ class TestAdlsToGoogleCloudStorageOperator:
         assert sorted(MOCK_FILES) == sorted(uploaded_files)
 
     @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
-    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
     @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
     def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
         """Test the execute function when the run is successful."""
@@ -112,13 +115,14 @@ class TestAdlsToGoogleCloudStorageOperator:
             task_id=TASK_ID,
             src_adls=ADLS_PATH_1,
             dest_gcs=GCS_PATH,
+            file_system_name=TEST_FILE_SYSTEM_NAME,
             replace=False,
             azure_data_lake_conn_id=AZURE_CONN_ID,
             gcp_conn_id=GCS_CONN_ID,
             gzip=True,
         )
 
-        adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
+        adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
         adls_two_mock_hook.return_value.list.return_value = MOCK_FILES
 
         # gcs_mock_hook.return_value.upload.side_effect = _assert_upload
diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
index 5e6e5e68b32..1beaa290549 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
@@ -126,7 +126,8 @@ class ADLSListOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:ADLSListOperator`
 
-    :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated)
+    :param file_system_name: Name of the file system (container) in ADLS Gen2.
+    :param path: The directory path within the file system to list files from (templated).
     :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection<howto/connection:adl>`.
     """
 
@@ -134,13 +135,24 @@ class ADLSListOperator(BaseOperator):
     ui_color = "#901dd2"
 
     def __init__(
-        self, *, path: str, azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs
+        self,
+        *,
+        file_system_name: str,
+        path: str,
+        azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID,
+        **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        self.file_system_name = file_system_name
         self.path = path
         self.azure_data_lake_conn_id = azure_data_lake_conn_id
 
     def execute(self, context: Context) -> list:
-        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
-        self.log.info("Getting list of ADLS files in path: %s", self.path)
-        return hook.list(path=self.path)
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id)
+        self.log.info(
+            "Getting list of ADLS files in file system %s, path: %s", self.file_system_name, self.path
+        )
+        )
+        return hook.list_files_directory(
+            file_system_name=self.file_system_name,
+            directory_name=self.path,
+        )
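
    For reference, the Gen2 hook call that now backs execute() can also be
    exercised directly; a minimal sketch, with the connection ID, container,
    and directory names as placeholder values:

        from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

        hook = AzureDataLakeStorageV2Hook(adls_conn_id="azure_data_lake_default")
        # list_files_directory lists the files under the given directory
        # within the named file system (container).
        files = hook.list_files_directory(
            file_system_name="my-container",
            directory_name="folder/output",
        )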
diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
index 26e65e0cf8d..9c71a1e4bba 100644
--- a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
+++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
@@ -37,7 +37,8 @@ with models.DAG(
     # [START howto_operator_adls_list]
     adls_files = ADLSListOperator(
         task_id="adls_files",
-        path="folder/output/*.parquet",
+        file_system_name="mycontainer",
+        path="folder/output",
         azure_data_lake_conn_id="azure_data_lake_default",
     )
     # [END howto_operator_adls_list]
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
index e73b1e4200e..e292b6c8da0 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
@@ -22,7 +22,8 @@ from unittest import mock
 from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator
 
 TASK_ID = "test-adls-list-operator"
-TEST_PATH = "test/*"
+TEST_FILE_SYSTEM_NAME = "test-container"
+TEST_PATH = "test/path"
 MOCK_FILES = [
     "test/TEST1.csv",
     "test/TEST2.csv",
@@ -33,12 +34,15 @@ MOCK_FILES = [
 
 
 class TestAzureDataLakeStorageListOperator:
-    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
     def test_execute(self, mock_hook):
-        mock_hook.return_value.list.return_value = MOCK_FILES
+        mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
 
-        operator = ADLSListOperator(task_id=TASK_ID, path=TEST_PATH)
+        operator = ADLSListOperator(task_id=TASK_ID, file_system_name=TEST_FILE_SYSTEM_NAME, path=TEST_PATH)
 
         files = operator.execute(None)
-        mock_hook.return_value.list.assert_called_once_with(path=TEST_PATH)
+        mock_hook.return_value.list_files_directory.assert_called_once_with(
+            file_system_name=TEST_FILE_SYSTEM_NAME,
+            directory_name=TEST_PATH,
+        )
         assert sorted(files) == sorted(MOCK_FILES)
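
    A note on the breaking change: because _validate_kwargs raises TypeError
    when file_system_name is missing, existing DAGs that instantiate
    ADLSToGCSOperator without it will fail at parse time. A sketch of how one
    might assert this (not a test from this commit; values are placeholders):

        import pytest
        from airflow.providers.google.cloud.transfers.adls_to_gcs import ADLSToGCSOperator

        # Omitting the now-required file_system_name raises TypeError.
        with pytest.raises(TypeError, match="file_system_name"):
            ADLSToGCSOperator(
                task_id="adls_to_gcs",
                src_adls="*",
                dest_gcs="gs://test/",
            )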
