This is an automated email from the ASF dual-hosted git repository.

ashishkumar50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 623a7546ff8 HDDS-14944. Implement rewrite logic for Iceberg's manifest files for path migration (#10243)
623a7546ff8 is described below

commit 623a7546ff88df42c9d8b63ee49eb04fc14587bc
Author: sreejasahithi <[email protected]>
AuthorDate: Mon May 18 12:30:34 2026 +0530

    HDDS-14944. Implement rewrite logic for Iceberg's manifest files for path migration (#10243)
---
 .../ozone/iceberg/RewriteTablePathOzoneAction.java | 214 ++++++++++++++++++++-
 .../iceberg/TestRewriteTablePathOzoneAction.java   | 119 ++++++++++--
 2 files changed, 321 insertions(+), 12 deletions(-)

diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index 2e8b930bf26..2bda160e32e 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -17,8 +17,10 @@
 
 package org.apache.hadoop.ozone.iceberg;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -31,12 +33,16 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.GenericManifestFile;
 import org.apache.iceberg.GenericPartitionFieldSummary;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.InternalData;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -48,8 +54,13 @@
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.actions.ImmutableRewriteTablePath;
 import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link RewriteTablePath} for Apache Ozone backed 
Iceberg tables.
@@ -62,6 +73,9 @@
  */
 public class RewriteTablePathOzoneAction implements RewriteTablePath {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewriteTablePathOzoneAction.class);
+
   private String sourcePrefix;
   private String targetPrefix;
   private String startVersionName;
@@ -211,7 +225,8 @@ private boolean versionInFilePath(String path, String version) {
   }
 
   private String rebuildMetadata() {
-    //TODO need to implement rewrite of manifest files and position delete 
files.
+    // TODO: position delete file entries in rewriteManifestResult.copyPlan() 
reference staging paths
+    //       that are never written, exclude them until position delete 
rewriting is implemented.
     TableMetadata startMetadata = startVersionName != null
         ? new StaticTableOperations(startVersionName, table.io()).current()
         : null;
@@ -227,14 +242,20 @@ private String rebuildMetadata() {
     Set<Long> deltaSnapshotIds =  
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
     Set<Snapshot> validSnapshots = new 
HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
     
validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
+    
     Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots, 
         startMetadata != null ? deltaSnapshotIds : null);
+    
     RewriteResult<ManifestFile> rewriteManifestListResult = 
         rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite);
 
+    RewriteContentFileResult rewriteManifestResult =
+        rewriteManifests(deltaSnapshotIds, endMetadata, 
rewriteManifestListResult.toRewrite());
+
     Set<Pair<String, String>> copyPlan = new HashSet<>();
     copyPlan.addAll(rewriteVersionResult.copyPlan());
     copyPlan.addAll(rewriteManifestListResult.copyPlan());
+    copyPlan.addAll(rewriteManifestResult.copyPlan());
 
     return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir, 
table.io());
   }
@@ -321,6 +342,8 @@ private Set<String> manifestsToRewrite(Set<Snapshot> validSnapshots, Set<Long> d
               }
 
             } catch (Exception e) {
+              LOG.error("Failed to read manifests for snapshot {} at {}",
+                  snapshotId, manifestListLocation, e);
               throw new RuntimeException(
                   "Failed to read manifests for snapshot " + snapshotId, e);
             } finally {
@@ -460,4 +483,193 @@ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot>
           .collect(Collectors.toSet());
     }
   }
