KoviAnusha commented on code in PR #57713:
URL: https://github.com/apache/airflow/pull/57713#discussion_r2485267958
##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py:
##########
@@ -124,6 +128,10 @@ def __init__(
        self.dest_s3_extra_args = dest_s3_extra_args or {}
        self.s3_acl_policy = s3_acl_policy
        self.keep_directory_structure = keep_directory_structure
+        self.flatten_structure = flatten_structure
+
+        if self.flatten_structure and self.keep_directory_structure:
+            self.log.warning("flatten_structure=True overrides keep_directory_structure=True")
Review Comment:
Good safeguard here. How about adding this precedence note to the top-level
docstring as well, so users immediately understand that `flatten_structure=True`
overrides `keep_directory_structure=True`?
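To illustrate the behavior worth documenting, a minimal sketch (bucket names
and `task_id` are made up; `flatten_structure` is the parameter added in this
PR):

```python
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

# With both flags set, flatten_structure wins: objects land directly under
# dest_s3_key as bare filenames, and the __init__ warning above is emitted.
op = GCSToS3Operator(
    task_id="gcs_to_s3_flatten",
    gcs_bucket="my-gcs-bucket",
    prefix="data/",
    dest_s3_key="s3://my-s3-bucket/backup/",
    flatten_structure=True,
    keep_directory_structure=True,  # ignored; flatten_structure takes precedence
)
```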
##########
providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py:
##########
@@ -330,6 +330,133 @@ def test_execute_without_keep_director_structure(self, mock_hook):
        assert sorted(MOCK_FILES) == sorted(uploaded_files)
        assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True
+
+ @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+ def test_execute_with_flatten_structure(self, mock_hook):
+ """Test that flatten_structure parameter flattens directory
structure."""
+ mock_files_with_paths = ["dir1/subdir1/file1.csv",
"dir2/subdir2/file2.csv", "dir3/file3.csv"]
+ mock_hook.return_value.list.return_value = mock_files_with_paths
+
+ with NamedTemporaryFile() as f:
+ gcs_provide_file = mock_hook.return_value.provide_file
+ gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+ operator = GCSToS3Operator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ prefix=PREFIX,
+ dest_aws_conn_id="aws_default",
+ dest_s3_key=S3_BUCKET,
+ replace=False,
+ flatten_structure=True,
+ )
+ hook, _ = _create_test_bucket()
+
+ uploaded_files = operator.execute(None)
+
+ # Verify all files were uploaded
+ assert sorted(mock_files_with_paths) == sorted(uploaded_files)
+
+ # Verify files are stored with flattened structure (only filenames)
+ expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
+ actual_keys = hook.list_keys("bucket", delimiter="/")
+ assert sorted(expected_s3_keys) == sorted(actual_keys)
+
+ @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+ def test_execute_with_flatten_structure_duplicate_filenames(self,
mock_hook):
+ """Test that flatten_structure handles duplicate filenames
correctly."""
+ mock_files_with_duplicates = [
+ "dir1/file.csv",
+ "dir2/file.csv", # Same filename as above
+ "dir3/other.csv",
+ ]
+ mock_hook.return_value.list.return_value = mock_files_with_duplicates
+
+ with NamedTemporaryFile() as f:
+ gcs_provide_file = mock_hook.return_value.provide_file
+ gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+ operator = GCSToS3Operator(
+ task_id=TASK_ID,
+ gcs_bucket=GCS_BUCKET,
+ prefix=PREFIX,
+ dest_aws_conn_id="aws_default",
+ dest_s3_key=S3_BUCKET,
+ replace=False,
+ flatten_structure=True,
+ )
+ _, _ = _create_test_bucket()
+
+ # Mock the logging to verify warning is logged
+ with mock.patch.object(operator, "log") as mock_log:
+ uploaded_files = operator.execute(None)
+
+ # Only one of the duplicate files should be uploaded
+ assert len(uploaded_files) == 2
+ assert "dir3/other.csv" in uploaded_files
+ first_or_second = "dir1/file.csv" in uploaded_files or
"dir2/file.csv" in uploaded_files
+ assert first_or_second
+
+ # Verify warning was logged for duplicate
+ mock_log.warning.assert_called()
+
+    def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
+        """Test warning when both flatten_structure and keep_directory_structure are True."""
+        mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+        with mock.patch(mock_path) as mock_log:
+            GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                flatten_structure=True,
+                keep_directory_structure=True,  # This should trigger warning
+            )
+
+            # Verify warning was logged during initialization
+            expected_warning = "flatten_structure=True overrides keep_directory_structure=True"
+            mock_log.warning.assert_called_once_with(expected_warning)
+
+    def test_transform_file_path_with_flatten_structure(self):
+        """Test _transform_file_path method with flatten_structure=True."""
Review Comment:
I think parameterizing these file path checks with `pytest.mark.parametrize`
would make them cleaner and easier to extend in the future.
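A minimal sketch of what that could look like, assuming `_transform_file_path`
maps a source object key to its destination S3 key (adjust if the actual
signature differs; `TASK_ID`, `GCS_BUCKET`, `PREFIX`, and `S3_BUCKET` are the
existing module constants from this test file):

```python
import pytest

@pytest.mark.parametrize(
    ("source_key", "expected_key"),
    [
        ("dir1/subdir1/file1.csv", "file1.csv"),
        ("dir2/file2.csv", "file2.csv"),
        ("file3.csv", "file3.csv"),
    ],
)
def test_transform_file_path_with_flatten_structure(self, source_key, expected_key):
    # One parametrized test covers all the path shapes; new cases are one tuple each.
    operator = GCSToS3Operator(
        task_id=TASK_ID,
        gcs_bucket=GCS_BUCKET,
        prefix=PREFIX,
        dest_s3_key=S3_BUCKET,
        flatten_structure=True,
    )
    assert operator._transform_file_path(source_key) == expected_key
```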
##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py:
##########
@@ -79,6 +79,9 @@ class GCSToS3Operator(BaseOperator):
        object to be uploaded in S3
    :param keep_directory_structure: (Optional) When set to False the path of the file
        on the bucket is recreated within path passed in dest_s3_key.
+    :param flatten_structure: (Optional) When set to True, places all files directly
+        in the dest_s3_key directory without preserving subdirectory structure.
+        Overrides keep_directory_structure when enabled.
Review Comment:
Nice doc addition! You could also state explicitly here that
`flatten_structure` takes precedence over `keep_directory_structure`. Users
often skim docstrings, so a one-line note helps avoid confusion later.
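For instance, the last line could read (suggested wording only):

```
+        Takes precedence over keep_directory_structure when both are set to True.
```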
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]