This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2ad91a7808 Bugfix GCSToGCSOperator when copy files to folder without
wildcard (#32486)
2ad91a7808 is described below
commit 2ad91a7808e97a70386513e016bdc08dbb8b72d0
Author: max <[email protected]>
AuthorDate: Tue Jul 11 13:46:25 2023 +0200
Bugfix GCSToGCSOperator when copy files to folder without wildcard (#32486)
---
.../providers/google/cloud/transfers/gcs_to_gcs.py | 48 ++++--
.../operators/transfer/gcs_to_gcs.rst | 6 +
.../google/cloud/transfers/test_gcs_to_gcs.py | 171 +++++++++++++++++++++
.../google/cloud/gcs/example_gcs_to_gcs.py | 2 +
4 files changed, 217 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index f9433d7434..17d1638559 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -106,11 +106,12 @@ class GCSToGCSOperator(BaseOperator):
source_objects=['sales/sales-2017/january.avro'],
destination_bucket='data_backup',
destination_object='copied_sales/2017/january-backup.avro',
+ exact_match=True,
gcp_conn_id=google_cloud_conn_id
)
The following Operator would copy all the Avro files from
``sales/sales-2017``
- folder (i.e. with names starting with that prefix) in ``data`` bucket to
the
+ folder (i.e. all files with names starting with that prefix) in ``data``
bucket to the
``copied_sales/2017`` folder in the ``data_backup`` bucket. ::
copy_files = GCSToGCSOperator(
@@ -135,7 +136,7 @@ class GCSToGCSOperator(BaseOperator):
)
The following Operator would move all the Avro files from
``sales/sales-2017``
- folder (i.e. with names starting with that prefix) in ``data`` bucket to
the
+ folder (i.e. all files with names starting with that prefix) in ``data``
bucket to the
same folder in the ``data_backup`` bucket, deleting the original files in
the
process. ::
@@ -314,9 +315,11 @@ class GCSToGCSOperator(BaseOperator):
"""
List all files in source_objects, copy files to destination_object,
and rename each source file.
- For source_objects with no wildcard, this operator would first list all
- files in source_objects, using provided delimiter if any. Then copy
files
- from source_objects to destination_object and rename each source file.
+ For source_objects with no wildcard, this operator would first list
+ all files in source_objects, using provided delimiter if any. Then copy
+ files from source_objects to destination_object and rename each source
+ file. Note that if the flag exact_match=False, then each item in the
source_objects
+ (or source_object itself) will be considered as a prefix for the
source objects search.
Example 1:
@@ -366,6 +369,22 @@ class GCSToGCSOperator(BaseOperator):
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
+
+ Example 4:
+
+ The following Operator would copy files corresponding to the prefix
'a/foo.txt'
+ (a/foo.txt, a/foo.txt.abc, a/foo.txt/subfolder/file.txt) in ``data``
bucket to
+ the ``b/`` folder in the ``data_backup`` bucket
+ (b/foo.txt, b/foo.txt.abc, b/foo.txt/subfolder/file.txt) ::
+
+ copy_files = GCSToGCSOperator(
+ task_id='copy_files_without_wildcard',
+ source_bucket='data',
+ source_object='a/foo.txt',
+ destination_bucket='data_backup',
+ destination_object='b/',
+ gcp_conn_id=google_cloud_conn_id
+ )
"""
objects = hook.list(
self.source_bucket, prefix=prefix, delimiter=self.delimiter,
match_glob=self.match_glob
@@ -390,11 +409,10 @@ class GCSToGCSOperator(BaseOperator):
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
self.log.warning(msg)
raise AirflowException(msg)
-
if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
- self._copy_directory(hook=hook, source_objects=objects,
prefix=prefix)
+ self._copy_multiple_objects(hook=hook, source_objects=objects,
prefix=prefix)
def _copy_file(self, hook, source_object):
destination_object = self.destination_object or source_object
@@ -405,15 +423,25 @@ class GCSToGCSOperator(BaseOperator):
hook=hook, source_object=source_object,
destination_object=destination_object
)
- def _copy_directory(self, hook, source_objects, prefix):
- _prefix = prefix.rstrip("/") + "/"
+ def _copy_multiple_objects(self, hook, source_objects, prefix):
+ # Check whether the prefix is a root directory for all the rest of
objects.
+ _pref = prefix.rstrip("/")
+ is_directory = prefix.endswith("/") or all(
+ [obj.replace(_pref, "", 1).startswith("/") for obj in
source_objects]
+ )
+
+ if is_directory:
+ base_path = prefix.rstrip("/") + "/"
+ else:
+ base_path = prefix[0 : prefix.rfind("/") + 1] if "/" in prefix
else ""
+
for source_obj in source_objects:
if not self._check_exact_match(source_obj, prefix):
continue
if self.destination_object is None:
destination_object = source_obj
else:
- file_name_postfix = source_obj.replace(_prefix, "", 1)
+ file_name_postfix = source_obj.replace(base_path, "", 1)
destination_object = self.destination_object.rstrip("/") + "/"
+ file_name_postfix
self._copy_single_object(
diff --git
a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
index ef805355c4..38f4b0b092 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
@@ -97,6 +97,9 @@ Copy single file
----------------
The following example would copy a single file, ``OBJECT_1`` from the
``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket.
+Note that if the flag ``exact_match=False`` then the ``source_object`` will be
considered as a prefix for searching objects
+in the ``BUCKET_1_SRC`` GCS bucket. That's why, if any matching objects are found, they will
be copied as well. To prevent this from
+happening, please use ``exact_match=True``.
.. exampleinclude::
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
@@ -165,6 +168,9 @@ Move single file
----------------
Supplying ``True`` to the ``move`` argument causes the operator to delete
``source_object`` once the copy is complete.
+Note that if the flag ``exact_match=False`` then the ``source_object`` will be
considered as a prefix for searching objects
+in the ``BUCKET_1_SRC`` GCS bucket. That's why, if any matching objects are found, they will
be moved as well. To prevent this from
+happening, please use ``exact_match=True``.
.. exampleinclude::
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 3966be50cc..d29a505ba3 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -656,3 +656,174 @@ class TestGoogleCloudStorageToCloudStorageOperator:
AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not
exist in bucket {TEST_BUCKET}"
):
operator.execute(None)
+
+ @pytest.mark.parametrize(
+ "existing_objects, source_object, match_glob, exact_match,
expected_source_objects, "
+ "expected_destination_objects",
+ [
+ (["source/foo.txt"], "source/foo.txt", None, True,
["source/foo.txt"], ["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source/foo.txt", None, False,
["source/foo.txt"], ["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source", None, False, ["source/foo.txt"],
["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source/", None, False, ["source/foo.txt"],
["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source/*", None, False, ["source/foo.txt"],
["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source/foo.*", None, False,
["source/foo.txt"], ["{prefix}/txt"]),
+ (["source/foo.txt"], "source/", "**/foo*", False,
["source/foo.txt"], ["{prefix}/foo.txt"]),
+ (["source/foo.txt"], "source/", "**/foo.txt", False,
["source/foo.txt"], ["{prefix}/foo.txt"]),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/foo.txt",
+ None,
+ True,
+ ["source/foo.txt"],
+ ["{prefix}/foo.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/foo.txt",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/*",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/foo.*",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/txt", "{prefix}/txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/",
+ "**/foo*",
+ False,
+ ["source/foo.txt", "source/foo.txt.abc"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc"],
+ "source/",
+ "**/foo.txt",
+ False,
+ ["source/foo.txt"],
+ ["{prefix}/foo.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/foo.txt",
+ None,
+ True,
+ ["source/foo.txt"],
+ ["{prefix}/foo.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/foo.txt",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc",
"{prefix}/foo.txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc",
"{prefix}/foo.txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc",
"{prefix}/foo.txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/*",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc",
"{prefix}/foo.txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/foo.*",
+ None,
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/txt", "{prefix}/txt.abc",
"{prefix}/txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/",
+ "**/foo*",
+ False,
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ ["{prefix}/foo.txt", "{prefix}/foo.txt.abc",
"{prefix}/foo.txt/subfolder/file.txt"],
+ ),
+ (
+ ["source/foo.txt", "source/foo.txt.abc",
"source/foo.txt/subfolder/file.txt"],
+ "source/",
+ "**/foo.txt",
+ False,
+ ["source/foo.txt"],
+ ["{prefix}/foo.txt"],
+ ),
+ ],
+ )
+ @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+ def test_copy_files_into_a_folder(
+ self,
+ mock_hook,
+ existing_objects,
+ source_object,
+ match_glob,
+ exact_match,
+ expected_source_objects,
+ expected_destination_objects,
+ ):
+ mock_hook.return_value.list.return_value = existing_objects
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=source_object,
+ destination_bucket=DESTINATION_BUCKET,
+ destination_object=DESTINATION_OBJECT_PREFIX + "/",
+ exact_match=exact_match,
+ match_glob=match_glob,
+ )
+ operator.execute(None)
+
+ mock_calls = [
+ mock.call(TEST_BUCKET, src, DESTINATION_BUCKET,
dst.format(prefix=DESTINATION_OBJECT_PREFIX))
+ for src, dst in zip(expected_source_objects,
expected_destination_objects)
+ ]
+ mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
index e13f76ebc4..0e3d72bef1 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
@@ -139,6 +139,7 @@ with models.DAG(
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the
source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the
source_object value will be used
+ exact_match=True,
)
# [END howto_operator_gcs_to_gcs_single_file]
@@ -201,6 +202,7 @@ with models.DAG(
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
+ exact_match=True,
move_object=True,
)
# [END howto_operator_gcs_to_gcs_single_file_move]