+
+  /** Aggregated result of rewriting content files (data and delete 
manifests). */
+  static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
+    @Override
+    public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+  }
+
+  private RewriteContentFileResult rewriteManifests(
+      Set<Long> deltaSnapshotIds, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return new RewriteContentFileResult();
+    }
+
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight);
+    ExecutorCompletionService<RewriteContentFileResult> completionService =
+        new ExecutorCompletionService<>(executorService);
+
+    RewriteContentFileResult aggregatedResult = new RewriteContentFileResult();
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (ManifestFile manifestFile : toRewrite) {
+        semaphore.acquire();
+
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try {
+              return processManifest(
+                  manifestFile,
+                  table,
+                  deltaSnapshotIds,
+                  stagingDir,
+                  tableMetadata.formatVersion(),
+                  sourcePrefix,
+                  targetPrefix);
+            } finally {
+              semaphore.release();
+            }
+          });
+          taskSubmitted = true;
+          submittedTasks++;
+        } finally {
+          if (!taskSubmitted) {
+            semaphore.release();
+          }
+        }
+
+        Future<RewriteContentFileResult> done;
+        while ((done = completionService.poll()) != null) {
+          aggregatedResult.append(done.get());
+          completedTasks++;
+        }
+      }
+
+      while (completedTasks < submittedTasks) {
+        aggregatedResult.append(completionService.take().get());
+        completedTasks++;
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      executorService.shutdownNow();
+      throw new RuntimeException("Interrupted while rewriting manifests", e);
+    } catch (ExecutionException e) {
+      executorService.shutdownNow();
+      throw new RuntimeException("Failed to rewrite manifest", e.getCause());
+    }
+
+    return aggregatedResult;
+  }
+
+  private static RewriteContentFileResult processManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> deltaSnapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    RewriteContentFileResult result = new RewriteContentFileResult();
+    switch (manifestFile.content()) {
+    case DATA:
+      result.appendDataFile(
+          writeDataManifest(
+              manifestFile,
+              table,
+              deltaSnapshotIds,
+              stagingLocation,
+              format,
+              sourcePrefix,
+              targetPrefix));
+      break;
+    case DELETES:
+      result.appendDeleteFile(
+          writeDeleteManifest(
+              manifestFile,
+              table,
+              deltaSnapshotIds,
+              stagingLocation,
+              format,
+              sourcePrefix,
+              targetPrefix));
+      break;
+    default:
+      LOG.error("Unsupported manifest type: {} for manifest: {}",
+          manifestFile.content(), manifestFile.path());
+      throw new UnsupportedOperationException(
+          "Unsupported manifest type: " + manifestFile.content());
+    }
+    return result;
+  }
+
+  private static RewriteResult<DataFile> writeDataManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> snapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
+      FileIO io = table.io();
+      OutputFile outputFile = io.newOutputFile(stagingPath);
+      Map<Integer, PartitionSpec> specsById = table.specs();
+      return RewriteTablePathUtil.rewriteDataManifest(
+          manifestFile,
+          snapshotIds,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix);
+    } catch (IOException e) {
+      LOG.error("Failed to rewrite data manifest: {}", manifestFile.path(), e);
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private static RewriteResult<DeleteFile> writeDeleteManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> snapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
+      FileIO io = table.io();
+      OutputFile outputFile = io.newOutputFile(stagingPath);
+      Map<Integer, PartitionSpec> specsById = table.specs();
+      return RewriteTablePathUtil.rewriteDeleteManifest(
+          manifestFile,
+          snapshotIds,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix,
+          stagingLocation);
+    } catch (IOException e) {
+      LOG.error("Failed to rewrite delete manifest: {}", manifestFile.path(), 
e);
+      throw new RuntimeIOException(e);
+    }
+  }
 }
diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
index 35bab0df55a..405740ab6ee 100644
--- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
@@ -23,6 +23,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -30,18 +31,23 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.GenericManifestFile;
 import org.apache.iceberg.GenericPartitionFieldSummary;
 import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.Schema;
