This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fb6c501124 Add flag apply_gcs_prefix to S3ToGCSOperator (b/245077385) (#31127)
fb6c501124 is described below

commit fb6c501124186f695b9dfa497cde10298ac12e9f
Author: max <[email protected]>
AuthorDate: Sat May 13 21:44:58 2023 +0200

    Add flag apply_gcs_prefix to S3ToGCSOperator (b/245077385) (#31127)
---
 .../providers/google/cloud/transfers/s3_to_gcs.py  | 123 +++++++++--------
 .../google/cloud/transfers/test_s3_to_gcs.py       | 146 +++++++++++++++++++--
 .../google/cloud/gcs/example_s3_to_gcs.py          |   6 +-
 3 files changed, 211 insertions(+), 64 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index 52e0357559..39a2f2a68f 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -45,6 +45,15 @@ class S3ToGCSOperator(S3ListOperator):
     :param bucket: The S3 bucket where to find the objects. (templated)
     :param prefix: Prefix string which filters objects whose name begin with
         such prefix. (templated)
+    :param apply_gcs_prefix: (Optional) Whether to replace the source objects' path with the given GCS destination path.
+        If apply_gcs_prefix is False (default), then objects from S3 will be copied to the GCS bucket under the given
+        GCS path, and the source path will be placed inside it. For example,
+        <s3_bucket><s3_prefix><content> => <gcs_prefix><s3_prefix><content>
+
+        If apply_gcs_prefix is True, then objects from S3 will be copied to the GCS bucket under the given
+        GCS path, and the source path will be omitted. For example:
+        <s3_bucket><s3_prefix><content> => <gcs_prefix><content>
+
     :param delimiter: the delimiter marks key hierarchy. (templated)
     :param aws_conn_id: The source S3 connection
     :param verify: Whether or not to verify SSL certificates for S3 connection.
@@ -106,6 +115,7 @@ class S3ToGCSOperator(S3ListOperator):
         *,
         bucket,
         prefix="",
+        apply_gcs_prefix=False,
         delimiter="",
         aws_conn_id="aws_default",
         verify=None,
@@ -118,6 +128,7 @@ class S3ToGCSOperator(S3ListOperator):
     ):
 
         super().__init__(bucket=bucket, prefix=prefix, delimiter=delimiter, aws_conn_id=aws_conn_id, **kwargs)
+        self.apply_gcs_prefix = apply_gcs_prefix
         self.gcp_conn_id = gcp_conn_id
         self.dest_gcs = dest_gcs
         self.replace = replace
@@ -139,68 +150,74 @@ class S3ToGCSOperator(S3ListOperator):
     def execute(self, context: Context):
         self._check_inputs()
         # use the super method to list all the files in an S3 bucket/key
-        files = super().execute(context)
+        s3_objects = super().execute(context)
 
         gcs_hook = GCSHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.google_impersonation_chain,
         )
-
         if not self.replace:
-            # if we are not replacing -> list all files in the GCS bucket
-            # and only keep those files which are present in
-            # S3 and not in Google Cloud Storage
-            bucket_name, object_prefix = _parse_gcs_url(self.dest_gcs)
-            existing_files_prefixed = gcs_hook.list(bucket_name, prefix=object_prefix)
-
-            existing_files = []
-
-            if existing_files_prefixed:
-                # Remove the object prefix itself, an empty directory was found
-                if object_prefix in existing_files_prefixed:
-                    existing_files_prefixed.remove(object_prefix)
-
-                # Remove the object prefix from all object string paths
-                for f in existing_files_prefixed:
-                    if f.startswith(object_prefix):
-                        existing_files.append(f[len(object_prefix) :])
-                    else:
-                        existing_files.append(f)
-
-            files = list(set(files) - set(existing_files))
-            if len(files) > 0:
-                self.log.info("%s files are going to be synced: %s.", 
len(files), files)
-            else:
-                self.log.info("There are no new files to sync. Have a nice 
day!")
-
-        if files:
+            s3_objects = self.exclude_existing_objects(s3_objects=s3_objects, gcs_hook=gcs_hook)
+
+        if s3_objects:
             hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
+            dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+            for obj in s3_objects:
+                # GCS hook builds its own in-memory file, so we have to create
                 # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
+                file_object = hook.get_key(obj, self.bucket)
+                with NamedTemporaryFile(mode="wb", delete=True) as file:
+                    file_object.download_fileobj(file)
+                    file.flush()
+                    gcs_file = self.s3_to_gcs_object(s3_object=obj)
+                    gcs_hook.upload(dest_gcs_bucket, gcs_file, file.name, gzip=self.gzip)
+
+            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(s3_objects))
         else:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
 
-        return files
+        return s3_objects
+
+    def exclude_existing_objects(self, s3_objects: list[str], gcs_hook: GCSHook) -> list[str]:
+        """Excludes from the list objects that already exist in GCS bucket."""
+        bucket_name, object_prefix = _parse_gcs_url(self.dest_gcs)
+
+        existing_gcs_objects = set(gcs_hook.list(bucket_name, prefix=object_prefix))
+
+        s3_paths = set(self.gcs_to_s3_object(gcs_object=gcs_object) for gcs_object in existing_gcs_objects)
+        s3_objects_reduced = list(set(s3_objects) - s3_paths)
+
+        if s3_objects_reduced:
+            self.log.info("%s files are going to be synced: %s.", 
len(s3_objects_reduced), s3_objects_reduced)
+        else:
+            self.log.info("There are no new files to sync. Have a nice day!")
+        return s3_objects_reduced
+
+    def s3_to_gcs_object(self, s3_object: str) -> str:
+        """
+        Transforms an S3 path into a GCS path according to the operator's logic.
+
+        If apply_gcs_prefix == True then <s3_prefix><content> => <gcs_prefix><content>
+        If apply_gcs_prefix == False then <s3_prefix><content> => <gcs_prefix><s3_prefix><content>
+
+        """
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        if self.apply_gcs_prefix:
+            gcs_object = s3_object.replace(self.prefix, gcs_prefix, 1)
+            return gcs_object
+        return gcs_prefix + s3_object
+
+    def gcs_to_s3_object(self, gcs_object: str) -> str:
+        """
+        Transforms a GCS path into an S3 path according to the operator's logic.
+
+        If apply_gcs_prefix == True then <gcs_prefix><content> => <s3_prefix><content>
+        If apply_gcs_prefix == False then <gcs_prefix><s3_prefix><content> => <s3_prefix><content>
+
+        """
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        s3_object = gcs_object.replace(gcs_prefix, "", 1)
+        if self.apply_gcs_prefix:
+            return self.prefix + s3_object
+        return s3_object
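
As a reading aid, here is a minimal standalone sketch of the path mapping the new flag controls (a hypothetical helper with hypothetical values, not part of this commit; it mirrors the logic of s3_to_gcs_object above):

    def map_s3_key_to_gcs_object(s3_key: str, s3_prefix: str, gcs_prefix: str, apply_gcs_prefix: bool) -> str:
        # apply_gcs_prefix=True: the S3 prefix is swapped for the GCS prefix
        if apply_gcs_prefix:
            return s3_key.replace(s3_prefix, gcs_prefix, 1)
        # apply_gcs_prefix=False (default): the full S3 key is nested under the GCS prefix
        return gcs_prefix + s3_key

    # With s3_prefix="TEST/", gcs_prefix="data/" and key "TEST/file.csv":
    #   apply_gcs_prefix=False -> "data/TEST/file.csv"
    #   apply_gcs_prefix=True  -> "data/file.csv"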
diff --git a/tests/providers/google/cloud/transfers/test_s3_to_gcs.py b/tests/providers/google/cloud/transfers/test_s3_to_gcs.py
index 9b7d86d280..480fbf6f75 100644
--- a/tests/providers/google/cloud/transfers/test_s3_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_s3_to_gcs.py
@@ -19,17 +19,39 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
 
 TASK_ID = "test-s3-gcs-operator"
 S3_BUCKET = "test-bucket"
 S3_PREFIX = "TEST"
 S3_DELIMITER = "/"
-GCS_PATH_PREFIX = "gs://gcs-bucket/data/"
-MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
+GCS_BUCKET = "gcs-bucket"
+GCS_BUCKET_URI = "gs://" + GCS_BUCKET
+GCS_PREFIX = "data/"
+GCS_PATH_PREFIX = GCS_BUCKET_URI + "/" + GCS_PREFIX
+MOCK_FILE_1 = "TEST1.csv"
+MOCK_FILE_2 = "TEST2.csv"
+MOCK_FILE_3 = "TEST3.csv"
+MOCK_FILES = [MOCK_FILE_1, MOCK_FILE_2, MOCK_FILE_3]
 AWS_CONN_ID = "aws_default"
 GCS_CONN_ID = "google_cloud_default"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+APPLY_GCS_PREFIX = False
+PARAMETRIZED_OBJECT_PATHS = (
+    "apply_gcs_prefix, s3_prefix, s3_object, gcs_destination, gcs_object",
+    [
+        (False, "", MOCK_FILE_1, GCS_PATH_PREFIX, GCS_PREFIX + MOCK_FILE_1),
+        (False, S3_PREFIX, MOCK_FILE_1, GCS_PATH_PREFIX, GCS_PREFIX + S3_PREFIX + MOCK_FILE_1),
+        (False, "", MOCK_FILE_1, GCS_BUCKET_URI, MOCK_FILE_1),
+        (False, S3_PREFIX, MOCK_FILE_1, GCS_BUCKET_URI, S3_PREFIX + MOCK_FILE_1),
+        (True, "", MOCK_FILE_1, GCS_PATH_PREFIX, GCS_PREFIX + MOCK_FILE_1),
+        (True, S3_PREFIX, MOCK_FILE_1, GCS_PATH_PREFIX, GCS_PREFIX + MOCK_FILE_1),
+        (True, "", MOCK_FILE_1, GCS_BUCKET_URI, MOCK_FILE_1),
+        (True, S3_PREFIX, MOCK_FILE_1, GCS_BUCKET_URI, MOCK_FILE_1),
+    ],
+)
 
 
 class TestS3ToGoogleCloudStorageOperator:
@@ -44,6 +66,7 @@ class TestS3ToGoogleCloudStorageOperator:
             gcp_conn_id=GCS_CONN_ID,
             dest_gcs=GCS_PATH_PREFIX,
             google_impersonation_chain=IMPERSONATION_CHAIN,
+            apply_gcs_prefix=APPLY_GCS_PREFIX,
         )
 
         assert operator.task_id == TASK_ID
@@ -53,6 +76,7 @@ class TestS3ToGoogleCloudStorageOperator:
         assert operator.gcp_conn_id == GCS_CONN_ID
         assert operator.dest_gcs == GCS_PATH_PREFIX
         assert operator.google_impersonation_chain == IMPERSONATION_CHAIN
+        assert operator.apply_gcs_prefix == APPLY_GCS_PREFIX
 
     @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
     @mock.patch("airflow.providers.amazon.aws.operators.s3.S3Hook")
@@ -73,12 +97,12 @@ class TestS3ToGoogleCloudStorageOperator:
         s3_one_mock_hook.return_value.list_keys.return_value = MOCK_FILES
         s3_two_mock_hook.return_value.list_keys.return_value = MOCK_FILES
 
-        uploaded_files = operator.execute(None)
+        uploaded_files = operator.execute(context={})
         gcs_mock_hook.return_value.upload.assert_has_calls(
             [
-                mock.call("gcs-bucket", "data/TEST1.csv", mock.ANY, 
gzip=False),
-                mock.call("gcs-bucket", "data/TEST3.csv", mock.ANY, 
gzip=False),
-                mock.call("gcs-bucket", "data/TEST2.csv", mock.ANY, 
gzip=False),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_1, mock.ANY, 
gzip=False),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_2, mock.ANY, 
gzip=False),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_3, mock.ANY, 
gzip=False),
             ],
             any_order=True,
         )
@@ -112,16 +136,118 @@ class TestS3ToGoogleCloudStorageOperator:
         s3_one_mock_hook.return_value.list_keys.return_value = MOCK_FILES
         s3_two_mock_hook.return_value.list_keys.return_value = MOCK_FILES
 
-        operator.execute(None)
+        operator.execute(context={})
         gcs_mock_hook.assert_called_once_with(
             gcp_conn_id=GCS_CONN_ID,
             impersonation_chain=None,
         )
         gcs_mock_hook.return_value.upload.assert_has_calls(
             [
-                mock.call("gcs-bucket", "data/TEST2.csv", mock.ANY, gzip=True),
-                mock.call("gcs-bucket", "data/TEST1.csv", mock.ANY, gzip=True),
-                mock.call("gcs-bucket", "data/TEST3.csv", mock.ANY, gzip=True),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_1, mock.ANY, gzip=True),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_2, mock.ANY, gzip=True),
+                mock.call(GCS_BUCKET, GCS_PREFIX + MOCK_FILE_3, mock.ANY, gzip=True),
             ],
             any_order=True,
         )
+
+    @pytest.mark.parametrize(
+        "source_objects, existing_objects, objects_expected",
+        [
+            (MOCK_FILES, [], MOCK_FILES),
+            (MOCK_FILES, [MOCK_FILE_1], [MOCK_FILE_2, MOCK_FILE_3]),
+            (MOCK_FILES, [MOCK_FILE_1, MOCK_FILE_2], [MOCK_FILE_3]),
+            (MOCK_FILES, [MOCK_FILE_3, MOCK_FILE_2], [MOCK_FILE_1]),
+            (MOCK_FILES, MOCK_FILES, []),
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_exclude_existing_objects(
+        self, mock_gcs_hook, source_objects, existing_objects, objects_expected
+    ):
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX,
+            gzip=True,
+        )
+        mock_gcs_hook.list.return_value = existing_objects
+        files_reduced = operator.exclude_existing_objects(s3_objects=source_objects, gcs_hook=mock_gcs_hook)
+        assert set(files_reduced) == set(objects_expected)
+
+    @pytest.mark.parametrize(*PARAMETRIZED_OBJECT_PATHS)
+    def test_s3_to_gcs_object(self, apply_gcs_prefix, s3_prefix, s3_object, gcs_destination, gcs_object):
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=s3_prefix,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=gcs_destination,
+            gzip=True,
+            apply_gcs_prefix=apply_gcs_prefix,
+        )
+        assert operator.s3_to_gcs_object(s3_object=s3_prefix + s3_object) == gcs_object
+
+    @pytest.mark.parametrize(*PARAMETRIZED_OBJECT_PATHS)
+    def test_gcs_to_s3_object(self, apply_gcs_prefix, s3_prefix, s3_object, gcs_destination, gcs_object):
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=s3_prefix,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=gcs_destination,
+            gzip=True,
+            apply_gcs_prefix=apply_gcs_prefix,
+        )
+        assert operator.gcs_to_s3_object(gcs_object=gcs_object) == s3_prefix + s3_object
+
+    @pytest.mark.parametrize(*PARAMETRIZED_OBJECT_PATHS)
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
+    @mock.patch("airflow.providers.amazon.aws.operators.s3.S3Hook")
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_execute_apply_gcs_prefix(
+        self,
+        gcs_mock_hook,
+        s3_one_mock_hook,
+        s3_two_mock_hook,
+        apply_gcs_prefix,
+        s3_prefix,
+        s3_object,
+        gcs_destination,
+        gcs_object,
+    ):
+
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=s3_prefix,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=gcs_destination,
+            google_impersonation_chain=IMPERSONATION_CHAIN,
+            apply_gcs_prefix=apply_gcs_prefix,
+        )
+
+        s3_one_mock_hook.return_value.list_keys.return_value = [s3_prefix + s3_object]
+        s3_two_mock_hook.return_value.list_keys.return_value = [s3_prefix + s3_object]
+
+        uploaded_files = operator.execute(context={})
+        gcs_mock_hook.return_value.upload.assert_has_calls(
+            [
+                mock.call(GCS_BUCKET, gcs_object, mock.ANY, gzip=False),
+            ],
+            any_order=True,
+        )
+
+        s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID, verify=None)
+        s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID, verify=None)
+        gcs_mock_hook.assert_called_once_with(
+            gcp_conn_id=GCS_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        assert sorted([s3_prefix + s3_object]) == sorted(uploaded_files)
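
The replace=False branch now delegates to exclude_existing_objects; conceptually it is a set difference computed after mapping existing GCS objects back to S3 paths. A rough, self-contained sketch of that semantics (illustration only, reusing the hypothetical mapping convention from the sketch above):

    def exclude_existing(s3_keys: list[str], existing_gcs_objects: list[str],
                         s3_prefix: str, gcs_prefix: str, apply_gcs_prefix: bool) -> list[str]:
        # Map each existing GCS object back to the S3 key it would correspond to,
        # then keep only the S3 keys that have no counterpart in GCS yet.
        def gcs_to_s3(gcs_object: str) -> str:
            s3_key = gcs_object.replace(gcs_prefix, "", 1)
            return s3_prefix + s3_key if apply_gcs_prefix else s3_key

        already_copied = {gcs_to_s3(obj) for obj in existing_gcs_objects}
        return [key for key in s3_keys if key not in already_copied]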
diff --git a/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
index 648e73223a..063d5a6743 100644
--- a/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
@@ -62,7 +62,11 @@ with models.DAG(
     )
     # [START howto_transfer_s3togcs_operator]
     transfer_to_gcs = S3ToGCSOperator(
-        task_id="s3_to_gcs_task", bucket=BUCKET_NAME, prefix=PREFIX, 
dest_gcs=GCS_BUCKET_URL
+        task_id="s3_to_gcs_task",
+        bucket=BUCKET_NAME,
+        prefix=PREFIX,
+        dest_gcs=GCS_BUCKET_URL,
+        apply_gcs_prefix=True,
     )
     # [END howto_transfer_s3togcs_operator]
 

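For clarity, the layout the example DAG above produces versus the default (hypothetical file name; assuming GCS_BUCKET_URL points at the bucket root, per the flag's docstring in this commit):

    # With apply_gcs_prefix=True (as in the snippet above), the S3 prefix is dropped on the GCS side:
    #   s3 key <PREFIX>file.csv in BUCKET_NAME  ->  object file.csv under GCS_BUCKET_URL
    # With the default apply_gcs_prefix=False, the S3 prefix is kept under the destination:
    #   s3 key <PREFIX>file.csv in BUCKET_NAME  ->  object <PREFIX>file.csv under GCS_BUCKET_URL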