This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76bed661465 Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer (#64966)
76bed661465 is described below
commit 76bed6614658e6c436aa61854475bd1d67e0b5aa
Author: Yuseok Jo <[email protected]>
AuthorDate: Tue Jun 9 19:54:11 2026 +0900
Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer (#64966)
* Add GCSToAzureBlobStorageOperator for GCS to Azure Blob transfer
* Fix CI
* Fix CI error
* Replace AirflowException with ValueError
* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
Co-authored-by: Jarek Potiuk <[email protected]>
* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
Co-authored-by: Jarek Potiuk <[email protected]>
* Avoid name mangling on _is_match_glob_supported attribute
* Remove redundant _transform_file_path filter clause
* Fix self.blob_prefix mutation on execute() retry
* Fix silent overwrite of colliding blobs when replace=True
* Fix Windows backslash in Azure blob names
---------
Co-authored-by: Jarek Potiuk <[email protected]>
---
dev/breeze/tests/test_selective_checks.py | 4 +-
providers/microsoft/azure/docs/index.rst | 2 +
.../microsoft/azure/docs/transfer/gcs_to_wasb.rst | 69 ++++++
providers/microsoft/azure/provider.yaml | 4 +
providers/microsoft/azure/pyproject.toml | 8 +
.../providers/microsoft/azure/get_provider_info.py | 6 +
.../microsoft/azure/transfers/gcs_to_wasb.py | 261 +++++++++++++++++++++
.../microsoft/azure/transfers/test_gcs_to_wasb.py | 259 ++++++++++++++++++++
uv.lock | 14 +-
9 files changed, 624 insertions(+), 3 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index f11438e0440..622041285ac 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2484,7 +2484,7 @@ def test_expected_output_push(
),
{
"selected-providers-list-as-string": "amazon common.compat
common.io common.sql "
- "databricks dbt.cloud ftp google jdbc microsoft.mssql mysql "
+ "databricks dbt.cloud ftp google jdbc microsoft.azure
microsoft.mssql mysql "
"openlineage oracle postgres sftp snowflake standard trino",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2503,7 +2503,7 @@ def test_expected_output_push(
{
"description": "amazon...standard",
"test_types": "Providers[amazon]
Providers[common.compat,common.io,common.sql,"
-
"databricks,dbt.cloud,ftp,jdbc,microsoft.mssql,mysql,openlineage,oracle,"
+
"databricks,dbt.cloud,ftp,jdbc,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,"
"postgres,sftp,snowflake,trino] Providers[google]
Providers[standard]",
}
]
diff --git a/providers/microsoft/azure/docs/index.rst
b/providers/microsoft/azure/docs/index.rst
index c327fa52a3a..ba8d8c74df3 100644
--- a/providers/microsoft/azure/docs/index.rst
+++ b/providers/microsoft/azure/docs/index.rst
@@ -159,6 +159,8 @@ Dependent package
`apache-airflow-providers-amazon
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_
``amazon``
`apache-airflow-providers-common-compat
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_
``common.compat``
`apache-airflow-providers-common-messaging
<https://airflow.apache.org/docs/apache-airflow-providers-common-messaging>`_
``common.messaging``
+`apache-airflow-providers-google
<https://airflow.apache.org/docs/apache-airflow-providers-google>`_
``google``
+`apache-airflow-providers-openlineage
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_
``openlineage``
`apache-airflow-providers-oracle
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_
``oracle``
`apache-airflow-providers-sftp
<https://airflow.apache.org/docs/apache-airflow-providers-sftp>`_
``sftp``
========================================================================================================================
====================
diff --git a/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst
b/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst
new file mode 100644
index 00000000000..0ff1cbadbb2
--- /dev/null
+++ b/providers/microsoft/azure/docs/transfer/gcs_to_wasb.rst
@@ -0,0 +1,69 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+=====================================================
+Google Cloud Storage to Azure Blob Storage transfer
+=====================================================
+
+`Google Cloud Storage <https://cloud.google.com/storage/>`__ and
+`Azure Blob Storage <https://learn.microsoft.com/en-us/azure/storage/blobs/>`__
+are object stores commonly used for data lakes and file exchange.
+This guide describes copying objects from GCS into an Azure Blob container.
+
+Install the optional dependency when using this operator:
+
+.. code-block:: bash
+
+ pip install 'apache-airflow-providers-microsoft-azure[google]'
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GCSToAzureBlobStorageOperator:
+
+Operator
+--------
+
+Use :class:`~airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSToAzureBlobStorageOperator`
+to list objects under a GCS ``prefix`` and upload them to a container using ``blob_prefix`` as the base path.
+Use ``keep_directory_structure`` and ``flatten_structure`` the same way as
+:class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator` (``flatten_structure`` wins when both apply).
+Object keys ending with ``/`` (GCS console folder markers) are not copied.
+
+Example:
+
+.. code-block:: python
+
+ copy_gcs_to_azure = GCSToAzureBlobStorageOperator(
+ task_id="gcs_to_azure_blob",
+ gcs_bucket="my-gcs-bucket",
+ prefix="exports/daily/",
+ container_name="my-container",
+ blob_prefix="imports/daily",
+ gcp_conn_id="google_cloud_default",
+ wasb_conn_id="wasb_default",
+ replace=True,
+ )
+
+Reference
+---------
+
+* `Google Cloud Storage Python client <https://cloud.google.com/python/docs/reference/storage/latest>`__
+* `Azure Blob Storage client library <https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python>`__
diff --git a/providers/microsoft/azure/provider.yaml
b/providers/microsoft/azure/provider.yaml
index 084483da2ee..81be326acba 100644
--- a/providers/microsoft/azure/provider.yaml
+++ b/providers/microsoft/azure/provider.yaml
@@ -362,6 +362,10 @@ transfers:
target-integration-name: Microsoft Azure Blob Storage
how-to-guide:
/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst
python-module: airflow.providers.microsoft.azure.transfers.s3_to_wasb
+ - source-integration-name: Google Cloud Storage (GCS)
+ target-integration-name: Microsoft Azure Blob Storage
+ how-to-guide:
/docs/apache-airflow-providers-microsoft-azure/transfer/gcs_to_wasb.rst
+ python-module: airflow.providers.microsoft.azure.transfers.gcs_to_wasb
connection-types:
diff --git a/providers/microsoft/azure/pyproject.toml
b/providers/microsoft/azure/pyproject.toml
index e4b9a070f3d..d30d0950e52 100644
--- a/providers/microsoft/azure/pyproject.toml
+++ b/providers/microsoft/azure/pyproject.toml
@@ -113,6 +113,12 @@ dependencies = [
"common.messaging" = [
"apache-airflow-providers-common-messaging>=2.0.0"
]
+"google" = [
+ "apache-airflow-providers-google"
+]
+"openlineage" = [
+ "apache-airflow-providers-openlineage>=2.3.0"
+]
[dependency-groups]
dev = [
@@ -122,6 +128,8 @@ dev = [
"apache-airflow-providers-amazon",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-common-messaging",
+ "apache-airflow-providers-google",
+ "apache-airflow-providers-openlineage",
"apache-airflow-providers-oracle",
"apache-airflow-providers-sftp",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
index 40364bf1845..ee2e388980f 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
@@ -352,6 +352,12 @@ def get_provider_info():
"how-to-guide":
"/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst",
"python-module":
"airflow.providers.microsoft.azure.transfers.s3_to_wasb",
},
+ {
+ "source-integration-name": "Google Cloud Storage (GCS)",
+ "target-integration-name": "Microsoft Azure Blob Storage",
+ "how-to-guide":
"/docs/apache-airflow-providers-microsoft-azure/transfer/gcs_to_wasb.rst",
+ "python-module":
"airflow.providers.microsoft.azure.transfers.gcs_to_wasb",
+ },
],
"connection-types": [
{
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
new file mode 100644
index 00000000000..acdc0fa65ea
--- /dev/null
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/gcs_to_wasb.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Azure Blob Storage operator."""
+
+from __future__ import annotations
+
+import posixpath
+from collections.abc import Sequence
+from typing import TYPE_CHECKING
+
+from packaging.version import Version
+
+from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+
+try:
+ from airflow.providers.google.cloud.hooks.gcs import GCSHook
+except ModuleNotFoundError as e:
+ from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(e)
+
+if TYPE_CHECKING:
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.sdk import Context
+
+
+class GCSToAzureBlobStorageOperator(BaseOperator):
+ """
+ Synchronizes objects from a Google Cloud Storage bucket to Azure Blob
Storage.
+
+ .. note::
+ When ``flatten_structure=True``, it takes precedence over
``keep_directory_structure``.
+ For example, with ``flatten_structure=True``,
``folder/subfolder/file.txt`` becomes
+ ``file.txt`` regardless of the ``keep_directory_structure`` setting.
+
+ Objects whose names end with ``/`` (GCS console folder markers) and
keys that become an
+ empty destination path after ``flatten_structure`` are skipped.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GCSToAzureBlobStorageOperator`
+
+ :param gcs_bucket: The GCS bucket to list objects from. (templated)
+ :param prefix: Prefix to filter object names under the bucket. (templated)
+ :param gcp_conn_id: Airflow connection ID for Google Cloud.
+ :param google_impersonation_chain: Optional impersonation chain for GCP
credentials.
+ :param gcp_user_project: Requester-pays billing project for GCS requests,
if required.
+ :param match_glob: Optional glob filter for object names (requires
+ ``apache-airflow-providers-google>=10.3.0``).
+ :param container_name: Azure Blob container to upload into. (templated)
+ :param blob_prefix: Base blob path for uploaded objects. (templated)
+ :param wasb_conn_id: Airflow connection ID for Azure Blob Storage.
+ :param replace: If ``True``, overwrite existing blobs (``overwrite=True``
on upload) and
+ upload all listed objects. If ``False``, skip objects that already
exist under
+ ``blob_prefix`` with the same relative path and pass
``overwrite=False`` on upload.
+ :param keep_directory_structure: When ``False`` and ``prefix`` is set (and
+ ``flatten_structure`` is ``False``), append ``prefix`` to
``blob_prefix``.
+ :param flatten_structure: If ``True``, upload each object using only its
file name
+ under ``blob_prefix``. Takes precedence over
``keep_directory_structure``.
+ :param create_container: If ``True``, create the container when missing
before upload.
+ """
+
+ template_fields: Sequence[str] = (
+ "gcs_bucket",
+ "prefix",
+ "blob_prefix",
+ "container_name",
+ "google_impersonation_chain",
+ "gcp_user_project",
+ "match_glob",
+ )
+ ui_color = "#f0eee4"
+
+ def __init__(
+ self,
+ *,
+ gcs_bucket: str,
+ container_name: str,
+ blob_prefix: str = "",
+ prefix: str | None = None,
+ gcp_conn_id: str = "google_cloud_default",
+ google_impersonation_chain: str | Sequence[str] | None = None,
+ wasb_conn_id: str = "wasb_default",
+ replace: bool = False,
+ keep_directory_structure: bool = True,
+ flatten_structure: bool = False,
+ match_glob: str | None = None,
+ gcp_user_project: str | None = None,
+ create_container: bool = False,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.gcs_bucket = gcs_bucket
+ self.prefix = prefix
+ self.gcp_conn_id = gcp_conn_id
+ self.google_impersonation_chain = google_impersonation_chain
+ self.container_name = container_name
+ self.blob_prefix = blob_prefix
+ self.wasb_conn_id = wasb_conn_id
+ self.replace = replace
+ self.keep_directory_structure = keep_directory_structure
+ self.flatten_structure = flatten_structure
+ self.gcp_user_project = gcp_user_project
+ self.create_container = create_container
+
+ if self.flatten_structure and self.keep_directory_structure:
+ self.log.warning("flatten_structure=True takes precedence over
keep_directory_structure=True")
+
+ try:
+ from airflow.providers.google import __version__ as
_GOOGLE_PROVIDER_VERSION
+
+ if Version(_GOOGLE_PROVIDER_VERSION) >= Version("10.3.0"):
+ self._is_match_glob_supported = True
+ else:
+ self._is_match_glob_supported = False
+ except ImportError:
+ self._is_match_glob_supported = False
+ if not self._is_match_glob_supported and match_glob:
+ raise ValueError("The 'match_glob' parameter requires
'apache-airflow-providers-google>=10.3.0'.")
+ self.match_glob = match_glob
+
+ def _transform_file_path(self, file_path: str) -> str:
+ """
+ Transform the GCS file path according to the specified options.
+
+ :param file_path: The original GCS file path
+ :return: The transformed file path for Azure Blob destination
+ """
+ if self.flatten_structure:
+ return posixpath.basename(file_path)
+ return file_path
+
+ @staticmethod
+ def _should_skip_gcs_object(name: str) -> bool:
+ """
+ Return True if this object name should not be copied.
+
+ The GCS console creates zero-byte "folder" markers whose keys end with
``/``.
+ Those yield an empty basename when ``flatten_structure=True`` and odd
blob paths
+ on Azure when copied as-is.
+ """
+ if not name or not name.strip():
+ return True
+ return name.endswith("/")
+
+ def execute(self, context: Context) -> list[str]:
+ gcs_hook = GCSHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.google_impersonation_chain,
+ )
+ wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+
+ self.log.info(
+ "Getting list of the files. Bucket: %s; Prefix: %s",
+ self.gcs_bucket,
+ self.prefix,
+ )
+
+ list_kwargs: dict = {
+ "bucket_name": self.gcs_bucket,
+ "prefix": self.prefix,
+ "user_project": self.gcp_user_project,
+ }
+ if self._is_match_glob_supported:
+ list_kwargs["match_glob"] = self.match_glob
+
+ gcs_files = gcs_hook.list(**list_kwargs) # type: ignore[call-arg]
+
+ gcs_files = [f for f in gcs_files if not
self._should_skip_gcs_object(f)]
+
+ blob_prefix = self.blob_prefix
+ if not self.keep_directory_structure and self.prefix and not
self.flatten_structure:
+ blob_prefix = posixpath.join(blob_prefix, self.prefix)
+
+ existing_blobs_set: set[str] = set()
+ if not self.replace:
+ existing_blobs = (
+ wasb_hook.get_blobs_list_recursive(
+ container_name=self.container_name,
+ prefix=blob_prefix or None,
+ )
+ or []
+ )
+ if blob_prefix:
+ prefix_str = blob_prefix.rstrip("/") + "/"
+ existing_blobs = [b.removeprefix(prefix_str) for b in
existing_blobs]
+ existing_blobs_set = set(existing_blobs)
+
+ filtered_files: list[str] = []
+ seen_transformed: set[str] = set()
+ for file in gcs_files:
+ transformed = self._transform_file_path(file)
+ if transformed in existing_blobs_set:
+ continue
+ if transformed in seen_transformed:
+ self.log.warning(
+ "Skipping duplicate file %s (transforms to %s)",
+ file,
+ transformed,
+ )
+ continue
+ filtered_files.append(file)
+ seen_transformed.add(transformed)
+
+ gcs_files = filtered_files
+
+ uploaded_blobs: list[str] = []
+ if gcs_files:
+ for file in gcs_files:
+ with gcs_hook.provide_file(
+ object_name=file, bucket_name=str(self.gcs_bucket),
user_project=self.gcp_user_project
+ ) as local_tmp_file:
+ transformed_path = self._transform_file_path(file)
+ dest_blob = posixpath.join(blob_prefix, transformed_path)
+ self.log.info("Saving file from %s to %s", file, dest_blob)
+ wasb_hook.load_file(
+ file_path=local_tmp_file.name,
+ container_name=self.container_name,
+ blob_name=dest_blob,
+ create_container=self.create_container,
+ overwrite=self.replace,
+ )
+ uploaded_blobs.append(dest_blob)
+ self.log.info("All done, uploaded %d files to Azure Blob Storage",
len(uploaded_blobs))
+ else:
+ self.log.info("In sync, no files needed to be uploaded to Azure
Blob Storage")
+
+ return uploaded_blobs
+
+ def get_openlineage_facets_on_start(self) -> OperatorLineage:
+ from airflow.providers.common.compat.openlineage.facet import Dataset
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+ account_name = wasb_hook.get_conn().account_name
+
+ return OperatorLineage(
+ inputs=[Dataset(namespace=f"gs://{self.gcs_bucket}",
name=self.prefix or "/")],
+ outputs=[
+ Dataset(
+ namespace=f"wasbs://{self.container_name}@{account_name}",
+ name=self.blob_prefix or "/",
+ )
+ ],
+ )
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
new file mode 100644
index 00000000000..27d61b0c2ae
--- /dev/null
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_gcs_to_wasb.py
@@ -0,0 +1,259 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from types import ModuleType
+from unittest import mock
+
+import pytest
+
+from airflow.providers.microsoft.azure.transfers.gcs_to_wasb import
GCSToAzureBlobStorageOperator
+
+TASK_ID = "test-gcs-to-azure-blob"
+GCS_BUCKET = "gcs-bucket"
+CONTAINER = "container"
+BLOB_PREFIX = "dest/prefix"
+GCS_OBJECTS = ["data/a.txt", "data/b.txt"]
+
+
+class TestGCSToAzureBlobStorageOperator:
+ def test_init_defaults(self):
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ )
+ assert op.gcp_conn_id == "google_cloud_default"
+ assert op.wasb_conn_id == "wasb_default"
+ assert op.replace is False
+ assert op.keep_directory_structure is True
+ assert op.flatten_structure is False
+ assert op.create_container is False
+
+ @mock.patch("airflow.providers.google.__version__", "10.2.0")
+ def test_match_glob_requires_recent_google_provider(self):
+ with pytest.raises(ValueError, match="match_glob"):
+ GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ match_glob="**/*.csv",
+ )
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_replace_uploads_all(self, mock_gcs_hook, mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = list(GCS_OBJECTS)
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ prefix="data/",
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=True,
+ )
+ result = op.execute(context=None)
+
+ assert result == [f"{BLOB_PREFIX}/data/a.txt",
f"{BLOB_PREFIX}/data/b.txt"]
+ assert mock_wasb_hook.return_value.load_file.call_count == 2
+ first_kw =
mock_wasb_hook.return_value.load_file.call_args_list[0].kwargs
+ assert first_kw["container_name"] == CONTAINER
+ assert first_kw["blob_name"] == f"{BLOB_PREFIX}/data/a.txt"
+ assert first_kw["overwrite"] is True
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_returns_empty_when_no_objects(self, mock_gcs_hook,
mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = []
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=True,
+ )
+ assert op.execute(context=None) == []
+ mock_wasb_hook.return_value.load_file.assert_not_called()
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_skips_gcs_directory_placeholder_keys(self, mock_gcs_hook,
mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = ["src/airflow.png",
"src/"]
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix="",
+ replace=True,
+ )
+ result = op.execute(context=None)
+
+ assert result == ["src/airflow.png"]
+ mock_wasb_hook.return_value.load_file.assert_called_once()
+ assert
mock_wasb_hook.return_value.load_file.call_args.kwargs["blob_name"] ==
"src/airflow.png"
+ assert
mock_wasb_hook.return_value.load_file.call_args.kwargs["overwrite"] is True
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_skip_existing_when_replace_false(self, mock_gcs_hook,
mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = ["data/a.txt",
"data/b.txt"]
+ mock_wasb_hook.return_value.get_blobs_list_recursive.return_value = [
+ f"{BLOB_PREFIX}/data/a.txt",
+ ]
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=False,
+ )
+ result = op.execute(context=None)
+
+ assert result == [f"{BLOB_PREFIX}/data/b.txt"]
+ mock_wasb_hook.return_value.load_file.assert_called_once()
+ assert
mock_wasb_hook.return_value.load_file.call_args.kwargs["overwrite"] is False
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_flatten_structure(self, mock_gcs_hook, mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = ["data/a.txt"]
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=True,
+ flatten_structure=True,
+ )
+ result = op.execute(context=None)
+
+ assert result == [f"{BLOB_PREFIX}/a.txt"]
+ kw = mock_wasb_hook.return_value.load_file.call_args.kwargs
+ assert kw["blob_name"] == f"{BLOB_PREFIX}/a.txt"
+ assert kw["overwrite"] is True
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_dedups_collisions_when_replace_true(self, mock_gcs_hook,
mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = ["a/file.txt",
"b/file.txt"]
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=True,
+ flatten_structure=True,
+ )
+ result = op.execute(context=None)
+
+ assert result == [f"{BLOB_PREFIX}/file.txt"]
+ mock_wasb_hook.return_value.load_file.assert_called_once()
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_execute_is_idempotent_on_retry(self, mock_gcs_hook,
mock_wasb_hook):
+ mock_gcs_hook.return_value.list.return_value = ["data/a.txt"]
+ mock_file = mock.Mock()
+ mock_file.name = "/tmp/local"
+ mock_gcs_hook.return_value.provide_file.return_value.__enter__ =
mock.Mock(return_value=mock_file)
+ mock_gcs_hook.return_value.provide_file.return_value.__exit__ =
mock.Mock(return_value=None)
+
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ prefix="data/",
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ replace=True,
+ keep_directory_structure=False,
+ )
+
+ first = op.execute(context=None)
+ second = op.execute(context=None)
+
+ assert first == second
+ assert op.blob_prefix == BLOB_PREFIX
+
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.WasbHook")
+
@mock.patch("airflow.providers.microsoft.azure.transfers.gcs_to_wasb.GCSHook")
+ def test_openlineage_facets(self, mock_gcs_hook, mock_wasb_hook):
+ injected: list[str] = []
+ if "airflow.providers.openlineage.extractors" not in sys.modules:
+ ol_pkg = ModuleType("airflow.providers.openlineage")
+ ol_ext = ModuleType("airflow.providers.openlineage.extractors")
+
+ class _OperatorLineage:
+ __slots__ = ("inputs", "outputs")
+
+ def __init__(self, *, inputs=None, outputs=None):
+ self.inputs = inputs
+ self.outputs = outputs
+
+ ol_ext.OperatorLineage = _OperatorLineage
+ sys.modules["airflow.providers.openlineage"] = ol_pkg
+ sys.modules["airflow.providers.openlineage.extractors"] = ol_ext
+ injected = [
+ "airflow.providers.openlineage.extractors",
+ "airflow.providers.openlineage",
+ ]
+
+ try:
+ mock_wasb_hook.return_value.get_conn.return_value.account_name =
"mystorage"
+ op = GCSToAzureBlobStorageOperator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ prefix="p/",
+ container_name=CONTAINER,
+ blob_prefix=BLOB_PREFIX,
+ )
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].namespace == "gs://gcs-bucket"
+ assert "wasbs://" in lineage.outputs[0].namespace
+ finally:
+ for name in injected:
+ sys.modules.pop(name, None)
diff --git a/uv.lock b/uv.lock
index 4ca5b928c8a..8a0780cf8a5 100644
--- a/uv.lock
+++ b/uv.lock
@@ -6170,6 +6170,12 @@ amazon = [
common-messaging = [
{ name = "apache-airflow-providers-common-messaging" },
]
+google = [
+ { name = "apache-airflow-providers-google" },
+]
+openlineage = [
+ { name = "apache-airflow-providers-openlineage" },
+]
oracle = [
{ name = "apache-airflow-providers-oracle" },
]
@@ -6184,6 +6190,8 @@ dev = [
{ name = "apache-airflow-providers-amazon" },
{ name = "apache-airflow-providers-common-compat" },
{ name = "apache-airflow-providers-common-messaging" },
+ { name = "apache-airflow-providers-google" },
+ { name = "apache-airflow-providers-openlineage" },
{ name = "apache-airflow-providers-oracle" },
{ name = "apache-airflow-providers-sftp" },
{ name = "apache-airflow-task-sdk" },
@@ -6201,6 +6209,8 @@ requires-dist = [
{ name = "apache-airflow-providers-amazon", marker = "extra == 'amazon'",
editable = "providers/amazon" },
{ name = "apache-airflow-providers-common-compat", editable =
"providers/common/compat" },
{ name = "apache-airflow-providers-common-messaging", marker = "extra ==
'common-messaging'", editable = "providers/common/messaging" },
+ { name = "apache-airflow-providers-google", marker = "extra == 'google'",
editable = "providers/google" },
+ { name = "apache-airflow-providers-openlineage", marker = "extra ==
'openlineage'", editable = "providers/openlineage" },
{ name = "apache-airflow-providers-oracle", marker = "extra == 'oracle'",
editable = "providers/oracle" },
{ name = "apache-airflow-providers-sftp", marker = "extra == 'sftp'",
editable = "providers/sftp" },
{ name = "azure-batch", specifier = ">=8.0.0,<15.0.0" },
@@ -6232,7 +6242,7 @@ requires-dist = [
{ name = "msgraph-core", specifier = ">=1.3.3" },
{ name = "msgraphfs", specifier = ">=0.3.0" },
]
-provides-extras = ["amazon", "oracle", "sftp", "common-messaging"]
+provides-extras = ["amazon", "oracle", "sftp", "common-messaging", "google",
"openlineage"]
[package.metadata.requires-dev]
dev = [
@@ -6241,6 +6251,8 @@ dev = [
{ name = "apache-airflow-providers-amazon", editable = "providers/amazon"
},
{ name = "apache-airflow-providers-common-compat", editable =
"providers/common/compat" },
{ name = "apache-airflow-providers-common-messaging", editable =
"providers/common/messaging" },
+ { name = "apache-airflow-providers-google", editable = "providers/google"
},
+ { name = "apache-airflow-providers-openlineage", editable =
"providers/openlineage" },
{ name = "apache-airflow-providers-oracle", editable = "providers/oracle"
},
{ name = "apache-airflow-providers-sftp", editable = "providers/sftp" },
{ name = "apache-airflow-task-sdk", editable = "task-sdk" },