This is an automated email from the ASF dual-hosted git repository.

ashishkumar50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e136cb00bfb HDDS-14943. Implement rewrite logic for Iceberg's manifest-list files for path migration (#10190)
e136cb00bfb is described below

commit e136cb00bfbea9f7f3aee7d3e6c3e05944dab7dc
Author: sreejasahithi <[email protected]>
AuthorDate: Mon May 11 13:59:42 2026 +0530

    HDDS-14943. Implement rewrite logic for Iceberg's manifest-list files for path migration (#10190)
---
 .../ozone/iceberg/RewriteTablePathOzoneAction.java |  91 +++++++++++++-
 .../iceberg/TestRewriteTablePathOzoneAction.java   | 140 ++++++++++++++++-----
 2 files changed, 200 insertions(+), 31 deletions(-)

diff --git 
a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
 
b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index a3e9190469e..2e8b930bf26 100644
--- 
a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++ 
b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -211,7 +211,7 @@ private boolean versionInFilePath(String path, String 
version) {
   }
 
   private String rebuildMetadata() {
-    //TODO need to implement rewrite of manifest list , manifest files and 
position delete files.
+    //TODO need to implement rewrite of manifest files and position delete 
files.
     TableMetadata startMetadata = startVersionName != null
         ? new StaticTableOperations(startVersionName, table.io()).current()
         : null;
@@ -227,12 +227,14 @@ private String rebuildMetadata() {
     Set<Long> deltaSnapshotIds =  
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
     Set<Snapshot> validSnapshots = new 
HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
     
validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
-    //TODO: manifestsToRewrite will be used while re-write of manifest-list 
files.
     Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots, 
         startMetadata != null ? deltaSnapshotIds : null);
+    RewriteResult<ManifestFile> rewriteManifestListResult = 
+        rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite);
 
     Set<Pair<String, String>> copyPlan = new HashSet<>();
     copyPlan.addAll(rewriteVersionResult.copyPlan());
+    copyPlan.addAll(rewriteManifestListResult.copyPlan());
 
     return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir, 
table.io());
   }
@@ -362,6 +364,91 @@ private Set<String> manifestsToRewrite(Set<Snapshot> 
validSnapshots, Set<Long> d
     return manifestPaths;
   }
 
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> 
manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+    String path = snapshot.manifestListLocation();
+    String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, 
stagingDir);
+    RewriteResult<ManifestFile> rewriteResult =
+        RewriteTablePathUtil.rewriteManifestList(
+            snapshot,
+            table.io(),
+            tableMetadata,
+            manifestsToRewrite,
+            sourcePrefix,
+            targetPrefix,
+            stagingDir,
+            outputPath);
+
+    result.append(rewriteResult);
+    result
+        .copyPlan()
+        .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, 
sourcePrefix, targetPrefix)));
+    return result;
+  }
+
+  private RewriteResult<ManifestFile> rewriteManifestLists(Set<Snapshot> 
validSnapshots, TableMetadata endMetadata,
+      Set<String> manifestsToRewrite) {
+
+    if (validSnapshots.isEmpty()) {
+      return new RewriteResult<>();
+    }
+
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight);
+    ExecutorCompletionService<RewriteResult<ManifestFile>> completionService =
+        new ExecutorCompletionService<>(executorService);
+
+    RewriteResult<ManifestFile> combined = new RewriteResult<>();
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (Snapshot snapshot : validSnapshots) {
+        semaphore.acquire();
+
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try {
+              return rewriteManifestList(snapshot, endMetadata, 
manifestsToRewrite);
+            } finally {
+              semaphore.release();
+            }
+          });
+          taskSubmitted = true;
+          submittedTasks++;
+        } finally {
+          if (!taskSubmitted) {
+            semaphore.release();
+          }
+        }
+
+        Future<RewriteResult<ManifestFile>> done;
+        while ((done = completionService.poll()) != null) {
+          combined.append(done.get());
+          completedTasks++;
+        }
+      }
+      
+      while (completedTasks < submittedTasks) {
+        combined.append(completionService.take().get());
+        completedTasks++;
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      executorService.shutdownNow();
+      throw new RuntimeException("Interrupted while rewriting manifest lists", 
e);
+    } catch (ExecutionException e) {
+      executorService.shutdownNow();
+      throw new RuntimeException("Failed to rewrite manifest list", 
e.getCause());
+    }
+
+    return combined;
+  }
+
   private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, 
