This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a4436c98c0 Return list of GCS URIs from Azure*ToGCS operators (#61048)
0a4436c98c0 is described below

commit 0a4436c98c078f0e764c358bcf92baf842ba454e
Author: Aaron Chen <[email protected]>
AuthorDate: Mon Feb 9 23:22:19 2026 -0800

    Return list of GCS URIs from Azure*ToGCS operators (#61048)
---
 .../google/cloud/transfers/azure_blob_to_gcs.py    | 24 ++++++++++++++++++++--
 .../cloud/transfers/azure_fileshare_to_gcs.py      | 21 +++++++++++++++++--
 .../cloud/azure/example_azure_fileshare_to_gcs.py  |  2 +-
 .../cloud/transfers/test_azure_blob_to_gcs.py      | 17 +++++++++++++--
 .../cloud/transfers/test_azure_fileshare_to_gcs.py | 15 ++++++++++++--
 5 files changed, 70 insertions(+), 9 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py
 
b/providers/google/src/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py
index 1ace104c282..536fa85375e 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py
@@ -59,6 +59,10 @@ class AzureBlobStorageToGCSOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account.
+    :param unwrap_single: If True, unwrap a single-element result list into a 
plain string value
+        for backward compatibility. If False, always return a list of GCS URIs.
+        If not explicitly provided, defaults to True and emits a FutureWarning 
that
+        the default will change to False in a future release.
     """
 
     def __init__(
@@ -73,6 +77,7 @@ class AzureBlobStorageToGCSOperator(BaseOperator):
         filename: str,
         gzip: bool,
         impersonation_chain: str | Sequence[str] | None = None,
+        unwrap_single: bool | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -85,6 +90,18 @@ class AzureBlobStorageToGCSOperator(BaseOperator):
         self.filename = filename
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
+        if unwrap_single is None:
+            self.unwrap_single = True
+            import warnings
+
+            warnings.warn(
+                "The default value of `unwrap_single` will change from True to 
False in a future release. "
+                "Please set `unwrap_single` explicitly to avoid this warning.",
+                FutureWarning,
+                stacklevel=2,
+            )
+        else:
+            self.unwrap_single = unwrap_single
 
     template_fields: Sequence[str] = (
         "blob_name",
@@ -94,7 +111,7 @@ class AzureBlobStorageToGCSOperator(BaseOperator):
         "filename",
     )
 
-    def execute(self, context: Context) -> str:
+    def execute(self, context: Context) -> str | list[str]:
         azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
         gcs_hook = GCSHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -122,7 +139,10 @@ class AzureBlobStorageToGCSOperator(BaseOperator):
                 self.blob_name,
                 self.bucket_name,
             )
-        return f"gs://{self.bucket_name}/{self.object_name}"
+        gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
+        if self.unwrap_single:
+            return gcs_uri
+        return [gcs_uri]
 
     def get_openlineage_facets_on_start(self):
         from airflow.providers.common.compat.openlineage.facet import Dataset
diff --git 
a/providers/google/src/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
 
b/providers/google/src/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
index ad362c508ba..993ce091d50 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
@@ -66,6 +66,8 @@ class AzureFileShareToGCSOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
+    :param return_gcs_uris: If True, return a list of GCS URIs. If False 
(default), return the legacy
+        list of Azure FileShare filenames and emit a deprecation warning.
 
     Note that ``share_name``, ``directory_path``, ``prefix``, and ``dest_gcs`` 
are
     templated, so you can use variables in them if you wish.
@@ -92,6 +94,7 @@ class AzureFileShareToGCSOperator(BaseOperator):
         replace: bool = False,
         gzip: bool = False,
         google_impersonation_chain: str | Sequence[str] | None = None,
+        return_gcs_uris: bool | None = None,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -113,6 +116,16 @@ class AzureFileShareToGCSOperator(BaseOperator):
         self.replace = replace
         self.gzip = gzip
         self.google_impersonation_chain = google_impersonation_chain
+        if return_gcs_uris is None:
+            self.return_gcs_uris = False
+            warnings.warn(
+                "Returning a list of Azure FileShare filenames from 
AzureFileShareToGCSOperator is deprecated and "
+                "will change to list[str] of GCS URIs in a future release. Set 
return_gcs_uris=True to opt in.",
+                FutureWarning,
+                stacklevel=2,
+            )
+        else:
+            self.return_gcs_uris = return_gcs_uris
 
     def _check_inputs(self) -> None:
         if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
@@ -125,7 +138,7 @@ class AzureFileShareToGCSOperator(BaseOperator):
                 'The destination Google Cloud Storage path must end with a 
slash "/" or be empty.'
             )
 
-    def execute(self, context: Context):
+    def execute(self, context: Context) -> list[str]:
         self._check_inputs()
         azure_fileshare_hook = AzureFileShareHook(
             share_name=self.share_name,
@@ -162,6 +175,7 @@ class AzureFileShareToGCSOperator(BaseOperator):
 
             files = list(set(files) - set(existing_files))
 
+        uploaded_gcs_uris: list[str] = []
         if files:
             self.log.info("%s files are going to be synced.", len(files))
             if self.directory_path is None:
@@ -181,9 +195,12 @@ class AzureFileShareToGCSOperator(BaseOperator):
                     # enforced at instantiation time
                     dest_gcs_object = dest_gcs_object_prefix + file
                     gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, 
temp_file.name, gzip=self.gzip)
+                    
uploaded_gcs_uris.append(f"gs://{dest_gcs_bucket}/{dest_gcs_object}")
             self.log.info("All done, uploaded %d files to Google Cloud 
Storage.", len(files))
         else:
             self.log.info("There are no new files to sync. Have a nice day!")
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
 
-        return files
+        if not self.return_gcs_uris:
+            return files
+        return uploaded_gcs_uris
diff --git 
a/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py
 
b/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py
index d03c493c8dd..c62db075d66 100644
--- 
a/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py
+++ 
b/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py
@@ -62,7 +62,7 @@ with DAG(
     sync_azure_files_with_gcs = AzureFileShareToGCSOperator(
         task_id="sync_azure_files_with_gcs",
         share_name=AZURE_SHARE_NAME,
-        dest_gcs=BUCKET_NAME,
+        dest_gcs=f"gs://{BUCKET_NAME}/",
         directory_path=AZURE_DIRECTORY_PATH,
         replace=False,
         gzip=True,
diff --git 
a/providers/google/tests/unit/google/cloud/transfers/test_azure_blob_to_gcs.py 
b/providers/google/tests/unit/google/cloud/transfers/test_azure_blob_to_gcs.py
index aa575eb9df4..ae3300af988 100644
--- 
a/providers/google/tests/unit/google/cloud/transfers/test_azure_blob_to_gcs.py
+++ 
b/providers/google/tests/unit/google/cloud/transfers/test_azure_blob_to_gcs.py
@@ -18,8 +18,12 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import 
AzureBlobStorageToGCSOperator
 
+pytestmark = pytest.mark.filterwarnings("ignore::FutureWarning")
+
 WASB_CONN_ID = "wasb_default"
 GCP_CONN_ID = "google_cloud_default"
 BLOB_NAME = "azure_blob"
@@ -57,10 +61,17 @@ class TestAzureBlobStorageToGCSTransferOperator:
         assert operator.impersonation_chain == IMPERSONATION_CHAIN
         assert operator.task_id == TASK_ID
 
+    @pytest.mark.parametrize(
+        ("unwrap_single", "expected"),
+        [
+            (True, f"gs://{BUCKET_NAME}/{OBJECT_NAME}"),
+            (False, [f"gs://{BUCKET_NAME}/{OBJECT_NAME}"]),
+        ],
+    )
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.WasbHook")
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.GCSHook")
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.tempfile")
-    def test_execute(self, mock_temp, mock_hook_gcs, mock_hook_wasb):
+    def test_execute(self, mock_temp, mock_hook_gcs, mock_hook_wasb, 
unwrap_single, expected):
         op = AzureBlobStorageToGCSOperator(
             wasb_conn_id=WASB_CONN_ID,
             gcp_conn_id=GCP_CONN_ID,
@@ -71,10 +82,11 @@ class TestAzureBlobStorageToGCSTransferOperator:
             filename=FILENAME,
             gzip=GZIP,
             impersonation_chain=IMPERSONATION_CHAIN,
+            unwrap_single=unwrap_single,
             task_id=TASK_ID,
         )
 
-        op.execute(context=None)
+        result = op.execute(context=None)
         mock_hook_wasb.assert_called_once_with(wasb_conn_id=WASB_CONN_ID)
 
         mock_hook_wasb.return_value.get_file.assert_called_once_with(
@@ -91,6 +103,7 @@ class TestAzureBlobStorageToGCSTransferOperator:
             gzip=GZIP,
             
filename=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name,
         )
+        assert result == expected
 
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.WasbHook")
     def test_execute_single_file_transfer_openlineage(self, mock_hook_wasb):
diff --git 
a/providers/google/tests/unit/google/cloud/transfers/test_azure_fileshare_to_gcs.py
 
b/providers/google/tests/unit/google/cloud/transfers/test_azure_fileshare_to_gcs.py
index 39b5bb62f38..a70d7cc709b 100644
--- 
a/providers/google/tests/unit/google/cloud/transfers/test_azure_fileshare_to_gcs.py
+++ 
b/providers/google/tests/unit/google/cloud/transfers/test_azure_fileshare_to_gcs.py
@@ -18,8 +18,12 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import 
AzureFileShareToGCSOperator
 
+pytestmark = pytest.mark.filterwarnings("ignore::FutureWarning")
+
 TASK_ID = "test-azure-fileshare-to-gcs"
 AZURE_FILESHARE_SHARE = "test-share"
 AZURE_FILESHARE_DIRECTORY_PATH = "/path/to/dir"
@@ -52,9 +56,10 @@ class TestAzureFileShareToGCSOperator:
         assert operator.dest_gcs == GCS_PATH_PREFIX
         assert operator.google_impersonation_chain == IMPERSONATION_CHAIN
 
+    @pytest.mark.parametrize("return_gcs_uris", [True, False])
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook")
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook")
-    def test_execute(self, gcs_mock_hook, azure_fileshare_mock_hook):
+    def test_execute(self, gcs_mock_hook, azure_fileshare_mock_hook, 
return_gcs_uris):
         """Test the execute function when the run is successful."""
 
         operator = AzureFileShareToGCSOperator(
@@ -65,6 +70,7 @@ class TestAzureFileShareToGCSOperator:
             gcp_conn_id=GCS_CONN_ID,
             dest_gcs=GCS_PATH_PREFIX,
             google_impersonation_chain=IMPERSONATION_CHAIN,
+            return_gcs_uris=return_gcs_uris,
         )
 
         azure_fileshare_mock_hook.return_value.list_files.return_value = 
MOCK_FILES
@@ -87,7 +93,12 @@ class TestAzureFileShareToGCSOperator:
             impersonation_chain=IMPERSONATION_CHAIN,
         )
 
-        assert sorted(MOCK_FILES) == sorted(uploaded_files)
+        expected_files = (
+            [f"gs://gcs-bucket/data/{file_name}" for file_name in MOCK_FILES]
+            if return_gcs_uris
+            else MOCK_FILES
+        )
+        assert sorted(expected_files) == sorted(uploaded_files)
 
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook")
     
@mock.patch("airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook")

Reply via email to the mailing list.