This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ecc6643afc Add flatten_structure parameter to GCSToS3Operator (#56134) (#57713)
9ecc6643afc is described below
commit 9ecc6643afce9f7833f8a49dcc6ad9608bfde358
Author: Hajun Yoo <[email protected]>
AuthorDate: Wed Nov 12 09:10:39 2025 +0900
Add flatten_structure parameter to GCSToS3Operator (#56134) (#57713)
* Add flatten_structure parameter to GCSToS3Operator
* Enhance GCSToS3Operator docs clarity and refactor tests to use pytest.mark.parametrize for better maintainability
* Improve GCSToS3Operator documentation consistency and add concrete examples
- Use consistent "takes precedence over" terminology in warning messages
- Add specific file path transformation example in class docstring
* Fix GCSToS3Operator test log mocking for Airflow SDK compatibility
- Replace mock.patch.object() with mock.patch() string path to handle the read-only log property in the new Airflow SDK.
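For reference, a minimal usage sketch of the new parameter (the task id, bucket names, prefix, and S3 key below are illustrative, not taken from this commit):

    from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

    # Copy everything under a prefix, dropping subdirectories in the S3 destination.
    copy_flat = GCSToS3Operator(
        task_id="gcs_to_s3_flatten",                    # illustrative task id
        gcs_bucket="example-gcs-bucket",                # illustrative source bucket
        prefix="exports/2025/",                         # illustrative GCS prefix
        dest_s3_key="s3://example-s3-bucket/landing/",  # illustrative destination
        flatten_structure=True,  # "exports/2025/a/b.csv" is written as "landing/b.csv"
    )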
---
.../providers/amazon/aws/transfers/gcs_to_s3.py | 51 ++++++++-
.../unit/amazon/aws/transfers/test_gcs_to_s3.py | 115 +++++++++++++++++++++
2 files changed, 162 insertions(+), 4 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 92289a89638..4a0f078f952 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -39,6 +39,11 @@ class GCSToS3Operator(BaseOperator):
    """
    Synchronizes a Google Cloud Storage bucket with an S3 bucket.

+    .. note::
+        When flatten_structure=True, it takes precedence over keep_directory_structure.
+        For example, with flatten_structure=True, "folder/subfolder/file.txt" becomes "file.txt"
+        regardless of the keep_directory_structure setting.
+
    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GCSToS3Operator`
@@ -79,6 +84,9 @@ class GCSToS3Operator(BaseOperator):
        object to be uploaded in S3
    :param keep_directory_structure: (Optional) When set to False the path of the file
        on the bucket is recreated within path passed in dest_s3_key.
+    :param flatten_structure: (Optional) When set to True, places all files directly
+        in the dest_s3_key directory without preserving subdirectory structure.
+        Takes precedence over keep_directory_structure when enabled.
    :param match_glob: (Optional) filters objects based on the glob pattern given by the string
        (e.g, ``'**/*/.json'``)
    :param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request.
@@ -108,6 +116,7 @@ class GCSToS3Operator(BaseOperator):
        dest_s3_extra_args: dict | None = None,
        s3_acl_policy: str | None = None,
        keep_directory_structure: bool = True,
+        flatten_structure: bool = False,
        match_glob: str | None = None,
        gcp_user_project: str | None = None,
        **kwargs,
@@ -124,6 +133,10 @@ class GCSToS3Operator(BaseOperator):
        self.dest_s3_extra_args = dest_s3_extra_args or {}
        self.s3_acl_policy = s3_acl_policy
        self.keep_directory_structure = keep_directory_structure
+        self.flatten_structure = flatten_structure
+
+        if self.flatten_structure and self.keep_directory_structure:
+            self.log.warning("flatten_structure=True takes precedence over keep_directory_structure=True")

        try:
            from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION
@@ -140,6 +153,17 @@ class GCSToS3Operator(BaseOperator):
        self.match_glob = match_glob
        self.gcp_user_project = gcp_user_project

+    def _transform_file_path(self, file_path: str) -> str:
+        """
+        Transform the GCS file path according to the specified options.
+
+        :param file_path: The original GCS file path
+        :return: The transformed file path for S3 destination
+        """
+        if self.flatten_structure:
+            return os.path.basename(file_path)
+        return file_path
+
    def execute(self, context: Context) -> list[str]:
        # list all files in an Google Cloud Storage bucket
        gcs_hook = GCSHook(
@@ -167,7 +191,7 @@ class GCSToS3Operator(BaseOperator):
            aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args
        )

-        if not self.keep_directory_structure and self.prefix:
+        if not self.keep_directory_structure and self.prefix and not self.flatten_structure:
            self.dest_s3_key = os.path.join(self.dest_s3_key, self.prefix)

        if not self.replace:
@@ -187,15 +211,34 @@ class GCSToS3Operator(BaseOperator):
            existing_files = existing_files or []
            # remove the prefix for the existing files to allow the match
            existing_files = [file.replace(prefix, "", 1) for file in existing_files]
-            gcs_files = list(set(gcs_files) - set(existing_files))
+
+            # Transform GCS files for comparison and filter out existing ones
+            existing_files_set = set(existing_files)
+            filtered_files = []
+            seen_transformed = set()
+
+            for file in gcs_files:
+                transformed = self._transform_file_path(file)
+                if transformed not in existing_files_set and transformed not in seen_transformed:
+                    filtered_files.append(file)
+                    seen_transformed.add(transformed)
+                elif transformed in seen_transformed:
+                    self.log.warning(
+                        "Skipping duplicate file %s (transforms to %s)",
+                        file,
+                        transformed,
+                    )
+
+            gcs_files = filtered_files

        if gcs_files:
            for file in gcs_files:
                with gcs_hook.provide_file(
                    object_name=file, bucket_name=str(self.gcs_bucket), user_project=self.gcp_user_project
                ) as local_tmp_file:
-                    dest_key = os.path.join(self.dest_s3_key, file)
-                    self.log.info("Saving file to %s", dest_key)
+                    transformed_path = self._transform_file_path(file)
+                    dest_key = os.path.join(self.dest_s3_key, transformed_path)
+                    self.log.info("Saving file from %s to %s", file, dest_key)
                    s3_hook.load_file(
                        filename=local_tmp_file.name,
                        key=dest_key,
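Because flattening can make distinct GCS keys collide on the same basename, the loop above keeps only the first file per transformed name and logs a warning for the rest. A standalone sketch of that filtering idea, outside the operator (the helper name is made up for illustration):

    import os

    def keep_first_per_basename(paths):
        # Mirrors the dedup above: the first GCS key wins for each flattened name.
        seen, kept = set(), []
        for path in paths:
            name = os.path.basename(path)
            if name not in seen:
                seen.add(name)
                kept.append(path)
        return kept

    # Prints ['dir1/file.csv', 'dir3/other.csv'] -- 'dir2/file.csv' is dropped
    print(keep_first_per_basename(["dir1/file.csv", "dir2/file.csv", "dir3/other.csv"]))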
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
index c64df03bc4f..014f26f4943 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
@@ -330,6 +330,121 @@ class TestGCSToS3Operator:
        assert sorted(MOCK_FILES) == sorted(uploaded_files)
        assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True

+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure(self, mock_hook):
+        """Test that flatten_structure parameter flattens directory structure."""
+        mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"]
+        mock_hook.return_value.list.return_value = mock_files_with_paths
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            hook, _ = _create_test_bucket()
+
+            uploaded_files = operator.execute(None)
+
+            # Verify all files were uploaded
+            assert sorted(mock_files_with_paths) == sorted(uploaded_files)
+
+            # Verify files are stored with flattened structure (only filenames)
+            expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
+            actual_keys = hook.list_keys("bucket", delimiter="/")
+            assert sorted(expected_s3_keys) == sorted(actual_keys)
+
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook):
+        """Test that flatten_structure handles duplicate filenames correctly."""
+        mock_files_with_duplicates = [
+            "dir1/file.csv",
+            "dir2/file.csv",  # Same filename as above
+            "dir3/other.csv",
+        ]
+        mock_hook.return_value.list.return_value = mock_files_with_duplicates
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            _, _ = _create_test_bucket()
+
+            # Mock the logging to verify warning is logged
+            mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+            with mock.patch(mock_path) as mock_log:
+                uploaded_files = operator.execute(None)
+
+                # Only one of the duplicate files should be uploaded
+                assert len(uploaded_files) == 2
+                assert "dir3/other.csv" in uploaded_files
+                first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files
+                assert first_or_second
+
+                # Verify warning was logged for duplicate
+                mock_log.warning.assert_called()
+
+    def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
+        """Test warning when both flatten_structure and keep_directory_structure are True."""
+        mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+        with mock.patch(mock_path) as mock_log:
+            GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                flatten_structure=True,
+                keep_directory_structure=True,  # This should trigger warning
+            )
+
+            # Verify warning was logged during initialization
+            expected_warning = "flatten_structure=True takes precedence over keep_directory_structure=True"
+            mock_log.warning.assert_called_once_with(expected_warning)
+
+    @pytest.mark.parametrize(
+        ("flatten_structure", "input_path", "expected_output"),
+        [
+            # Tests with flatten_structure=True
+            (True, "dir1/subdir1/file.csv", "file.csv"),
+            (True, "path/to/deep/nested/file.txt", "file.txt"),
+            (True, "simple.txt", "simple.txt"),
+            (True, "", ""),
+            # Tests with flatten_structure=False (preserves original paths)
+            (False, "dir1/subdir1/file.csv", "dir1/subdir1/file.csv"),
+            (False, "path/to/deep/nested/file.txt", "path/to/deep/nested/file.txt"),
+            (False, "simple.txt", "simple.txt"),
+            (False, "", ""),
+        ],
+    )
+    def test_transform_file_path(self, flatten_structure, input_path, expected_output):
+        """Test _transform_file_path method with various flatten_structure settings."""
+        operator = GCSToS3Operator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            dest_s3_key=S3_BUCKET,
+            flatten_structure=flatten_structure,
+        )
+
+        result = operator._transform_file_path(input_path)
+        assert result == expected_output
+
    @pytest.mark.parametrize(
        ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"),
        [