HaJunYoo commented on code in PR #57713:
URL: https://github.com/apache/airflow/pull/57713#discussion_r2488931395


##########
providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py:
##########
@@ -330,6 +330,133 @@ def test_execute_without_keep_director_structure(self, mock_hook):
             assert sorted(MOCK_FILES) == sorted(uploaded_files)
             assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True
 
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure(self, mock_hook):
+        """Test that flatten_structure parameter flattens directory structure."""
+        mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"]
+        mock_hook.return_value.list.return_value = mock_files_with_paths
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            hook, _ = _create_test_bucket()
+
+            uploaded_files = operator.execute(None)
+
+            # Verify all files were uploaded
+            assert sorted(mock_files_with_paths) == sorted(uploaded_files)
+
+            # Verify files are stored with flattened structure (only filenames)
+            expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
+            actual_keys = hook.list_keys("bucket", delimiter="/")
+            assert sorted(expected_s3_keys) == sorted(actual_keys)
+
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook):
+        """Test that flatten_structure handles duplicate filenames correctly."""
+        mock_files_with_duplicates = [
+            "dir1/file.csv",
+            "dir2/file.csv",  # Same filename as above
+            "dir3/other.csv",
+        ]
+        mock_hook.return_value.list.return_value = mock_files_with_duplicates
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            _, _ = _create_test_bucket()
+
+            # Mock the logging to verify warning is logged
+            with mock.patch.object(operator, "log") as mock_log:
+                uploaded_files = operator.execute(None)
+
+                # Only one of the duplicate files should be uploaded
+                assert len(uploaded_files) == 2
+                assert "dir3/other.csv" in uploaded_files
+                first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files
+                assert first_or_second
+
+                # Verify warning was logged for duplicate
+                mock_log.warning.assert_called()
+
+    def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
+        """Test warning when both flatten_structure and keep_directory_structure are True."""
+        mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+        with mock.patch(mock_path) as mock_log:
+            GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                flatten_structure=True,
+                keep_directory_structure=True,  # This should trigger warning
+            )
+
+            # Verify warning was logged during initialization
+            expected_warning = "flatten_structure=True overrides keep_directory_structure=True"
+            mock_log.warning.assert_called_once_with(expected_warning)
+
+    def test_transform_file_path_with_flatten_structure(self):
+        """Test _transform_file_path method with flatten_structure=True."""

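For context on what the assertions above imply: the operator code is not shown in this excerpt, so the following is a minimal sketch of the flattening and duplicate-skipping behavior the tests appear to exercise. The `_transform_file_path` name comes from the truncated test above; the standalone `flatten_keys` helper, its signature, and the log message are assumptions, not the committed implementation.

```python
import logging
import posixpath

log = logging.getLogger(__name__)


def flatten_keys(file_paths: list[str], flatten_structure: bool) -> list[str]:
    """Sketch: map GCS object paths to S3 keys, keeping only the basename
    when flatten_structure is enabled, and skipping any path whose flattened
    key was already produced (the duplicate-filename test expects a warning
    and a skip in that case)."""
    seen: set[str] = set()
    keys: list[str] = []
    for path in file_paths:
        key = posixpath.basename(path) if flatten_structure else path
        if key in seen:
            # Assumed behavior: warn and drop the later duplicate.
            log.warning("Skipping %s: flattened key %s already uploaded", path, key)
            continue
        seen.add(key)
        keys.append(key)
    return keys


# flatten_keys(["dir1/file.csv", "dir2/file.csv", "dir3/other.csv"], True)
# -> ["file.csv", "other.csv"], with one warning logged for dir2/file.csv
```
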
Review Comment:
   Thanks for the feedback.
   Refactored tests with `pytest.mark.parametrize` in this commit [96a2a36](https://github.com/apache/airflow/commit/96a2a360906db8297eba377efec02ff36fecfac5).
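
   For readers following the thread, a parametrized consolidation of the key-mapping expectations above might look roughly like the sketch below. It is illustrative only: the test name, parameter names, and cases are assumptions, not the actual contents of commit 96a2a36.

   ```python
   import posixpath

   import pytest


   @pytest.mark.parametrize(
       ("source_objects", "expected_keys"),
       [
           (["dir1/subdir1/file1.csv", "dir3/file3.csv"], ["file1.csv", "file3.csv"]),
           # Duplicate basenames should collapse to a single flattened key.
           (["dir1/file.csv", "dir2/file.csv"], ["file.csv"]),
       ],
   )
   def test_flattened_key_expectations(source_objects, expected_keys):
       # Checks only the path-to-key mapping shared by the tests above,
       # independent of the operator and the mocked GCS/S3 hooks.
       flattened = sorted(set(posixpath.basename(p) for p in source_objects))
       assert flattened == sorted(expected_keys)
   ```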



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
