This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76bed661465 Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer (#64966)
76bed661465 is described below

commit 76bed6614658e6c436aa61854475bd1d67e0b5aa
Author: Yuseok Jo <[email protected]>
AuthorDate: Tue Jun 9 19:54:11 2026 +0900

    Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer (#64966)
    
    * Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer
    
    * Fix CI
    
    * Fix CI error
    
    * Replace AirflowException with ValueError
    
    * Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    * Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    * Avoid name mangling on _is_match_glob_supported attribute
    
    * Remove redundant _transform_file_path filter clause
    
    * Fix self.blob_prefix mutation on execute() retry
    
    * Fix silent overwrite of colliding blobs when replace=True
    
    * Fix Windows backslash in Azure blob names
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 dev/breeze/tests/test_selective_checks.py          |   4 +-
 providers/microsoft/azure/docs/index.rst           |   2 +
 .../microsoft/azure/docs/transfer/gcs_to_wasb.rst  |  69 ++++++
 providers/microsoft/azure/provider.yaml            |   4 +
 providers/microsoft/azure/pyproject.toml           |   8 +
 .../providers/microsoft/azure/get_provider_info.py |   6 +
 .../microsoft/azure/transfers/gcs_to_wasb.py       | 261 +++++++++++++++++++++
 .../microsoft/azure/transfers/test_gcs_to_wasb.py  | 259 ++++++++++++++++++++
 uv.lock                                            |  14 +-
 9 files changed, 624 insertions(+), 3 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index f11438e0440..622041285ac 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2484,7 +2484,7 @@ def test_expected_output_push(
             ),
             {
                 "selected-providers-list-as-string": "amazon common.compat 
common.io common.sql "
-                "databricks dbt.cloud ftp google jdbc microsoft.mssql mysql "
+                "databricks dbt.cloud ftp google jdbc microsoft.azure 
microsoft.mssql mysql "
                 "openlineage oracle postgres sftp snowflake standard trino",
                 "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                 "all-python-versions-list-as-string": 
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2503,7 +2503,7 @@ def test_expected_output_push(
                         {
                             "description": "amazon...standard",
                             "test_types": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,"
-                            
"databricks,dbt.cloud,ftp,jdbc,microsoft.mssql,mysql,openlineage,oracle,"
+                            
"databricks,dbt.cloud,ftp,jdbc,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,"
                             "postgres,sftp,snowflake,trino] Providers[google] 
Providers[standard]",
                         }
                     ]
diff --git a/providers/microsoft/azure/docs/index.rst 
b/providers/microsoft/azure/docs/index.rst
index c327fa52a3a..ba8d8c74df3 100644
--- a/providers/microsoft/azure/docs/index.rst
+++ b/providers/microsoft/azure/docs/index.rst
@@ -159,6 +159,8 @@ Dependent package
 `apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
         ``amazon``
 `apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_      
  ``common.compat``
 `apache-airflow-providers-common-messaging 
<https://airflow.apache.org/docs/apache-airflow-providers-common-messaging>`_  
``common.messaging``
+`apache-airflow-providers-google 
<https://airflow.apache.org/docs/apache-airflow-providers-google>`_             
         ``google``
+`apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_        
    ``openlineage``
 `apache-airflow-providers-oracle 
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_             
         ``oracle``
 `apache-airflow-providers-sftp 
<https://airflow.apache.org/docs/apache-airflow-providers-sftp>`_               
           ``sftp``
 
========================================================================================================================
  ====================
diff --git a/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst 
b/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst
new file mode 100644
index 00000000000..0ff1cbadbb2
--- /dev/null
+++ b/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst
@@ -0,0 +1,69 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+=====================================================
+Google Cloud Storage to Azure Blob Storage transfer
+=====================================================
+
+`Google Cloud Storage <https://cloud.google.com/storage/>`__ and
+`Azure Blob Storage <https://learn.microsoft.com/en-us/azure/storage/blobs/>`__
+are object stores commonly used for data lakes and file exchange.
+This guide describes copying objects from GCS into an Azure Blob container.
+
+Install the optional dependency when using this operator:
+
+.. code-block:: bash
+
+    pip install 'apache-airflow-providers-microsoft-azure[google]'
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GCSToAzureBlobStorageOperator:
+
+Operator
+--------
+
+Use :class:`~airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSToAzureBlobStorageOperator`
+to list objects under a GCS ``prefix`` and upload them to a container using ``blob_prefix`` as the base path.
+Use ``keep_directory_structure`` and ``flatten_structure`` the same way as
+:class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator` (``flatten_structure`` wins when both apply).
+Object keys ending with ``/`` (GCS console folder markers) are not copied.
+
+Example:
+
+.. code-block:: python
+
+    copy_gcs_to_azure = GCSToAzureBlobStorageOperator(
+        task_id="gcs_to_azure_blob",
+        gcs_bucket="my-gcs-bucket",
+        prefix="exports/daily/",
+        container_name="my-container",
+        blob_prefix="imports/daily",
+        gcp_conn_id="google_cloud_default",
+        wasb_conn_id="wasb_default",
+        replace=True,
+    )
+
+Reference
+---------
+
+* `Google Cloud Storage Python client <https://cloud.google.com/python/docs/reference/storage/latest>`__
+* `Azure Blob Storage client library <https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python>`__
diff --git a/providers/microsoft/azure/provider.yaml 
b/providers/microsoft/azure/provider.yaml
index 084483da2ee..81be326acba 100644
--- a/providers/microsoft/azure/provider.yaml
+++ b/providers/microsoft/azure/provider.yaml
@@ -362,6 +362,10 @@ transfers:
     target-integration-name: Microsoft Azure Blob Storage
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst
     python-module: airflow.providers.microsoft.azure.transfers.s3_to_wasb
+  - source-integration-name: Google Cloud Storage (GCS)
+    target-integration-name: Microsoft Azure Blob Storage
+    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/gcs_to_wasb.rst
+    python-module: airflow.providers.microsoft.azure.transfers.gcs_to_wasb
 
 
 connection-types:
diff --git a/providers/microsoft/azure/pyproject.toml 
b/providers/microsoft/azure/pyproject.toml
index e4b9a070f3d..d30d0950e52 100644
--- a/providers/microsoft/azure/pyproject.toml
+++ b/providers/microsoft/azure/pyproject.toml
@@ -113,6 +113,12 @@ dependencies = [
 "common.messaging" = [
     "apache-airflow-providers-common-messaging>=2.0.0"
 ]
+"google" = [
+    "apache-airflow-providers-google"
+]
+"openlineage" = [
+    "apache-airflow-providers-openlineage>=2.3.0"
+]
 
 [dependency-groups]
 dev = [
@@ -122,6 +128,8 @@ dev = [
     "apache-airflow-providers-amazon",
     "apache-airflow-providers-common-compat",
     "apache-airflow-providers-common-messaging",
+    "apache-airflow-providers-google",
+    "apache-airflow-providers-openlineage",
     "apache-airflow-providers-oracle",
     "apache-airflow-providers-sftp",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
index 40364bf1845..ee2e388980f 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
@@ -352,6 +352,12 @@ def get_provider_info():
                 "how-to-guide": 
"/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst",
                 "python-module": 
"airflow.providers.microsoft.azure.transfers.s3_to_wasb",
             },
+            {
+                "source-integration-name": "Google Cloud Storage (GCS)",
+                "target-integration-name": "Microsoft Azure Blob Storage",
+                "how-to-guide": 
"/docs/apache-airflow-providers-microsoft-azure/transfer/gcs_to_wasb.rst",
+                "python-module": 
"airflow.providers.microsoft.azure.transfers.gcs_to_wasb",
+            },
         ],
         "connection-types": [
             {
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
new file mode 100644
index 00000000000..acdc0fa65ea
--- /dev/null
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Azure Blob Storage operator."""
+
+from __future__ import annotations
+
+import posixpath
+from collections.abc import Sequence
+from typing import TYPE_CHECKING
+
+from packaging.version import Version
+
+from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+
+try:
+    from airflow.providers.google.cloud.hooks.gcs import GCSHook
+except ModuleNotFoundError as e:
+    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException(e)
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
+    from airflow.sdk import Context
+
+
+class GCSToAzureBlobStorageOperator(BaseOperator):
+    """
+    Synchronizes objects from a Google Cloud Storage bucket to Azure Blob 
Storage.
+
+    .. note::
+        When ``flatten_structure=True``, it takes precedence over 
``keep_directory_structure``.
+        For example, with ``flatten_structure=True``, 
``folder/subfolder/file.txt`` becomes
+        ``file.txt`` regardless of the ``keep_directory_structure`` setting.
+
+        Objects whose names end with ``/`` (GCS console folder markers) and 
keys that become an
+        empty destination path after ``flatten_structure`` are skipped.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GCSToAzureBlobStorageOperator`
+
+    :param gcs_bucket: The GCS bucket to list objects from. (templated)
+    :param prefix: Prefix to filter object names under the bucket. (templated)
+    :param gcp_conn_id: Airflow connection ID for Google Cloud.
+    :param google_impersonation_chain: Optional impersonation chain for GCP 
credentials.
+    :param gcp_user_project: Requester-pays billing project for GCS requests, 
if required.
+    :param match_glob: Optional glob filter for object names (requires
+        ``apache-airflow-providers-google>=10.3.0``).
+    :param container_name: Azure Blob container to upload into. (templated)
+    :param blob_prefix: Base blob path for uploaded objects. (templated)
+    :param wasb_conn_id: Airflow connection ID for Azure Blob Storage.
+    :param replace: If ``True``, overwrite existing blobs (``overwrite=True`` 
on upload) and
+        upload all listed objects. If ``False``, skip objects that already 
exist under
+        ``blob_prefix`` with the same relative path and pass 
``overwrite=False`` on upload.
+    :param keep_directory_structure: When ``False`` and ``prefix`` is set (and
+        ``flatten_structure`` is ``False``), append ``prefix`` to 
``blob_prefix``.
+    :param flatten_structure: If ``True``, upload each object using only its 
file name
+        under ``blob_prefix``. Takes precedence over 
``keep_directory_structure``.
+    :param create_container: If ``True``, create the container when missing 
before upload.
+    """
+
+    template_fields: Sequence[str] = (
+        "gcs_bucket",
+        "prefix",
+        "blob_prefix",
+        "container_name",
+        "google_impersonation_chain",
+        "gcp_user_project",
+        "match_glob",
+    )
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        gcs_bucket: str,
+        container_name: str,
+        blob_prefix: str = "",
+        prefix: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        google_impersonation_chain: str | Sequence[str] | None = None,
+        wasb_conn_id: str = "wasb_default",
+        replace: bool = False,
+        keep_directory_structure: bool = True,
+        flatten_structure: bool = False,
+        match_glob: str | None = None,
+        gcp_user_project: str | None = None,
+        create_container: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.gcs_bucket = gcs_bucket
+        self.prefix = prefix
+        self.gcp_conn_id = gcp_conn_id
+        self.google_impersonation_chain = google_impersonation_chain
+        self.container_name = container_name
+        self.blob_prefix = blob_prefix
+        self.wasb_conn_id = wasb_conn_id
+        self.replace = replace
+        self.keep_directory_structure = keep_directory_structure
+        self.flatten_structure = flatten_structure
+        self.gcp_user_project = gcp_user_project
+        self.create_container = create_container
+
+        if self.flatten_structure and self.keep_directory_structure:
+            self.log.warning("flatten_structure=True takes precedence over 
keep_directory_structure=True")
+
+        try:
+            from airflow.providers.google import __version__ as 
_GOOGLE_PROVIDER_VERSION
+
+            if Version(_GOOGLE_PROVIDER_VERSION) >= Version("10.3.0"):
+                self._is_match_glob_supported = True
+            else:
+                self._is_match_glob_supported = False
+        except ImportError:
+            self._is_match_glob_supported = False
+        if not self._is_match_glob_supported and match_glob:
+            raise ValueError("The 'match_glob' parameter requires 
'apache-airflow-providers-google>=10.3.0'.")
+        self.match_glob = match_glob
+
+    def _transform_file_path(self, file_path: str) -> str:
+        """
+        Transform the GCS file path according to the specified options.
+
+        :param file_path: The original GCS file path
+        :return: The transformed file path for Azure Blob destination
+        """
+        if self.flatten_structure:
+            return posixpath.basename(file_path)
+        return file_path
+
+    @staticmethod
+    def _should_skip_gcs_object(name: str) -> bool:
+        """
+        Return True if this object name should not be copied.
+
+        The GCS console creates zero-byte "folder" markers whose keys end with 
``/``.
+        Those yield an empty basename when ``flatten_structure=True`` and odd 
blob paths
+        on Azure when copied as-is.
+        """
+        if not name or not name.strip():
+            return True
+        return name.endswith("/")
+
+    def execute(self, context: Context) -> list[str]:
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.google_impersonation_chain,
+        )
+        wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+
+        self.log.info(
+            "Getting list of the files. Bucket: %s; Prefix: %s",
+            self.gcs_bucket,
+            self.prefix,
+        )
+
+        list_kwargs: dict = {
+            "bucket_name": self.gcs_bucket,
+            "prefix": self.prefix,
+            "user_project": self.gcp_user_project,
+        }
+        if self._is_match_glob_supported:
+            list_kwargs["match_glob"] = self.match_glob
+
+        gcs_files = gcs_hook.list(**list_kwargs)  # type: ignore[call-arg]
+
+        gcs_files = [f for f in gcs_files if not 
self._should_skip_gcs_object(f)]
+
+        blob_prefix = self.blob_prefix
+        if not self.keep_directory_structure and self.prefix and not 
self.flatten_structure:
+            blob_prefix = posixpath.join(blob_prefix, self.prefix)
+
+        existing_blobs_set: set[str] = set()
+        if not self.replace:
+            existing_blobs = (
+                wasb_hook.get_blobs_list_recursive(
+                    container_name=self.container_name,
+                    prefix=blob_prefix or None,
+                )
+                or []
+            )
+            if blob_prefix:
+                prefix_str = blob_prefix.rstrip("/") + "/"
+                existing_blobs = [b.removeprefix(prefix_str) for b in 
existing_blobs]
+            existing_blobs_set = set(existing_blobs)
+
+        filtered_files: list[str] = []
+        seen_transformed: set[str] = set()
+        for file in gcs_files:
+            transformed = self._transform_file_path(file)
+            if transformed in existing_blobs_set:
+                continue
+            if transformed in seen_transformed:
+                self.log.warning(
+                    "Skipping duplicate file %s (transforms to %s)",
+                    file,
+                    transformed,
+                )
+                continue
+            filtered_files.append(file)
+            seen_transformed.add(transformed)
+
+        gcs_files = filtered_files
+
+        uploaded_blobs: list[str] = []
+        if gcs_files:
+            for file in gcs_files:
+                with gcs_hook.provide_file(
+                    object_name=file, bucket_name=str(self.gcs_bucket), 
user_project=self.gcp_user_project
+                ) as local_tmp_file:
+                    transformed_path = self._transform_file_path(file)
+                    dest_blob = posixpath.join(blob_prefix, transformed_path)
+                    self.log.info("Saving file from %s to %s", file, dest_blob)
+                    wasb_hook.load_file(
+                        file_path=local_tmp_file.name,
+                        container_name=self.container_name,
+                        blob_name=dest_blob,
+                        create_container=self.create_container,
+                        overwrite=self.replace,
+                    )
+                    uploaded_blobs.append(dest_blob)
+            self.log.info("All done, uploaded %d files to Azure Blob Storage", 
len(uploaded_blobs))
+        else:
+            self.log.info("In sync, no files needed to be uploaded to Azure 
Blob Storage")
+
+        return uploaded_blobs
+
+    def get_openlineage_facets_on_start(self) -> OperatorLineage:
+        from airflow.providers.common.compat.openlineage.facet import Dataset
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        account_name = wasb_hook.get_conn().account_name
+
+        return OperatorLineage(
+            inputs=[Dataset(namespace=f"gs://{self.gcs_bucket}", 
name=self.prefix or "/")],
+            outputs=[
+                Dataset(
+                    namespace=f"wasbs://{self.container_name}@{account_name}",
+                    name=self.blob_prefix or "/",
+                )
+            ],
+        )
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
 
b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
new file mode 100644
index 00000000000..27d61b0c2ae
--- /dev/null
+++ 
b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
@@ -0,0 +1,259 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from types import ModuleType
+from unittest import mock
+
+import pytest
+
+from airflow.providers.microsoft.azure.transfers.gcs_to_wasb import 
GCSToAzureBlobStorageOperator
+
+TASK_ID = "test-gcs-to-azure-blob"
+GCS_BUCKET = "gcs-bucket"
+CONTAINER = "container"
+BLOB_PREFIX = "dest/prefix"
+GCS_OBJECTS = ["data/a.txt", "data/b.txt"]
+
+
+class TestGCSToAzureBlobStorageOperator:
+    def test_init_defaults(self):
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+        )
+        assert op.gcp_conn_id == "google_cloud_default"
+        assert op.wasb_conn_id == "wasb_default"
+        assert op.replace is False
+        assert op.keep_directory_structure is True
+        assert op.flatten_structure is False
+        assert op.create_container is False
+
+    @mock.patch("airflow.providers.google.__version__", "10.2.0")
+    def test_match_glob_requires_recent_google_provider(self):
+        with pytest.raises(ValueError, match="match_glob"):
+            GCSToAzureBlobStorageOperator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                container_name=CONTAINER,
+                match_glob="**/*.csv",
+            )
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_replace_uploads_all(self, mock_gcs_hook, mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = list(GCS_OBJECTS)
+        mock_file = mock.Mock()
+        mock_file.name = "/tmp/local"
+        mock_gcs_hook.return_value.provide_file.return_value.__enter__ = 
mock.Mock(return_value=mock_file)
+        mock_gcs_hook.return_value.provide_file.return_value.__exit__ = 
mock.Mock(return_value=None)
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            prefix="data/",
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=True,
+        )
+        result = op.execute(context=None)
+
+        assert result == [f"{BLOB_PREFIX}/data/a.txt", 
f"{BLOB_PREFIX}/data/b.txt"]
+        assert mock_wasb_hook.return_value.load_file.call_count == 2
+        first_kw = 
mock_wasb_hook.return_value.load_file.call_args_list[0].kwargs
+        assert first_kw["container_name"] == CONTAINER
+        assert first_kw["blob_name"] == f"{BLOB_PREFIX}/data/a.txt"
+        assert first_kw["overwrite"] is True
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_returns_empty_when_no_objects(self, mock_gcs_hook, 
mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = []
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=True,
+        )
+        assert op.execute(context=None) == []
+        mock_wasb_hook.return_value.load_file.assert_not_called()
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_skips_gcs_directory_placeholder_keys(self, mock_gcs_hook, 
mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = ["src/airflow.png", 
"src/"]
+        mock_file = mock.Mock()
+        mock_file.name = "/tmp/local"
+        mock_gcs_hook.return_value.provide_file.return_value.__enter__ = 
mock.Mock(return_value=mock_file)
+        mock_gcs_hook.return_value.provide_file.return_value.__exit__ = 
mock.Mock(return_value=None)
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix="",
+            replace=True,
+        )
+        result = op.execute(context=None)
+
+        assert result == ["src/airflow.png"]
+        mock_wasb_hook.return_value.load_file.assert_called_once()
+        assert 
mock_wasb_hook.return_value.load_file.call_args.kwargs["blob_name"] == 
"src/airflow.png"
+        assert 
mock_wasb_hook.return_value.load_file.call_args.kwargs["overwrite"] is True
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_skip_existing_when_replace_false(self, mock_gcs_hook, 
mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = ["data/a.txt", 
"data/b.txt"]
+        mock_wasb_hook.return_value.get_blobs_list_recursive.return_value = [
+            f"{BLOB_PREFIX}/data/a.txt",
+        ]
+        mock_file = mock.Mock()
+        mock_file.name = "/tmp/local"
+        mock_gcs_hook.return_value.provide_file.return_value.__enter__ = 
mock.Mock(return_value=mock_file)
+        mock_gcs_hook.return_value.provide_file.return_value.__exit__ = 
mock.Mock(return_value=None)
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=False,
+        )
+        result = op.execute(context=None)
+
+        assert result == [f"{BLOB_PREFIX}/data/b.txt"]
+        mock_wasb_hook.return_value.load_file.assert_called_once()
+        assert 
mock_wasb_hook.return_value.load_file.call_args.kwargs["overwrite"] is False
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_flatten_structure(self, mock_gcs_hook, mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = ["data/a.txt"]
+        mock_file = mock.Mock()
+        mock_file.name = "/tmp/local"
+        mock_gcs_hook.return_value.provide_file.return_value.__enter__ = 
mock.Mock(return_value=mock_file)
+        mock_gcs_hook.return_value.provide_file.return_value.__exit__ = 
mock.Mock(return_value=None)
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=True,
+            flatten_structure=True,
+        )
+        result = op.execute(context=None)
+
+        assert result == [f"{BLOB_PREFIX}/a.txt"]
+        kw = mock_wasb_hook.return_value.load_file.call_args.kwargs
+        assert kw["blob_name"] == f"{BLOB_PREFIX}/a.txt"
+        assert kw["overwrite"] is True
+
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_dedups_collisions_when_replace_true(self, mock_gcs_hook, 
mock_wasb_hook):
+        mock_gcs_hook.return_value.list.return_value = ["a/file.txt", 
"b/file.txt"]
+        mock_file = mock.Mock()
+        mock_file.name = "/tmp/local"
+        mock_gcs_hook.return_value.provide_file.return_value.__enter__ = 
mock.Mock(return_value=mock_file)
+        mock_gcs_hook.return_value.provide_file.return_value.__exit__ = 
mock.Mock(return_value=None)
+
+        op = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=True,
+            flatten_structure=True,
+        )
+        result = op.execute(context=None)
+
+        assert result == [f"{BLOB_PREFIX}/file.txt"]
+        mock_wasb_hook.return_value.load_file.assert_called_once()
+
+    @mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    @mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_execute_is_idempotent_on_retry(self, mock_gcs_hook, mock_wasb_hook):
+        """Calling execute() twice returns equal results and leaves blob_prefix unchanged."""
+        gcs_hook = mock_gcs_hook.return_value
+        gcs_hook.list.return_value = ["data/a.txt"]
+
+        # Stand-in for the local file yielded by GCSHook.provide_file().
+        local_copy = mock.Mock()
+        local_copy.name = "/tmp/local"
+        download_ctx = gcs_hook.provide_file.return_value
+        download_ctx.__enter__ = mock.Mock(return_value=local_copy)
+        download_ctx.__exit__ = mock.Mock(return_value=None)
+
+        operator = GCSToAzureBlobStorageOperator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            prefix="data/",
+            container_name=CONTAINER,
+            blob_prefix=BLOB_PREFIX,
+            replace=True,
+            keep_directory_structure=False,
+        )
+
+        # Run the same operator instance twice, as a task retry would.
+        runs = [operator.execute(context=None) for _ in range(2)]
+
+        assert runs[0] == runs[1]
+        assert operator.blob_prefix == BLOB_PREFIX
+
+    @mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+    @mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+    def test_openlineage_facets(self, mock_gcs_hook, mock_wasb_hook):
+        """Check the dataset namespaces returned by get_openlineage_facets_on_start().
+
+        If ``airflow.providers.openlineage.extractors`` has not been imported yet,
+        minimal stand-in modules are registered in ``sys.modules`` for the duration
+        of the test, and the previous ``sys.modules`` state is restored afterwards.
+        """
+        # Marker for "this name had no sys.modules entry before the test".
+        not_loaded = object()
+        # Previous sys.modules entry for every name this test stubs out.
+        displaced: dict[str, object] = {}
+        if "airflow.providers.openlineage.extractors" not in sys.modules:
+
+            class _OperatorLineage:
+                __slots__ = ("inputs", "outputs")
+
+                def __init__(self, *, inputs=None, outputs=None):
+                    self.inputs = inputs
+                    self.outputs = outputs
+
+            ol_pkg = ModuleType("airflow.providers.openlineage")
+            ol_ext = ModuleType("airflow.providers.openlineage.extractors")
+            ol_ext.OperatorLineage = _OperatorLineage
+            stubs = {
+                "airflow.providers.openlineage": ol_pkg,
+                "airflow.providers.openlineage.extractors": ol_ext,
+            }
+            # Remember what was there first: the parent package may already be a
+            # real, imported module even though the extractors submodule is not.
+            displaced = {name: sys.modules.get(name, not_loaded) for name in stubs}
+            sys.modules.update(stubs)
+
+        try:
+            mock_wasb_hook.return_value.get_conn.return_value.account_name = "mystorage"
+            op = GCSToAzureBlobStorageOperator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix="p/",
+                container_name=CONTAINER,
+                blob_prefix=BLOB_PREFIX,
+            )
+            lineage = op.get_openlineage_facets_on_start()
+            assert len(lineage.inputs) == 1
+            assert lineage.inputs[0].namespace == "gs://gcs-bucket"
+            assert "wasbs://" in lineage.outputs[0].namespace
+        finally:
+            # Restore rather than blindly delete, so a real module that was
+            # temporarily shadowed by a stub is put back for later tests.
+            for name, previous in displaced.items():
+                if previous is not_loaded:
+                    sys.modules.pop(name, None)
+                else:
+                    sys.modules[name] = previous
diff --git a/uv.lock b/uv.lock
index 4ca5b928c8a..8a0780cf8a5 100644
--- a/uv.lock
+++ b/uv.lock
@@ -6170,6 +6170,12 @@ amazon = [
 common-messaging = [
     { name = "apache-airflow-providers-common-messaging" },
 ]
+google = [
+    { name = "apache-airflow-providers-google" },
+]
+openlineage = [
+    { name = "apache-airflow-providers-openlineage" },
+]
 oracle = [
     { name = "apache-airflow-providers-oracle" },
 ]
@@ -6184,6 +6190,8 @@ dev = [
     { name = "apache-airflow-providers-amazon" },
     { name = "apache-airflow-providers-common-compat" },
     { name = "apache-airflow-providers-common-messaging" },
+    { name = "apache-airflow-providers-google" },
+    { name = "apache-airflow-providers-openlineage" },
     { name = "apache-airflow-providers-oracle" },
     { name = "apache-airflow-providers-sftp" },
     { name = "apache-airflow-task-sdk" },
@@ -6201,6 +6209,8 @@ requires-dist = [
     { name = "apache-airflow-providers-amazon", marker = "extra == 'amazon'", 
editable = "providers/amazon" },
     { name = "apache-airflow-providers-common-compat", editable = 
"providers/common/compat" },
     { name = "apache-airflow-providers-common-messaging", marker = "extra == 
'common-messaging'", editable = "providers/common/messaging" },
+    { name = "apache-airflow-providers-google", marker = "extra == 'google'", 
editable = "providers/google" },
+    { name = "apache-airflow-providers-openlineage", marker = "extra == 
'openlineage'", editable = "providers/openlineage" },
     { name = "apache-airflow-providers-oracle", marker = "extra == 'oracle'", 
editable = "providers/oracle" },
     { name = "apache-airflow-providers-sftp", marker = "extra == 'sftp'", 
editable = "providers/sftp" },
     { name = "azure-batch", specifier = ">=8.0.0,<15.0.0" },
@@ -6232,7 +6242,7 @@ requires-dist = [
     { name = "msgraph-core", specifier = ">=1.3.3" },
     { name = "msgraphfs", specifier = ">=0.3.0" },
 ]
-provides-extras = ["amazon", "oracle", "sftp", "common-messaging"]
+provides-extras = ["amazon", "oracle", "sftp", "common-messaging", "google", 
"openlineage"]
 
 [package.metadata.requires-dev]
 dev = [
@@ -6241,6 +6251,8 @@ dev = [
     { name = "apache-airflow-providers-amazon", editable = "providers/amazon" 
},
     { name = "apache-airflow-providers-common-compat", editable = 
"providers/common/compat" },
     { name = "apache-airflow-providers-common-messaging", editable = 
"providers/common/messaging" },
+    { name = "apache-airflow-providers-google", editable = "providers/google" 
},
+    { name = "apache-airflow-providers-openlineage", editable = 
"providers/openlineage" },
     { name = "apache-airflow-providers-oracle", editable = "providers/oracle" 
},
     { name = "apache-airflow-providers-sftp", editable = "providers/sftp" },
     { name = "apache-airflow-task-sdk", editable = "task-sdk" },


Reply via email to