This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ffc718d28 Core, Spark: Preserve the relative path in RewriteTablePathUtil on staging
7ffc718d28 is described below

commit 7ffc718d2857c1f4e4e7e1d70eebc8662020d6bd
Author: Mustafa Elbehery <melbe...@redhat.com>
AuthorDate: Tue Aug 5 20:55:32 2025 +0200

    Core, Spark: Preserve the relative path in RewriteTablePathUtil on staging
---
 .../org/apache/iceberg/RewriteTablePathUtil.java   | 26 ++++++-
 .../apache/iceberg/TestRewriteTablePathUtil.java   | 90 ++++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 8 files changed, 414 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index f63222c185..9cff49f601 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -264,7 +264,9 @@ public class RewriteTablePathUtil {
 
         if (manifestsToRewrite.contains(file.path())) {
           result.toRewrite().add(file);
-          result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
+          result
+              .copyPlan()
+              .add(Pair.of(stagingPath(file.path(), sourcePrefix, stagingDir), newFile.path()));
         }
       }
       return result;
@@ -503,7 +505,10 @@ public class RewriteTablePathUtil {
         if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
           result
               .copyPlan()
-              .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+              .add(
+                  Pair.of(
+                      stagingPath(file.location(), sourcePrefix, stagingLocation),
+                      movedFile.location()));
         }
         result.toRewrite().add(file);
         return result;
@@ -693,8 +698,25 @@ public class RewriteTablePathUtil {
    * @param originalPath source path
    * @param stagingDir staging directory
    * @return a staging path under the staging directory, based on the original path
+   * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #stagingPath(String, String,
+   *     String)} instead to avoid filename conflicts
    */
+  @Deprecated
   public static String stagingPath(String originalPath, String stagingDir) {
     return stagingDir + fileName(originalPath);
   }
+
+  /**
+   * Construct a staging path under a given staging directory, preserving relative directory
+   * structure to avoid conflicts when multiple files have the same name but different paths.
+   *
+   * @param originalPath source path
+   * @param sourcePrefix source prefix to be replaced
+   * @param stagingDir staging directory
+   * @return a staging path under the staging directory that preserves the relative path structure
+   */
+  public static String stagingPath(String originalPath, String sourcePrefix, String stagingDir) {
+    String relativePath = relativize(originalPath, sourcePrefix);
+    return combinePaths(stagingDir, relativePath);
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
new file mode 100644
index 0000000000..8a688bebf5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+public class TestRewriteTablePathUtil {
+
+  @Test
+  public void testStagingPathPreservesDirectoryStructure() {
+    String sourcePrefix = "/source/table";
+    String stagingDir = "/staging/";
+
+    // Two files with same name but different paths
+    String file1 = "/source/table/hash1/delete_0_0_0.parquet";
+    String file2 = "/source/table/hash2/delete_0_0_0.parquet";
+
+    String stagingPath1 = RewriteTablePathUtil.stagingPath(file1, sourcePrefix, stagingDir);
+    String stagingPath2 = RewriteTablePathUtil.stagingPath(file2, sourcePrefix, stagingDir);
+
+    // Should preserve directory structure to avoid conflicts
+    assertThat(stagingPath1)
+        .startsWith(stagingDir)
+        .isEqualTo("/staging/hash1/delete_0_0_0.parquet")
+        .isNotEqualTo(stagingPath2);
+    assertThat(stagingPath2)
+        .startsWith(stagingDir)
+        .isEqualTo("/staging/hash2/delete_0_0_0.parquet");
+  }
+
+  @Test
+  public void testStagingPathBackwardCompatibility() {
+    // Test that the deprecated method still works
+    String originalPath = "/some/path/file.parquet";
+    String stagingDir = "/staging/";
+
+    String result = RewriteTablePathUtil.stagingPath(originalPath, stagingDir);
+
+    assertThat(result).isEqualTo("/staging/file.parquet");
+  }
+
+  @Test
+  public void testStagingPathWithComplexPaths() {
+    String sourcePrefix = "/warehouse/db/table";
+    String stagingDir = "/tmp/staging/";
+
+    String filePath = "/warehouse/db/table/data/year=2023/month=01/part-00001.parquet";
+    String result = RewriteTablePathUtil.stagingPath(filePath, sourcePrefix, stagingDir);
+
+    assertThat(result).isEqualTo("/tmp/staging/data/year=2023/month=01/part-00001.parquet");
+  }
+
+  @Test
+  public void testStagingPathWithNoMiddlePart() {
+    // Test case where file is directly under source prefix (no middle directory structure)
+    String sourcePrefix = "/source/table";
+    String stagingDir = "/staging/";
+    String fileDirectlyUnderPrefix = "/source/table/file.parquet";
+
+    // Test new method
+    String newMethodResult =
+        RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, sourcePrefix, stagingDir);
+
+    // Test old deprecated method
+    String oldMethodResult = RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, stagingDir);
+
+    // Both methods should behave the same when there's no middle part
+    assertThat(newMethodResult).isEqualTo("/staging/file.parquet");
+    assertThat(oldMethodResult).isEqualTo("/staging/file.parquet");
+    assertThat(newMethodResult).isEqualTo(oldMethodResult);
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, 
stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), 
after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, 
stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, 
stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
             snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), 
stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = 
tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested 
directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ 
and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure 
preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory 
structure
+    List<Tuple2<String, String>> filePaths = 
readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, 
stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), 
after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, 
stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, 
stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
             snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), 
stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = 
tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested 
directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ 
and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure 
preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory 
structure
+    List<Tuple2<String, String>> filePaths = 
readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, 
stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), 
after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, 
stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, 
stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
             snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), 
stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = 
tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested 
directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ 
and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + 
"/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure 
preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory 
structure
+    List<Tuple2<String, String>> filePaths = 
readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,

Reply via email to