This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ecc6643afc Add flatten_structure parameter to GCSToS3Operator (#56134) (#57713)
9ecc6643afc is described below

commit 9ecc6643afce9f7833f8a49dcc6ad9608bfde358
Author: Hajun Yoo <[email protected]>
AuthorDate: Wed Nov 12 09:10:39 2025 +0900

    Add flatten_structure parameter to GCSToS3Operator (#56134) (#57713)
    
    * Add flatten_structure parameter to GCSToS3Operator
    
    * Enhance GCSToS3Operator docs clarity and refactor tests to use pytest.mark.parametrize for better maintainability
    
    * Improve GCSToS3Operator documentation consistency and add concrete examples
    
      - Use consistent "takes precedence over" terminology in warning messages
      - Add specific file path transformation example in class docstring
    
    * Fix GCSToS3Operator test log mocking for Airflow SDK compatibility
    
      - Replace mock.patch.object() with mock.patch() string path to handle
        the read-only log property in the new Airflow SDK.
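
    A minimal usage sketch of the new parameter (the bucket names, connection ID,
    and prefix below are placeholders for illustration, not part of this commit):

        from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

        # Copies every object under "exports/" into the destination bucket,
        # keeping only each object's base filename; flatten_structure takes
        # precedence over keep_directory_structure.
        flatten_transfer = GCSToS3Operator(
            task_id="flatten_gcs_to_s3",
            gcs_bucket="my-gcs-bucket",
            prefix="exports/",
            dest_aws_conn_id="aws_default",
            dest_s3_key="s3://my-s3-bucket/",
            flatten_structure=True,
        )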
---
 .../providers/amazon/aws/transfers/gcs_to_s3.py    |  51 ++++++++-
 .../unit/amazon/aws/transfers/test_gcs_to_s3.py    | 115 +++++++++++++++++++++
 2 files changed, 162 insertions(+), 4 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 92289a89638..4a0f078f952 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -39,6 +39,11 @@ class GCSToS3Operator(BaseOperator):
     """
     Synchronizes a Google Cloud Storage bucket with an S3 bucket.
 
+    .. note::
+        When flatten_structure=True, it takes precedence over keep_directory_structure.
+        For example, with flatten_structure=True, "folder/subfolder/file.txt" becomes "file.txt"
+        regardless of the keep_directory_structure setting.
+
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:GCSToS3Operator`
@@ -79,6 +84,9 @@ class GCSToS3Operator(BaseOperator):
         object to be uploaded in S3
     :param keep_directory_structure: (Optional) When set to False the path of the file
          on the bucket is recreated within path passed in dest_s3_key.
+    :param flatten_structure: (Optional) When set to True, places all files directly
+        in the dest_s3_key directory without preserving subdirectory structure.
+        Takes precedence over keep_directory_structure when enabled.
     :param match_glob: (Optional) filters objects based on the glob pattern given by the string
         (e.g, ``'**/*/.json'``)
     :param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request.
@@ -108,6 +116,7 @@ class GCSToS3Operator(BaseOperator):
         dest_s3_extra_args: dict | None = None,
         s3_acl_policy: str | None = None,
         keep_directory_structure: bool = True,
+        flatten_structure: bool = False,
         match_glob: str | None = None,
         gcp_user_project: str | None = None,
         **kwargs,
@@ -124,6 +133,10 @@ class GCSToS3Operator(BaseOperator):
         self.dest_s3_extra_args = dest_s3_extra_args or {}
         self.s3_acl_policy = s3_acl_policy
         self.keep_directory_structure = keep_directory_structure
+        self.flatten_structure = flatten_structure
+
+        if self.flatten_structure and self.keep_directory_structure:
+            self.log.warning("flatten_structure=True takes precedence over keep_directory_structure=True")
         try:
             from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION
 
@@ -140,6 +153,17 @@ class GCSToS3Operator(BaseOperator):
         self.match_glob = match_glob
         self.gcp_user_project = gcp_user_project
 
+    def _transform_file_path(self, file_path: str) -> str:
+        """
+        Transform the GCS file path according to the specified options.
+
+        :param file_path: The original GCS file path
+        :return: The transformed file path for S3 destination
+        """
+        if self.flatten_structure:
+            return os.path.basename(file_path)
+        return file_path
+
     def execute(self, context: Context) -> list[str]:
         # list all files in an Google Cloud Storage bucket
         gcs_hook = GCSHook(
@@ -167,7 +191,7 @@ class GCSToS3Operator(BaseOperator):
             aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args
         )
 
-        if not self.keep_directory_structure and self.prefix:
+        if not self.keep_directory_structure and self.prefix and not self.flatten_structure:
             self.dest_s3_key = os.path.join(self.dest_s3_key, self.prefix)
 
         if not self.replace:
@@ -187,15 +211,34 @@ class GCSToS3Operator(BaseOperator):
             existing_files = existing_files or []
             # remove the prefix for the existing files to allow the match
             existing_files = [file.replace(prefix, "", 1) for file in existing_files]
-            gcs_files = list(set(gcs_files) - set(existing_files))
+
+            # Transform GCS files for comparison and filter out existing ones
+            existing_files_set = set(existing_files)
+            filtered_files = []
+            seen_transformed = set()
+
+            for file in gcs_files:
+                transformed = self._transform_file_path(file)
+                if transformed not in existing_files_set and transformed not in seen_transformed:
+                    filtered_files.append(file)
+                    seen_transformed.add(transformed)
+                elif transformed in seen_transformed:
+                    self.log.warning(
+                        "Skipping duplicate file %s (transforms to %s)",
+                        file,
+                        transformed,
+                    )
+
+            gcs_files = filtered_files
 
         if gcs_files:
             for file in gcs_files:
                 with gcs_hook.provide_file(
                     object_name=file, bucket_name=str(self.gcs_bucket), user_project=self.gcp_user_project
                 ) as local_tmp_file:
-                    dest_key = os.path.join(self.dest_s3_key, file)
-                    self.log.info("Saving file to %s", dest_key)
+                    transformed_path = self._transform_file_path(file)
+                    dest_key = os.path.join(self.dest_s3_key, transformed_path)
+                    self.log.info("Saving file from %s to %s", file, dest_key)
                     s3_hook.load_file(
                         filename=local_tmp_file.name,
                         key=dest_key,
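
The flattening and duplicate handling introduced above can be tried outside the
operator; the following is a standalone sketch of the same logic (the file lists
are made up for illustration and are not part of this commit):

    import os

    def transform(path: str, flatten_structure: bool) -> str:
        # Mirrors _transform_file_path: keep only the basename when flattening,
        # otherwise return the path unchanged.
        return os.path.basename(path) if flatten_structure else path

    gcs_files = ["dir1/file.csv", "dir2/file.csv", "dir3/other.csv"]
    existing_files = set()   # keys already present in the destination bucket
    seen_transformed = set()
    to_upload = []

    for f in gcs_files:
        key = transform(f, flatten_structure=True)
        if key not in existing_files and key not in seen_transformed:
            to_upload.append(f)
            seen_transformed.add(key)

    # to_upload == ["dir1/file.csv", "dir3/other.csv"]; "dir2/file.csv" is
    # skipped because it would collide with "file.csv" after flattening.
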
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
index c64df03bc4f..014f26f4943 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
@@ -330,6 +330,121 @@ class TestGCSToS3Operator:
             assert sorted(MOCK_FILES) == sorted(uploaded_files)
             assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True
 
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure(self, mock_hook):
+        """Test that flatten_structure parameter flattens directory 
structure."""
+        mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"]
+        mock_hook.return_value.list.return_value = mock_files_with_paths
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            hook, _ = _create_test_bucket()
+
+            uploaded_files = operator.execute(None)
+
+            # Verify all files were uploaded
+            assert sorted(mock_files_with_paths) == sorted(uploaded_files)
+
+            # Verify files are stored with flattened structure (only filenames)
+            expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
+            actual_keys = hook.list_keys("bucket", delimiter="/")
+            assert sorted(expected_s3_keys) == sorted(actual_keys)
+
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook):
+        """Test that flatten_structure handles duplicate filenames 
correctly."""
+        mock_files_with_duplicates = [
+            "dir1/file.csv",
+            "dir2/file.csv",  # Same filename as above
+            "dir3/other.csv",
+        ]
+        mock_hook.return_value.list.return_value = mock_files_with_duplicates
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            _, _ = _create_test_bucket()
+
+            # Mock the logging to verify warning is logged
+            mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+            with mock.patch(mock_path) as mock_log:
+                uploaded_files = operator.execute(None)
+
+                # Only one of the duplicate files should be uploaded
+                assert len(uploaded_files) == 2
+                assert "dir3/other.csv" in uploaded_files
+                first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files
+                assert first_or_second
+
+                # Verify warning was logged for duplicate
+                mock_log.warning.assert_called()
+
+    def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
+        """Test warning when both flatten_structure and 
keep_directory_structure are True."""
+        mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+        with mock.patch(mock_path) as mock_log:
+            GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                flatten_structure=True,
+                keep_directory_structure=True,  # This should trigger warning
+            )
+
+            # Verify warning was logged during initialization
+            expected_warning = "flatten_structure=True takes precedence over keep_directory_structure=True"
+            mock_log.warning.assert_called_once_with(expected_warning)
+
+    @pytest.mark.parametrize(
+        ("flatten_structure", "input_path", "expected_output"),
+        [
+            # Tests with flatten_structure=True
+            (True, "dir1/subdir1/file.csv", "file.csv"),
+            (True, "path/to/deep/nested/file.txt", "file.txt"),
+            (True, "simple.txt", "simple.txt"),
+            (True, "", ""),
+            # Tests with flatten_structure=False (preserves original paths)
+            (False, "dir1/subdir1/file.csv", "dir1/subdir1/file.csv"),
+            (False, "path/to/deep/nested/file.txt", 
"path/to/deep/nested/file.txt"),
+            (False, "simple.txt", "simple.txt"),
+            (False, "", ""),
+        ],
+    )
+    def test_transform_file_path(self, flatten_structure, input_path, expected_output):
+        """Test _transform_file_path method with various flatten_structure 
settings."""
+        operator = GCSToS3Operator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            dest_s3_key=S3_BUCKET,
+            flatten_structure=flatten_structure,
+        )
+
+        result = operator._transform_file_path(input_path)
+        assert result == expected_output
+
     @pytest.mark.parametrize(
         ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"),
         [