@@ -59,6 +65,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 /**
  * Testing path rewrite of iceberg table metadata files.
@@ -106,8 +113,10 @@ void fullTablePathRewrite() throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
-    assertTrue(actualTargets.containsAll(expectedTargets),
+    Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+        .filter(p -> p.endsWith(".metadata.json"))
+        .collect(Collectors.toSet());
+    assertEquals(expectedTargets, actualTargets,
         "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target.
@@ -136,8 +145,10 @@ void tablePathRewriteForStartAndNoEndVersionProvided() throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
-    assertTrue(actualTargets.containsAll(expectedTargets),
+    Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+        .filter(p -> p.endsWith(".metadata.json"))
+        .collect(Collectors.toSet());
+    assertEquals(expectedTargets, actualTargets,
         "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
@@ -166,8 +177,10 @@ void tablePathRewriteForOnlyEndVersionProvided() throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
-    assertTrue(actualTargets.containsAll(expectedTargets),
+    Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+        .filter(p -> p.endsWith(".metadata.json"))
+        .collect(Collectors.toSet());
+    assertEquals(expectedTargets, actualTargets,
         "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
@@ -198,14 +211,34 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception {
     }
 
     Set<Pair<String, String>> csvPairs = readCsvPairs(table, 
result.fileListLocation());
-    Set<String> actualTargets = 
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
-    assertTrue(actualTargets.containsAll(expectedTargets),
+    Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+        .filter(p -> p.endsWith(".metadata.json"))
+        .collect(Collectors.toSet());
+    assertEquals(expectedTargets, actualTargets,
         "Copy plan should contain all expected version file targets");
 
     // Verify all internal paths inside each staged metadata file are 
rewritten to target
     assertAllInternalPathsRewritten(csvPairs, targetPrefix);
   }
 
+  @Test
+  void defaultStagingDirIsUnderTableMetadataLocation() {
+    String metadataLocation = 
RewriteTablePathOzoneUtils.getMetadataLocation(table);
+
+    RewriteTablePath.Result result = new RewriteTablePathOzoneAction(table)
+        .rewriteLocationPrefix(sourcePrefix, targetPrefix)
+        .execute();
+
+    String actualStagingDir = result.stagingLocation();
+    assertTrue(actualStagingDir.startsWith(metadataLocation),
+        "Auto-generated staging dir should be under the table's metadata 
location."
+            + " Expected prefix: " + metadataLocation + ", actual: " + 
actualStagingDir);
+    assertTrue(actualStagingDir.contains("copy-table-staging-"),
+        "Auto-generated staging dir should contain 'copy-table-staging-': " + 
actualStagingDir);
+    assertTrue(actualStagingDir.endsWith(RewriteTablePathUtil.FILE_SEPARATOR),
+        "Auto-generated staging dir should end with FILE_SEPARATOR: " + 
actualStagingDir);
+  }
+  
   @Test
   void statsFileCopyPlanReturnsEmptySetForEmptyStats() {
     Set<Pair<String, String>> copyPlan =
@@ -250,7 +283,7 @@ void statsFileCopyPlanReturnsBeforeToAfterPathPairs() {
         Pair.of("before-1.stats", "after-1.stats"),
         Pair.of("before-2.stats", "after-2.stats")), copyPlan);
   }
-
+  
   /**
    * For every staged file in the CSV copy plan, asserts that internal paths 
are rewritten
    * to the target prefix:
@@ -272,7 +305,7 @@ private void assertAllInternalPathsRewritten(Set<Pair<String, String>> csvPairs,
       if (stagingPath.endsWith(".metadata.json")) {
         assertMetadataFileRewritten(stagingPath, targetPath, target);
       } else if 
(RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) {
-        assertManifestListRewritten(stagingPath, targetPath, target);
+        assertManifestListRewritten(stagingPath, targetPath, target, csvPairs);
       } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) 
{
         assertTrue(targetPath.startsWith(target),
             "Manifest file target path should start with target prefix: " + 
targetPath);
@@ -321,7 +354,8 @@ private void assertMetadataFileRewritten(String stagingPath, String targetPath,
         "Rewritten metadata file should reference the same manifest-lists as 
the original");
   }
 
-  private void assertManifestListRewritten(String stagingPath, String 
targetPath, String target) throws Exception {
+  private void assertManifestListRewritten(String stagingPath, String 
targetPath, String target,
+                                           Set<Pair<String, String>> csvPairs) 
throws Exception {
 
     assertTrue(targetPath.startsWith(target),
         "Manifest list target path should start with target prefix: " + 
targetPath);
@@ -352,12 +386,75 @@ private void assertManifestListRewritten(String stagingPath, String targetPath,
         assertTrue(manifest.path().startsWith(target),
             "Manifest path inside staged manifest list should start with 
target prefix: " + manifest.path());
         actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
+        Optional<String> manifestStagingPath = csvPairs.stream()
+            .filter(p -> p.second().equals(manifest.path()))
+            .map(Pair::first)
+            .findFirst();
+        if (manifestStagingPath.isPresent()) {
+          String originalPath = manifest.path().replace(targetPrefix, 
sourcePrefix);
+          ManifestFile original = Mockito.spy(manifest);
+          Mockito.doReturn(originalPath).when(original).path();
+          
+          ManifestFile staged = Mockito.spy(manifest);
+          Mockito.doReturn(manifestStagingPath.get()).when(staged).path();
+
+          if (manifest.content() == ManifestContent.DATA) {
+            assertDataManifestPathsRewritten(staged, original, target);
+          } else if (manifest.content() == ManifestContent.DELETES) {
+            assertDeleteManifestPathsRewritten(staged, original, target);
+          }
+        }
       }
     }
     assertEquals(expectedManifests, actualManifests,
         "Rewritten manifest list should reference the same manifest files as 
the original");
   }
 
+  private void assertDataManifestPathsRewritten(ManifestFile staged, 
ManifestFile original,
+      String target) throws IOException {
+    Set<String> expectedFileNames = new HashSet<>();
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(original, 
table.io())) {
+      for (DataFile df : reader) {
+        expectedFileNames.add(RewriteTablePathUtil.fileName(df.location()));
+      }
+    }
+
+    Set<String> actualFileNames = new HashSet<>();
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(staged, 
table.io())) {
+      for (DataFile dataPath : reader) {
+        assertTrue(dataPath.location().startsWith(target),
+            "Data file path inside staged data manifest should start with 
target prefix: " + dataPath);
+        
actualFileNames.add(RewriteTablePathUtil.fileName(dataPath.location()));
+      }
+    }
+
+    assertEquals(expectedFileNames, actualFileNames,
+        "Rewritten data manifest should reference the same data files as the 
original");
+  }
+
+  private void assertDeleteManifestPathsRewritten(ManifestFile staged, 
ManifestFile original, 
+      String target) throws IOException {
+    Set<String> expectedFileNames = new HashSet<>();
+    try (ManifestReader<DeleteFile> reader = 
ManifestFiles.readDeleteManifest(original, table.io(), table.specs())) {
+      for (DeleteFile df : reader) {
+        expectedFileNames.add(RewriteTablePathUtil.fileName(df.location()));
+      }
+    }
+
+    Set<String> actualFileNames = new HashSet<>();
+    try (ManifestReader<DeleteFile> reader = 
ManifestFiles.readDeleteManifest(staged, table.io(), table.specs())) {
+      for (DeleteFile df : reader) {
+        assertTrue(df.location().startsWith(target),
+            "Delete file path inside staged delete manifest should start with 
target prefix: "
+                + df.location());
+        actualFileNames.add(RewriteTablePathUtil.fileName(df.location()));
+      }
+    }
+
+    assertEquals(expectedFileNames, actualFileNames,
+        "Rewritten delete manifest should reference the same delete files (by 
name) as the original");
+  }
+
   private static List<String> metadataLogEntryPaths(Table tbl) {
     TableMetadata meta = ((HasTableOperations) tbl).operations().current();
     List<String> paths = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to