This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ad91a7808 Bugfix GCSToGCSOperator when copy files to folder without 
wildcard (#32486)
2ad91a7808 is described below

commit 2ad91a7808e97a70386513e016bdc08dbb8b72d0
Author: max <[email protected]>
AuthorDate: Tue Jul 11 13:46:25 2023 +0200

    Bugfix GCSToGCSOperator when copy files to folder without wildcard (#32486)
---
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  48 ++++--
 .../operators/transfer/gcs_to_gcs.rst              |   6 +
 .../google/cloud/transfers/test_gcs_to_gcs.py      | 171 +++++++++++++++++++++
 .../google/cloud/gcs/example_gcs_to_gcs.py         |   2 +
 4 files changed, 217 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py 
b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index f9433d7434..17d1638559 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -106,11 +106,12 @@ class GCSToGCSOperator(BaseOperator):
             source_objects=['sales/sales-2017/january.avro'],
             destination_bucket='data_backup',
             destination_object='copied_sales/2017/january-backup.avro',
+            exact_match=True,
             gcp_conn_id=google_cloud_conn_id
         )
 
     The following Operator would copy all the Avro files from 
``sales/sales-2017``
-    folder (i.e. with names starting with that prefix) in ``data`` bucket to 
the
+    folder (i.e. all files with names starting with that prefix) in ``data`` 
bucket to the
     ``copied_sales/2017`` folder in the ``data_backup`` bucket. ::
 
         copy_files = GCSToGCSOperator(
@@ -135,7 +136,7 @@ class GCSToGCSOperator(BaseOperator):
         )
 
     The following Operator would move all the Avro files from 
``sales/sales-2017``
-    folder (i.e. with names starting with that prefix) in ``data`` bucket to 
the
+    folder (i.e. all files with names starting with that prefix) in ``data`` 
bucket to the
     same folder in the ``data_backup`` bucket, deleting the original files in 
the
     process. ::
 
@@ -314,9 +315,11 @@ class GCSToGCSOperator(BaseOperator):
         """
         List all files in source_objects, copy files to destination_object, 
and rename each source file.
 
-        For source_objects with no wildcard, this operator would first list all
-        files in source_objects, using provided delimiter if any. Then copy 
files
-        from source_objects to destination_object and rename each source file.
+        For source_objects with no wildcard, this operator would first list
+        all files in source_objects, using provided delimiter if any. Then copy
+        files from source_objects to destination_object and rename each source
+        file. Note that if exact_match=False, each item in source_objects
+        (or source_object itself) is treated as a prefix when searching for source objects.
 
         Example 1:
 
@@ -366,6 +369,22 @@ class GCSToGCSOperator(BaseOperator):
                 destination_object='b/',
                 gcp_conn_id=google_cloud_conn_id
             )
+
+        Example 4:
+
+        The following Operator would copy files corresponding to the prefix 
'a/foo.txt'
+        (a/foo.txt, a/foo.txt.abc, a/foo.txt/subfolder/file.txt) in ``data`` 
bucket to
+        the ``b/`` folder in the ``data_backup`` bucket
+        (b/foo.txt, b/foo.txt.abc, b/foo.txt/subfolder/file.txt) ::
+
+            copy_files = GCSToGCSOperator(
+                task_id='copy_files_without_wildcard',
+                source_bucket='data',
+                source_object='a/foo.txt',
+                destination_bucket='data_backup',
+                destination_object='b/',
+                gcp_conn_id=google_cloud_conn_id
+            )
         """
         objects = hook.list(
             self.source_bucket, prefix=prefix, delimiter=self.delimiter, 
match_glob=self.match_glob
@@ -390,11 +409,10 @@ class GCSToGCSOperator(BaseOperator):
                 msg = f"{prefix} does not exist in bucket {self.source_bucket}"
                 self.log.warning(msg)
                 raise AirflowException(msg)
-
         if len(objects) == 1 and objects[0][-1] != "/":
             self._copy_file(hook=hook, source_object=objects[0])
         elif len(objects):
-            self._copy_directory(hook=hook, source_objects=objects, 
prefix=prefix)
+            self._copy_multiple_objects(hook=hook, source_objects=objects, 
prefix=prefix)
 
     def _copy_file(self, hook, source_object):
         destination_object = self.destination_object or source_object
@@ -405,15 +423,25 @@ class GCSToGCSOperator(BaseOperator):
             hook=hook, source_object=source_object, 
destination_object=destination_object
         )
 
-    def _copy_directory(self, hook, source_objects, prefix):
-        _prefix = prefix.rstrip("/") + "/"
+    def _copy_multiple_objects(self, hook, source_objects, prefix):
+        # Check whether the prefix is a root directory for all the rest of 
objects.
+        _pref = prefix.rstrip("/")
+        is_directory = prefix.endswith("/") or all(
+            [obj.replace(_pref, "", 1).startswith("/") for obj in 
source_objects]
+        )
+
+        if is_directory:
+            base_path = prefix.rstrip("/") + "/"
+        else:
+            base_path = prefix[0 : prefix.rfind("/") + 1] if "/" in prefix 
else ""
+
         for source_obj in source_objects:
             if not self._check_exact_match(source_obj, prefix):
                 continue
             if self.destination_object is None:
                 destination_object = source_obj
             else:
-                file_name_postfix = source_obj.replace(_prefix, "", 1)
+                file_name_postfix = source_obj.replace(base_path, "", 1)
                 destination_object = self.destination_object.rstrip("/") + "/" 
+ file_name_postfix
 
             self._copy_single_object(
diff --git 
a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst 
b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
index ef805355c4..38f4b0b092 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
@@ -97,6 +97,9 @@ Copy single file
 ----------------
 
 The following example would copy a single file, ``OBJECT_1`` from the 
``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket.
+Note that if ``exact_match=False``, the ``source_object`` is treated as a prefix when searching for objects
+in the ``BUCKET_1_SRC`` GCS bucket, so any other objects whose names start with it will be copied as well. To prevent
+this, set ``exact_match=True``.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
     :language: python
@@ -165,6 +168,9 @@ Move single file
 ----------------
 
 Supplying ``True`` to the ``move`` argument causes the operator to delete 
``source_object`` once the copy is complete.
+Note that if ``exact_match=False``, the ``source_object`` is treated as a prefix when searching for objects
+in the ``BUCKET_1_SRC`` GCS bucket, so any other objects whose names start with it will be moved as well. To prevent
+this, set ``exact_match=True``.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
     :language: python
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 3966be50cc..d29a505ba3 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -656,3 +656,174 @@ class TestGoogleCloudStorageToCloudStorageOperator:
             AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not 
exist in bucket {TEST_BUCKET}"
         ):
             operator.execute(None)
+
+    @pytest.mark.parametrize(
+        "existing_objects, source_object, match_glob, exact_match, 
expected_source_objects, "
+        "expected_destination_objects",
+        [
+            (["source/foo.txt"], "source/foo.txt", None, True, 
["source/foo.txt"], ["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source/foo.txt", None, False, 
["source/foo.txt"], ["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source", None, False, ["source/foo.txt"], 
["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source/", None, False, ["source/foo.txt"], 
["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source/*", None, False, ["source/foo.txt"], 
["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source/foo.*", None, False, 
["source/foo.txt"], ["{prefix}/txt"]),
+            (["source/foo.txt"], "source/", "**/foo*", False, 
["source/foo.txt"], ["{prefix}/foo.txt"]),
+            (["source/foo.txt"], "source/", "**/foo.txt", False, 
["source/foo.txt"], ["{prefix}/foo.txt"]),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/foo.txt",
+                None,
+                True,
+                ["source/foo.txt"],
+                ["{prefix}/foo.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/foo.txt",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/*",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/foo.*",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/txt", "{prefix}/txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/",
+                "**/foo*",
+                False,
+                ["source/foo.txt", "source/foo.txt.abc"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc"],
+                "source/",
+                "**/foo.txt",
+                False,
+                ["source/foo.txt"],
+                ["{prefix}/foo.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/foo.txt",
+                None,
+                True,
+                ["source/foo.txt"],
+                ["{prefix}/foo.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/foo.txt",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc", 
"{prefix}/foo.txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc", 
"{prefix}/foo.txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc", 
"{prefix}/foo.txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/*",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc", 
"{prefix}/foo.txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/foo.*",
+                None,
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/txt", "{prefix}/txt.abc", 
"{prefix}/txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/",
+                "**/foo*",
+                False,
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                ["{prefix}/foo.txt", "{prefix}/foo.txt.abc", 
"{prefix}/foo.txt/subfolder/file.txt"],
+            ),
+            (
+                ["source/foo.txt", "source/foo.txt.abc", 
"source/foo.txt/subfolder/file.txt"],
+                "source/",
+                "**/foo.txt",
+                False,
+                ["source/foo.txt"],
+                ["{prefix}/foo.txt"],
+            ),
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_copy_files_into_a_folder(
+        self,
+        mock_hook,
+        existing_objects,
+        source_object,
+        match_glob,
+        exact_match,
+        expected_source_objects,
+        expected_destination_objects,
+    ):
+        mock_hook.return_value.list.return_value = existing_objects
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=source_object,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJECT_PREFIX + "/",
+            exact_match=exact_match,
+            match_glob=match_glob,
+        )
+        operator.execute(None)
+
+        mock_calls = [
+            mock.call(TEST_BUCKET, src, DESTINATION_BUCKET, 
dst.format(prefix=DESTINATION_OBJECT_PREFIX))
+            for src, dst in zip(expected_source_objects, 
expected_destination_objects)
+        ]
+        mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py 
b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
index e13f76ebc4..0e3d72bef1 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
@@ -139,6 +139,7 @@ with models.DAG(
         source_object=OBJECT_1,
         destination_bucket=BUCKET_NAME_DST,  # If not supplied the 
source_bucket value will be used
         destination_object="backup_" + OBJECT_1,  # If not supplied the 
source_object value will be used
+        exact_match=True,
     )
     # [END howto_operator_gcs_to_gcs_single_file]
 
@@ -201,6 +202,7 @@ with models.DAG(
         source_object=OBJECT_1,
         destination_bucket=BUCKET_NAME_DST,
         destination_object="backup_" + OBJECT_1,
+        exact_match=True,
         move_object=True,
     )
     # [END howto_operator_gcs_to_gcs_single_file_move]

Reply via email to