ashishkumar50 commented on code in PR #10243:
URL: https://github.com/apache/ozone/pull/10243#discussion_r3246069041


##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -460,4 +476,192 @@ private Set<Snapshot> deltaSnapshots(TableMetadata 
startMetadata, Set<Snapshot>
           .collect(Collectors.toSet());
     }
   }
+
+  /** Aggregated result of rewriting content files (data and delete 
manifests). */
+  public static class RewriteContentFileResult extends 
RewriteResult<ContentFile<?>> {
+    @Override
+    public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) 
{
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> 
r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+  }
+
+  private RewriteContentFileResult rewriteManifests(
+      Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return new RewriteContentFileResult();
+    }
+
+    Set<Long> deltaSnapshotIds =

Review Comment:
   Inside `rewriteManifests`, `deltaSnapshotIds` is computed again from 
`deltaSnapshots`. Since `deltaSnapshotIds` is already available at the call 
site, it could be passed directly instead of `deltaSnapshots`, avoiding the 
redundant `stream().map()` inside `rewriteManifests`.



##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -460,4 +476,192 @@ private Set<Snapshot> deltaSnapshots(TableMetadata 
startMetadata, Set<Snapshot>
           .collect(Collectors.toSet());
     }
   }
+
+  /** Aggregated result of rewriting content files (data and delete 
manifests). */
+  public static class RewriteContentFileResult extends 
RewriteResult<ContentFile<?>> {

Review Comment:
   Since `RewriteContentFileResult` is only used within this class and its 
package, it should be package-private.



##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -460,4 +476,192 @@ private Set<Snapshot> deltaSnapshots(TableMetadata 
startMetadata, Set<Snapshot>
           .collect(Collectors.toSet());
     }
   }
+
+  /** Aggregated result of rewriting content files (data and delete 
manifests). */
+  public static class RewriteContentFileResult extends 
RewriteResult<ContentFile<?>> {
+    @Override
+    public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) 
{
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> 
r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+  }
+
+  private RewriteContentFileResult rewriteManifests(
+      Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return new RewriteContentFileResult();
+    }
+
+    Set<Long> deltaSnapshotIds =
+        
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight);
+    ExecutorCompletionService<RewriteContentFileResult> completionService =
+        new ExecutorCompletionService<>(executorService);
+
+    RewriteContentFileResult aggregatedResult = new RewriteContentFileResult();
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (ManifestFile manifestFile : toRewrite) {
+        semaphore.acquire();
+
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try {
+              return processManifest(
+                  manifestFile,
+                  table,
+                  deltaSnapshotIds,
+                  stagingDir,
+                  tableMetadata.formatVersion(),
+                  sourcePrefix,
+                  targetPrefix);
+            } finally {
+              semaphore.release();
+            }
+          });
+          taskSubmitted = true;
+          submittedTasks++;
+        } finally {
+          if (!taskSubmitted) {
+            semaphore.release();
+          }
+        }
+
+        Future<RewriteContentFileResult> done;
+        while ((done = completionService.poll()) != null) {
+          aggregatedResult.append(done.get());
+          completedTasks++;
+        }
+      }
+
+      while (completedTasks < submittedTasks) {
+        aggregatedResult.append(completionService.take().get());
+        completedTasks++;
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      executorService.shutdownNow();
+      throw new RuntimeException("Interrupted while rewriting manifests", e);
+    } catch (ExecutionException e) {
+      executorService.shutdownNow();
+      throw new RuntimeException("Failed to rewrite manifest", e.getCause());
+    }
+
+    return aggregatedResult;
+  }
+
+  private static RewriteContentFileResult processManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> deltaSnapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    RewriteContentFileResult result = new RewriteContentFileResult();
+    switch (manifestFile.content()) {
+    case DATA:
+      result.appendDataFile(
+          writeDataManifest(
+              manifestFile,
+              table,
+              deltaSnapshotIds,
+              stagingLocation,
+              format,
+              sourcePrefix,
+              targetPrefix));
+      break;
+    case DELETES:
+      result.appendDeleteFile(
+          writeDeleteManifest(
+              manifestFile,
+              table,
+              deltaSnapshotIds,
+              stagingLocation,
+              format,
+              sourcePrefix,
+              targetPrefix));
+      break;
+    default:
+      throw new UnsupportedOperationException(
+          "Unsupported manifest type: " + manifestFile.content());
+    }
+    return result;
+  }
+
+  private static RewriteResult<DataFile> writeDataManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> snapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      String stagingPath =
+          RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, 
stagingLocation);
+      FileIO io = table.io();
+      OutputFile outputFile = io.newOutputFile(stagingPath);
+      Map<Integer, PartitionSpec> specsById = table.specs();
+      return RewriteTablePathUtil.rewriteDataManifest(
+          manifestFile,
+          snapshotIds,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);

Review Comment:
   Can you add a log statement in case of failure, here and in the other 
places as well?



##########
hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java:
##########
@@ -352,12 +383,48 @@ private void assertManifestListRewritten(String 
stagingPath, String targetPath,
         assertTrue(manifest.path().startsWith(target),
             "Manifest path inside staged manifest list should start with 
target prefix: " + manifest.path());
         actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
+        Optional<String> manifestStagingPath = csvPairs.stream()
+            .filter(p -> p.second().equals(manifest.path()))
+            .map(Pair::first)
+            .findFirst();
+        if (manifestStagingPath.isPresent()) {
+          assertManifestDataPathsRewritten(manifestStagingPath.get(), 
manifest, target);
+        }
       }
     }
     assertEquals(expectedManifests, actualManifests,
         "Rewritten manifest list should reference the same manifest files as 
the original");
   }
 
+  private void assertManifestDataPathsRewritten(String stagingManifestPath, 
ManifestFile manifestEntry, 

Review Comment:
   Add a test covering the delete manifest dispatch path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to