dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2286202074


##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -1209,6 +1213,191 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testFullRewriteUpdatesAllManifestLengthsInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    addAnyPositionDelete(
+        sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite table metadata to a location that's much longer than the original in order
+    // to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .execute();
+
+    // 1 + 11 JSON metadata files, 11 snapshots, 11 manifests, 10 data files, 1 delete file
+    checkFileNum(12, 11, 11, 45, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+
+    // We have rewritten all 11 snapshots. Make sure all sizes were correctly updated
+    // across all manifest lists
+    assertThat(targetTable.snapshots())
+        .allSatisfy(
+            snapshot ->
+                assertThat(snapshot.allManifests(targetTable.io()))
+                    .allSatisfy(
+                        manifest -> {
+                          String manifestName = fileName(manifest.path());
+                          assertThat(manifest.length())
+                              .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName));
+                        })
+                    .allSatisfy(
+                        manifest -> {
+                          assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
+                              .isEqualTo(manifest.length());
+                        }));

Review Comment:
   love this



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -1209,6 +1213,191 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testFullRewriteUpdatesAllManifestLengthsInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    addAnyPositionDelete(
+        sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));

Review Comment:
   nit: not related to your change, but I just realized `fileName(m.path())` can be replaced with `Paths.get(path).getFileName().toString()`, and `m.length()` can be replaced with a method reference
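
   For reference, a minimal sketch of how that cleanup could look (purely illustrative; it reuses `sourceTable` from the test above and spells out the imports it would rely on):

   ```java
   import java.nio.file.Paths;
   import java.util.Map;
   import java.util.stream.Collectors;
   import org.apache.iceberg.ManifestFile;

   // Hypothetical rewrite of the map construction from the test above.
   Map<String, Long> manifestSizesBeforeRewrite =
       sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
           .collect(
               Collectors.toMap(
                   m -> Paths.get(m.path()).getFileName().toString(), // instead of fileName(m.path())
                   ManifestFile::length));                            // instead of m -> m.length()
   ```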



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -1209,6 +1213,191 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testFullRewriteUpdatesAllManifestLengthsInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    addAnyPositionDelete(
+        sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite table metadata to a location that's much longer than the original in order
+    // to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .execute();
+
+    // 1 + 11 JSON metadata files, 11 snapshots, 11 manifests, 10 data files, 1 delete file
+    checkFileNum(12, 11, 11, 45, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+
+    // We have rewritten all 11 snapshots. Make sure all sizes were correctly updated
+    // across all manifest lists
+    assertThat(targetTable.snapshots())
+        .allSatisfy(
+            snapshot ->
+                assertThat(snapshot.allManifests(targetTable.io()))
+                    .allSatisfy(
+                        manifest -> {
+                          String manifestName = fileName(manifest.path());
+                          assertThat(manifest.length())
+                              .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName));
+                        })
+                    .allSatisfy(
+                        manifest -> {
+                          assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
+                              .isEqualTo(manifest.length());
+                        }));
+  }
+
+  @Test
+  public void testPartialRewriteUpdatesDataManifestLengthInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite just the latest table version to a location that's much longer than
+    // the original in order to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .startVersion("v10.metadata.json")
+            .execute();
+
+    // 1 metadata JSON file, 1 snapshot, 10 manifests, 1 data file
+    checkFileNum(1, 1, 10, 13, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+    Snapshot lastSnapshot = targetTable.currentSnapshot();
+
+    List<ManifestFile> allManifests = lastSnapshot.allManifests(targetTable.io());
+    ManifestFile rewrittenManifest =
+        Iterables.getOnlyElement(
+            allManifests.stream()
+                .filter(manifest -> manifest.snapshotId() == lastSnapshot.snapshotId())
+                .collect(Collectors.toList()));

Review Comment:
   looks like this `rewrittenManifest` is not used; do we want to apply some assertion to it, or can it be removed?
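
   Purely as an illustration (if keeping the variable is preferred over removing it), it could be checked the same way the manifest-list entries are checked below, using the test's existing AssertJ setup:

   ```java
   // Hypothetical assertion: the rewritten manifest's recorded length should match
   // the actual file length at the target location.
   assertThat(targetTable.io().newInputFile(rewrittenManifest.path()).getLength())
       .isEqualTo(rewrittenManifest.length());
   ```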



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,77 +478,128 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }
 
-  /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  private static class ManifestsRewriteResult {
+    private final RewriteContentFileResult contentFileResult;
+    private final Map<String, Long> rewrittenManifests;
+
+    ManifestsRewriteResult(
+        RewriteContentFileResult contentFileResult, Map<String, Long> rewrittenManifests) {
+      this.contentFileResult = contentFileResult;
+      this.rewrittenManifests = rewrittenManifests;
+    }
+
+    public RewriteContentFileResult getContentFileResult() {
+      return contentFileResult;
+    }
+
+    public Map<String, Long> getRewrittenManifests() {
+      return rewrittenManifests;
+    }
+  }
+
+  /**
+   * Rewrite manifest files in a distributed manner and return the resulting manifests and content
+   * files selected for rewriting.
+   */
+  private ManifestsRewriteResult rewriteManifests(
      Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
     if (toRewrite.isEmpty()) {
-      return new RewriteContentFileResult();
+      return new ManifestsRewriteResult(new RewriteContentFileResult(), Maps.newHashMap());
     }
 
     Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
+    Encoder<RewriteContentFileResult> contentResultEncoder =
+        Encoders.javaSerialization(RewriteContentFileResult.class);
+    Encoder<Tuple3<String, Long, RewriteContentFileResult>> tupleEncoder =
+        Encoders.tuple(Encoders.STRING(), Encoders.LONG(), contentResultEncoder);
+
     Dataset<ManifestFile> manifestDS =
         spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
     Set<Long> deltaSnapshotIds =
         deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
 
-    return manifestDS
-        .repartition(toRewrite.size())
-        .map(
-            toManifests(
-                tableBroadcast(),
-                sparkContext().broadcast(deltaSnapshotIds),
-                stagingDir,
-                tableMetadata.formatVersion(),
-                sourcePrefix,
-                targetPrefix),
-            Encoders.bean(RewriteContentFileResult.class))
-        // duplicates are expected here as the same data file can have different statuses
-        // (e.g. added and deleted)
-        .reduce((ReduceFunction<RewriteContentFileResult>) RewriteContentFileResult::append);
-  }
-
-  private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
-      Broadcast<Table> table,
-      Broadcast<Set<Long>> deltaSnapshotIds,
-      String stagingLocation,
-      int format,
-      String sourcePrefix,
-      String targetPrefix) {
+    RewriteContentFileResult finalContentResult = new RewriteContentFileResult();
+    Iterator<Tuple3<String, Long, RewriteContentFileResult>> resultIterator =
+        manifestDS
+            .repartition(toRewrite.size())
+            .map(
+                toManifests(
+                    tableBroadcast(),
+                    sparkContext().broadcast(deltaSnapshotIds),
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    sourcePrefix,
+                    targetPrefix),
+                tupleEncoder)
+            .toLocalIterator();
+
+    Map<String, Long> rewrittenManifests = Maps.newHashMap();
+
+    while (resultIterator.hasNext()) {
+      Tuple3<String, Long, RewriteContentFileResult> resultTuple = resultIterator.next();
+      String originalManifestPath = resultTuple._1();
+      Long rewrittenManifestLength = resultTuple._2();
+      RewriteContentFileResult contentFileResult = resultTuple._3();
+      String stagingManifestPath =
+          RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix, stagingDir);
+      String targetManifestPath =
+          RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix, targetPrefix);
+
+      finalContentResult.append(contentFileResult);
+      finalContentResult.copyPlan().add(Pair.of(stagingManifestPath, targetManifestPath));
+      rewrittenManifests.put(originalManifestPath, rewrittenManifestLength);
+    }
+
+    return new ManifestsRewriteResult(finalContentResult, rewrittenManifests);
+  }

Review Comment:
   @huaxingao can I ask for your expertise on this as well



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -359,6 +419,48 @@ public static RewriteResult<DataFile> rewriteDataManifest(
     }
   }
 
+  /**
+   * Rewrite a data manifest, replacing path references.
+   *
+   * @param manifestFile source manifest file to rewrite
+   * @param snapshotIds snapshot ids for filtering returned data manifest entries
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return size of the resulting manifest file and a copy plan for the referenced content files
+   */
+  public static Pair<Long, RewriteResult<DataFile>> rewriteDataManifestWithResult(
+      ManifestFile manifestFile,
+      Set<Long> snapshotIds,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
+    RewriteResult<DataFile> rewriteResult = null;
+
+    try (ManifestWriter<DataFile> dataManifestWriter = writer;

Review Comment:
   `dataManifestWriter` seems unused; I assume we want to ensure `writer` is closed with try-with-resources
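
   If so, a minimal sketch of that simplification (purely illustrative, assuming the rest of the method body stays unchanged) would be to declare the writer directly in the try-with-resources header:

   ```java
   // Declaring the writer in try-with-resources closes it automatically and
   // removes the unused dataManifestWriter alias.
   try (ManifestWriter<DataFile> writer =
       ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId())) {
     // ... existing manifest entry rewriting logic, unchanged ...
   }
   ```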



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -1209,6 +1213,191 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testFullRewriteUpdatesAllManifestLengthsInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    addAnyPositionDelete(
+        sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite table metadata to a location that's much longer than the original in order
+    // to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);

Review Comment:
   I think you can actually just move this into the function above, so there's no need to keep `@TempDir Path rootTargetLocation` for each test
   
   ```java
     protected String targetTableLocation() {
       return toAbsolute(targetTableDir) + generateLongNestedPath(5);
     }
   ```
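
   Purely as an illustration of the suggestion (the helper name and `generateLongNestedPath(5)` depth are taken from the snippet above, not from the PR), the tests could then presumably drop their local target-location setup:

   ```java
   // Hypothetical usage once targetTableLocation() builds the long nested path itself.
   RewriteTablePath.Result result =
       actions()
           .rewriteTablePath(sourceTable)
           .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
           .execute();
   ```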



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -1209,6 +1213,191 @@ public void testNestedDirectoryStructurePreservation() 
throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testFullRewriteUpdatesAllManifestLengthsInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    addAnyPositionDelete(
+        sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite table metadata to a location that's much longer than the original in order
+    // to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .execute();
+
+    // 1 + 11 JSON metadata files, 11 snapshots, 11 manifests, 10 data files, 1 delete file
+    checkFileNum(12, 11, 11, 45, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+
+    // We have rewritten all 11 snapshots. Make sure all sizes were correctly updated
+    // across all manifest lists
+    assertThat(targetTable.snapshots())
+        .allSatisfy(
+            snapshot ->
+                assertThat(snapshot.allManifests(targetTable.io()))
+                    .allSatisfy(
+                        manifest -> {
+                          String manifestName = fileName(manifest.path());
+                          assertThat(manifest.length())
+                              .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName));
+                        })
+                    .allSatisfy(
+                        manifest -> {
+                          assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
+                              .isEqualTo(manifest.length());
+                        }));
+  }
+
+  @Test
+  public void testPartialRewriteUpdatesDataManifestLengthInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 10);
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite just the latest table version to a location that's much longer than
+    // the original in order to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .startVersion("v10.metadata.json")
+            .execute();
+
+    // 1 metadata JSON file, 1 snapshot, 10 manifests, 1 data file
+    checkFileNum(1, 1, 10, 13, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+    Snapshot lastSnapshot = targetTable.currentSnapshot();
+
+    List<ManifestFile> allManifests = lastSnapshot.allManifests(targetTable.io());
+    ManifestFile rewrittenManifest =
+        Iterables.getOnlyElement(
+            allManifests.stream()
+                .filter(manifest -> manifest.snapshotId() == lastSnapshot.snapshotId())
+                .collect(Collectors.toList()));
+
+    // We have rewritten all manifests in one snapshot. Make sure all sizes were correctly
+    // updated in the manifest list
+    assertThat(targetTable.currentSnapshot().allManifests(targetTable.io()))
+        .allSatisfy(
+            manifest -> {
+              String manifestName = fileName(manifest.path());
+              assertThat(manifest.length())
+                  .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName));
+            })
+        .allSatisfy(
+            manifest -> {
+              assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
+                  .isEqualTo(manifest.length());
+            });
+  }
+
+  @Test
+  public void testPartialRewriteUpdatesDeleteManifestLengthInManifestList(
+      @TempDir Path rootTargetLocation) throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 5);
+
+    // Add two more snapshots with just position deletes
+    addAnyPositionDelete(
+        sourceTable,
+        removePrefix(sourceTable.location() + "/data/deeply/nested/deletes-1.parquet"));
+    addAnyPositionDelete(
+        sourceTable,
+        removePrefix(sourceTable.location() + "/data/deeply/nested/deletes-2.parquet"));
+
+    Map<String, Long> manifestSizesBeforeRewrite =
+        sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
+            .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
+
+    // Rewrite just the latest table version to a location that's much longer than
+    // the original in order to make manifests larger
+    String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetLocation)
+            .startVersion("v7.metadata.json")
+            .execute();
+
+    // 1 metadata JSON file, 1 snapshot, 5 + 2 manifests, 1 delete file
+    checkFileNum(1, 1, 7, 10, result);
+    copyTableFiles(result);
+
+    Table targetTable = TABLES.load(targetLocation);
+    Snapshot lastSnapshot = targetTable.currentSnapshot();
+
+    List<ManifestFile> allManifests = lastSnapshot.allManifests(targetTable.io());
+    ManifestFile rewrittenManifest =
+        Iterables.getOnlyElement(
+            allManifests.stream()
+                .filter(manifest -> manifest.snapshotId() == lastSnapshot.snapshotId())
+                .collect(Collectors.toList()));
+
+    // We have rewritten all manifests in one snapshot. Make sure all sizes were correctly
+    // updated in the manifest list
+    assertThat(targetTable.currentSnapshot().allManifests(targetTable.io()))
+        .allSatisfy(
+            manifest -> {
+              String manifestName = fileName(manifest.path());
+              assertThat(manifest.length())
+                  .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName));
+            })
+        .allSatisfy(
+            manifest -> {
+              assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
+                  .isEqualTo(manifest.length());
+            });
+  }
+
+  protected static String generateLongNestedPath(int depth) {
+    StringBuilder pathBuilder = new StringBuilder();
+    for (int i = 1; i <= depth; i++) {
+      pathBuilder.append(String.format("/%03d", i));
+    }
+    pathBuilder.append("/");
+    return pathBuilder.toString();
+  }
+
+  protected void addAnyPositionDelete(Table targetTable, String path) throws Exception {

Review Comment:
   nit: how about
   
   `protected void commitNewPositionDelete(Table table, String deleteFilePath)`