Set<Snapshot> allSnapshots) {
     if (startMetadata == null) {
       return allSnapshots;
diff --git 
a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
 
b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
index 8678fe52301..56442a75063 100644
--- 
a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
+++ 
b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
@@ -23,6 +23,7 @@
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,7 +34,11 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.Schema;
@@ -44,6 +49,7 @@
 import org.apache.iceberg.TableMetadata.MetadataLogEntry;
 import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.junit.jupiter.api.BeforeEach;
@@ -68,11 +74,11 @@ class TestRewriteTablePathOzoneAction {
   private Table table = null;
 
   @TempDir
-  private java.nio.file.Path tableDir;
+  private Path tableDir;
   @TempDir
-  private java.nio.file.Path targetDir;
+  private Path targetDir;
   @TempDir
-  private java.nio.file.Path stagingDir;
+  private Path stagingDir;
 
   @BeforeEach
   public void setupTableLocation() {
@@ -96,7 +102,9 @@ void fullTablePathRewrite() throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    assertEquals(expectedTargets, 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+    assertTrue(actualTargets.containsAll(expectedTargets),
+        "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target.
     assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -124,7 +132,9 @@ void tablePathRewriteForStartAndNoEndVersionProvided() 
throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    assertEquals(expectedTargets, 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+    assertTrue(actualTargets.containsAll(expectedTargets),
+        "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
     assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -152,7 +162,9 @@ void tablePathRewriteForOnlyEndVersionProvided() throws 
Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    assertEquals(expectedTargets, 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+    assertTrue(actualTargets.containsAll(expectedTargets),
+        "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
     assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -182,49 +194,119 @@ void tablePathRewriteForStartAndEndVersionProvided() 
throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    assertEquals(expectedTargets, 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+    assertTrue(actualTargets.containsAll(expectedTargets),
+        "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
     assertAllInternalPathsRewritten(csvPairs, targetPrefix);
   }
 
   /**
-   * For every staged metadata JSON file in the CSV, parses the file and 
asserts that:
-   * - The table location starts with target
-   * - Every metadata-log entry path starts with target
-   * - Every snapshot's manifest-list path starts with target
-   * - Every statistics file path starts with target
-   * - None of the above contain the source prefix.
+   * For every staged file in the CSV copy plan, asserts that internal paths 
are rewritten
+   * to the target prefix:
+   * <ul>
+   *   <li><b>.metadata.json</b>: table location, metadata-log entries, and 
snapshot
+   *       manifest-list references all start with target.</li>
+   *   <li><b>snap-*.avro (manifest-list)</b>: target path starts with target, 
and every
+   *       manifest entry path inside the staged file starts with target.</li>
+   *   <li><b>*.avro (manifest)</b>: target path starts with target (content 
rewrite
+   *       is not yet implemented).</li>
+   * </ul>
    */
-  private void assertAllInternalPathsRewritten(Set<Pair<String, String>> 
csvPairs, String target) {
+  private void assertAllInternalPathsRewritten(Set<Pair<String, String>> 
csvPairs, String target) throws Exception {
+
     for (Pair<String, String> pair : csvPairs) {
       String stagingPath = pair.first();
       String targetPath = pair.second();
 
-      // Only inspect .metadata.json files, manifest/data files and snapshots 
are not yet rewritten
-      if (!stagingPath.endsWith(".metadata.json")) {
-        continue;
+      if (stagingPath.endsWith(".metadata.json")) {
+        assertMetadataFileRewritten(stagingPath, targetPath, target);
+      } else if 
(RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) {
+        assertManifestListRewritten(stagingPath, targetPath, target);
+      } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) 
{
+        assertTrue(targetPath.startsWith(target),
+            "Manifest file target path should start with target prefix: " + 
targetPath);
       }
+    }
+  }
 
-      assertTrue(targetPath.startsWith(target),
-          "Target path in CSV should start with target prefix: " + targetPath);
+  private void assertMetadataFileRewritten(String stagingPath, String 
targetPath, String target) {
+
+    assertTrue(targetPath.startsWith(target),
+        "Target path in CSV should start with target prefix: " + targetPath);
+    assertEquals(RewriteTablePathUtil.fileName(stagingPath), 
RewriteTablePathUtil.fileName(targetPath),
+        "original and target metadata file should have the same filename");
+
+    TableMetadata rewritten = new StaticTableOperations(stagingPath, 
table.io()).current();
+    TableMetadata original = new StaticTableOperations(
+        targetPath.replace(targetPrefix, sourcePrefix), table.io()).current();
+    Set<String> expectedMetadata = original.previousFiles().stream()
+        .map(e -> RewriteTablePathUtil.fileName(e.file()))
+        .collect(Collectors.toSet());
+    Set<String> expectedManifestLists = original.snapshots().stream()
+        .map(s -> RewriteTablePathUtil.fileName(s.manifestListLocation()))
+        .collect(Collectors.toSet());
+    Set<String> actualMetadata = new HashSet<>();
+    Set<String> actualManifestLists = new HashSet<>();
+
+    assertTrue(rewritten.location().startsWith(target),
+        "Metadata location should start with target: " + rewritten.location());
+
+    for (MetadataLogEntry entry : rewritten.previousFiles()) {
+      assertTrue(entry.file().startsWith(target),
+          "Metadata log entry should start with target: " + entry.file());
+      actualMetadata.add(RewriteTablePathUtil.fileName(entry.file()));
+    }
 
-      TableMetadata rewritten = new StaticTableOperations(stagingPath, 
table.io()).current();
+    assertEquals(expectedMetadata, actualMetadata, 
+        "Rewritten metadata file should reference the same metadata files as 
the original");
 
-      assertTrue(rewritten.location().startsWith(target),
-          "Metadata location should start with target: " + 
rewritten.location());
+    for (Snapshot snapshot : rewritten.snapshots()) {
+      String manifestList = snapshot.manifestListLocation();
+      assertTrue(manifestList.startsWith(target),
+          "Snapshot's manifest-list should start with target: " + 
manifestList);
+      actualManifestLists.add(RewriteTablePathUtil.fileName(manifestList));
+    }
+    assertEquals(expectedManifestLists, actualManifestLists,
+        "Rewritten metadata file should reference the same manifest-lists as 
the original");
+  }
 
-      for (MetadataLogEntry entry : rewritten.previousFiles()) {
-        assertTrue(entry.file().startsWith(target),
-            "Metadata log entry should start with target: " + entry.file());
+  private void assertManifestListRewritten(String stagingPath, String 
targetPath, String target) throws Exception {
+
+    assertTrue(targetPath.startsWith(target),
+        "Manifest list target path should start with target prefix: " + 
targetPath);
+    assertEquals(RewriteTablePathUtil.fileName(stagingPath), 
RewriteTablePathUtil.fileName(targetPath),
+        "original and target manifest list should have the same filename");
+
+    Set<String> expectedManifests = new HashSet<>();
+    Set<String> actualManifests = new HashSet<>();
+    for (Snapshot s : table.snapshots()) {
+      if 
(RewriteTablePathUtil.fileName(s.manifestListLocation()).equals(RewriteTablePathUtil.fileName(stagingPath)))
 {
+        expectedManifests = s.allManifests(table.io())
+            .stream()
+            .map(m -> RewriteTablePathUtil.fileName(m.path()))
+            .collect(Collectors.toSet());
+        break;
       }
+    }
 
-      for (Snapshot snapshot : rewritten.snapshots()) {
-        String manifestList = snapshot.manifestListLocation();
-        assertTrue(manifestList.startsWith(target),
-            "Snapshot manifest-list should start with target: " + 
manifestList);
+    try (CloseableIterable<ManifestFile> manifests =
+             InternalData.read(FileFormat.AVRO, 
table.io().newInputFile(stagingPath))
+                 .setRootType(GenericManifestFile.class)
+                 .setCustomType(
+                     ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+                     GenericPartitionFieldSummary.class)
+                 .project(ManifestFile.schema())
+                 .build()) {
+      for (ManifestFile manifest : manifests) {
+        assertTrue(manifest.path().startsWith(target),
+            "Manifest path inside staged manifest list should start with 
target prefix: " + manifest.path());
+        actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
       }
     }
+    assertEquals(expectedManifests, actualManifests,
+        "Rewritten manifest list should reference the same manifest files as 
the original");
   }
 
   private static List<String> metadataLogEntryPaths(Table tbl) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to