This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 7ffc718d28 Core, Spark: Preserve the relative path in RewriteTablePathUtil on staging
7ffc718d28 is described below

commit 7ffc718d2857c1f4e4e7e1d70eebc8662020d6bd
Author: Mustafa Elbehery <melbe...@redhat.com>
AuthorDate: Tue Aug 5 20:55:32 2025 +0200

    Core, Spark: Preserve the relative path in RewriteTablePathUtil on staging
---
 .../org/apache/iceberg/RewriteTablePathUtil.java   | 26 ++++++-
 .../apache/iceberg/TestRewriteTablePathUtil.java   | 90 ++++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 .../spark/actions/RewriteTablePathSparkAction.java | 19 +++--
 .../spark/actions/TestRewriteTablePathsAction.java | 87 +++++++++++++++++++++
 8 files changed, 414 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index f63222c185..9cff49f601 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -264,7 +264,9 @@ public class RewriteTablePathUtil {
       if (manifestsToRewrite.contains(file.path())) {
         result.toRewrite().add(file);
-        result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
+        result
+            .copyPlan()
+            .add(Pair.of(stagingPath(file.path(), sourcePrefix, stagingDir), newFile.path()));
       }
     }
     return result;
   }
@@ -503,7 +505,10 @@ public class RewriteTablePathUtil {
     if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
       result
           .copyPlan()
-          .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+          .add(
+              Pair.of(
+                  stagingPath(file.location(), sourcePrefix, stagingLocation),
+                  movedFile.location()));
     }
     result.toRewrite().add(file);
     return result;
@@ -693,8 +698,25 @@ public class RewriteTablePathUtil {
    * @param originalPath source path
    * @param stagingDir staging directory
    * @return a staging path under the staging directory, based on the original path
+   * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #stagingPath(String, String,
+   *     String)} instead to avoid filename conflicts
    */
+  @Deprecated
   public static String stagingPath(String originalPath, String stagingDir) {
     return stagingDir + fileName(originalPath);
   }
+
+  /**
+   * Construct a staging path under a given staging directory, preserving relative directory
+   * structure to avoid conflicts when multiple files have the same name but different paths.
+   *
+   * @param originalPath source path
+   * @param sourcePrefix source prefix to be replaced
+   * @param stagingDir staging directory
+   * @return a staging path under the staging directory that preserves the relative path structure
+   */
+  public static String stagingPath(String originalPath, String sourcePrefix, String stagingDir) {
+    String relativePath = relativize(originalPath, sourcePrefix);
+    return combinePaths(stagingDir, relativePath);
+  }
 }
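For a quick sense of the behavior change, the standalone sketch below re-derives the new mapping from the expected values in the TestRewriteTablePathUtil tests that follow. It is illustrative only, not the Iceberg implementation: the bodies of relativize() and combinePaths() are not part of this diff, so their exact prefix and slash handling here is an assumption.

// Illustrative sketch of what the new three-argument stagingPath() computes.
public class NewStagingPathSketch {

  static String stagingPath(String originalPath, String sourcePrefix, String stagingDir) {
    // relativize(): strip the source prefix (and a leading slash, if present)
    String relative = originalPath.substring(sourcePrefix.length());
    if (relative.startsWith("/")) {
      relative = relative.substring(1);
    }
    // combinePaths(): join the staging dir and the relative path with one slash
    return stagingDir.endsWith("/") ? stagingDir + relative : stagingDir + "/" + relative;
  }

  public static void main(String[] args) {
    // Prints /staging/hash1/delete_0_0_0.parquet - the hash1/ directory survives
    System.out.println(
        stagingPath("/source/table/hash1/delete_0_0_0.parquet", "/source/table", "/staging/"));
    // Prints /tmp/staging/data/year=2023/month=01/part-00001.parquet
    System.out.println(
        stagingPath(
            "/warehouse/db/table/data/year=2023/month=01/part-00001.parquet",
            "/warehouse/db/table",
            "/tmp/staging/"));
  }
}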
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
new file mode 100644
index 0000000000..8a688bebf5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+public class TestRewriteTablePathUtil {
+
+  @Test
+  public void testStagingPathPreservesDirectoryStructure() {
+    String sourcePrefix = "/source/table";
+    String stagingDir = "/staging/";
+
+    // Two files with same name but different paths
+    String file1 = "/source/table/hash1/delete_0_0_0.parquet";
+    String file2 = "/source/table/hash2/delete_0_0_0.parquet";
+
+    String stagingPath1 = RewriteTablePathUtil.stagingPath(file1, sourcePrefix, stagingDir);
+    String stagingPath2 = RewriteTablePathUtil.stagingPath(file2, sourcePrefix, stagingDir);
+
+    // Should preserve directory structure to avoid conflicts
+    assertThat(stagingPath1)
+        .startsWith(stagingDir)
+        .isEqualTo("/staging/hash1/delete_0_0_0.parquet")
+        .isNotEqualTo(stagingPath2);
+    assertThat(stagingPath2)
+        .startsWith(stagingDir)
+        .isEqualTo("/staging/hash2/delete_0_0_0.parquet");
+  }
+
+  @Test
+  public void testStagingPathBackwardCompatibility() {
+    // Test that the deprecated method still works
+    String originalPath = "/some/path/file.parquet";
+    String stagingDir = "/staging/";
+
+    String result = RewriteTablePathUtil.stagingPath(originalPath, stagingDir);
+
+    assertThat(result).isEqualTo("/staging/file.parquet");
+  }
+
+  @Test
+  public void testStagingPathWithComplexPaths() {
+    String sourcePrefix = "/warehouse/db/table";
+    String stagingDir = "/tmp/staging/";
+
+    String filePath = "/warehouse/db/table/data/year=2023/month=01/part-00001.parquet";
+    String result = RewriteTablePathUtil.stagingPath(filePath, sourcePrefix, stagingDir);
+
+    assertThat(result).isEqualTo("/tmp/staging/data/year=2023/month=01/part-00001.parquet");
+  }
+
+  @Test
+  public void testStagingPathWithNoMiddlePart() {
+    // Test case where file is directly under source prefix (no middle directory structure)
+    String sourcePrefix = "/source/table";
+    String stagingDir = "/staging/";
+
+    String fileDirectlyUnderPrefix = "/source/table/file.parquet";
+
+    // Test new method
+    String newMethodResult =
+        RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, sourcePrefix, stagingDir);
+
+    // Test old deprecated method
+    String oldMethodResult = RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, stagingDir);
+
+    // Both methods should behave the same when there's no middle part
+    assertThat(newMethodResult).isEqualTo("/staging/file.parquet");
+    assertThat(oldMethodResult).isEqualTo("/staging/file.parquet");
+    assertThat(newMethodResult).isEqualTo(oldMethodResult);
+  }
+}
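To see the failure mode these tests pin down, a small hypothetical driver against the public API is enough: with the deprecated two-argument overload, both calls below return the same staging path, since that overload resolves to stagingDir + fileName(originalPath). The class name is made up for the demo; only the stagingPath() call is from the library.

import org.apache.iceberg.RewriteTablePathUtil;

// Both calls print /staging/delete_0_0_0.parquet, so the second staged file
// would overwrite the first during the copy phase.
public class StagingCollisionDemo {
  public static void main(String[] args) {
    System.out.println(
        RewriteTablePathUtil.stagingPath("/source/table/hash1/delete_0_0_0.parquet", "/staging/"));
    System.out.println(
        RewriteTablePathUtil.stagingPath("/source/table/hash2/delete_0_0_0.parquet", "/staging/"));
  }
}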
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
            snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
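For context on how these code paths are reached: the action is normally driven through SparkActions, and with this fix everything staged under a custom staging location mirrors the source table's relative layout. A minimal sketch, assuming an active Spark session and placeholder table locations (none of these values come from the commit):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.SparkActions;

public class RewriteTablePathExample {
  // 'table' is a loaded Iceberg Table; the prefixes and staging URI below are
  // placeholders. result.fileListLocation() then points at the source-to-target
  // copy plan that a copy tool such as DistCp can consume.
  static RewriteTablePath.Result rewrite(Table table) {
    return SparkActions.get()
        .rewriteTablePath(table)
        .rewriteLocationPrefix("s3://old-bucket/wh/db/tbl", "s3://new-bucket/wh/db/tbl")
        .stagingLocation("hdfs://namenode:8020/tmp/staging/")
        .execute();
  }
}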
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + "/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + "/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory structure
+    List<Tuple2<String, String>> filePaths = readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
            snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + "/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + "/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory structure
+    List<Tuple2<String, String>> filePaths = readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fbeabfe63e..f7e90e679a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -372,7 +372,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
   private Set<Pair<String, String>> rewriteVersionFile(
       TableMetadata metadata, String versionFilePath) {
     Set<Pair<String, String>> result = Sets.newHashSet();
-    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
@@ -404,7 +405,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
           before.fileSizeInBytes() == after.fileSizeInBytes(),
           "Before and after path rewrite, statistic file size should be same");
       result.add(
-          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
+          Pair.of(
+              RewriteTablePathUtil.stagingPath(before.path(), sourcePrefix, stagingDir),
+              after.path()));
     }
     return result;
   }
@@ -423,7 +426,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
-    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir);
     RewriteResult<ManifestFile> rewriteResult =
         RewriteTablePathUtil.rewriteManifestList(
            snapshot,
@@ -570,7 +573,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -598,7 +602,8 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       String sourcePrefix,
       String targetPrefix) {
     try {
-      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map<Integer, PartitionSpec> specsById = table.getValue().specs();
@@ -662,7 +667,9 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       PositionDeleteReaderWriter posDeleteReaderWriter) {
     return deleteFile -> {
       FileIO io = tableArg.getValue().io();
-      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg);
+      String newPath =
+          RewriteTablePathUtil.stagingPath(
+              deleteFile.location(), sourcePrefixArg, stagingLocationArg);
       OutputFile outputFile = io.newOutputFile(newPath);
       PartitionSpec spec = tableArg.getValue().specs().get(deleteFile.specId());
       RewriteTablePathUtil.rewritePositionDeleteFile(
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 705e0074c7..5735d2b335 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1122,6 +1122,93 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
+  @Test
+  public void testNestedDirectoryStructurePreservation() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
+
+    // Create position delete files with same names in different nested directories
+    // This simulates the scenario tested in
+    // TestRewriteTablePathUtil.testStagingPathPreservesDirectoryStructure
+    List<Pair<CharSequence, Long>> deletes1 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    List<Pair<CharSequence, Long>> deletes2 =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .location(),
+                0L));
+
+    // Create delete files with same name in different nested paths (hash1/ and hash2/)
+    File file1 =
+        new File(removePrefix(sourceTable.location() + "/data/hash1/delete_0_0_0.parquet"));
+    File file2 =
+        new File(removePrefix(sourceTable.location() + "/data/hash2/delete_0_0_0.parquet"));
+
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+            .first();
+
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+            .first();
+
+    sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
+    sourceTable.newRowDelta().addDeletes(positionDeletes2).commit();
+
+    // Perform rewrite with staging location to test directory structure preservation
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    // Copy the files and verify structure is preserved
+    copyTableFiles(result);
+
+    // Read the file paths from the rewritten result to verify directory structure
+    List<Tuple2<String, String>> filePaths = readPathPairList(result.fileListLocation());
+
+    // Find the delete files in the result
+    List<Tuple2<String, String>> deleteFilePaths =
+        filePaths.stream()
+            .filter(pair -> pair._2().contains("delete_0_0_0.parquet"))
+            .collect(Collectors.toList());
+
+    // Should have 2 delete files with different paths
+    assertThat(deleteFilePaths).hasSize(2);
+
+    // Verify that the directory structure is preserved in target paths
+    assertThat(deleteFilePaths)
+        .anyMatch(pair -> pair._2().contains("/hash1/delete_0_0_0.parquet"))
+        .anyMatch(pair -> pair._2().contains("/hash2/delete_0_0_0.parquet"));
+
+    // Verify that the files have different target paths (no conflicts)
+    String targetPath1 = deleteFilePaths.get(0)._2();
+    String targetPath2 = deleteFilePaths.get(1)._2();
+    assertThat(targetPath1).isNotEqualTo(targetPath2);
+
+    // Verify both target paths start with the target table location
+    assertThat(targetPath1).startsWith(targetTableLocation());
+    assertThat(targetPath2).startsWith(targetTableLocation());
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,