This is an automated email from the ASF dual-hosted git repository.
ashishkumar50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 623a7546ff8 HDDS-14944. Implement rewrite logic for Iceberg's manifest
files for path migration (#10243)
623a7546ff8 is described below
commit 623a7546ff88df42c9d8b63ee49eb04fc14587bc
Author: sreejasahithi <[email protected]>
AuthorDate: Mon May 18 12:30:34 2026 +0530
HDDS-14944. Implement rewrite logic for Iceberg's manifest files for path
migration (#10243)
---
.../ozone/iceberg/RewriteTablePathOzoneAction.java | 214 ++++++++++++++++++++-
.../iceberg/TestRewriteTablePathOzoneAction.java | 119 ++++++++++--
2 files changed, 321 insertions(+), 12 deletions(-)
diff --git
a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index 2e8b930bf26..2bda160e32e 100644
---
a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++
b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -17,8 +17,10 @@
package org.apache.hadoop.ozone.iceberg;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -31,12 +33,16 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -48,8 +54,13 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of {@link RewriteTablePath} for Apache Ozone backed
Iceberg tables.
@@ -62,6 +73,9 @@
*/
public class RewriteTablePathOzoneAction implements RewriteTablePath {
+ /** Logger used to report manifest read/rewrite failures before the exception is rethrown. */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RewriteTablePathOzoneAction.class);
+
private String sourcePrefix;
private String targetPrefix;
private String startVersionName;
@@ -211,7 +225,8 @@ private boolean versionInFilePath(String path, String
version) {
}
private String rebuildMetadata() {
- //TODO need to implement rewrite of manifest files and position delete
files.
+ // TODO: position delete file entries in rewriteManifestResult.copyPlan()
reference staging paths
+ // that are never written, exclude them until position delete
rewriting is implemented.
TableMetadata startMetadata = startVersionName != null
? new StaticTableOperations(startVersionName, table.io()).current()
: null;
@@ -227,14 +242,20 @@ private String rebuildMetadata() {
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Set<Snapshot> validSnapshots = new
HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
+
Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots,
startMetadata != null ? deltaSnapshotIds : null);
+
RewriteResult<ManifestFile> rewriteManifestListResult =
rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite);
+ RewriteContentFileResult rewriteManifestResult =
+ rewriteManifests(deltaSnapshotIds, endMetadata,
rewriteManifestListResult.toRewrite());
+
Set<Pair<String, String>> copyPlan = new HashSet<>();
copyPlan.addAll(rewriteVersionResult.copyPlan());
copyPlan.addAll(rewriteManifestListResult.copyPlan());
+ copyPlan.addAll(rewriteManifestResult.copyPlan());
return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir,
table.io());
}
@@ -321,6 +342,8 @@ private Set<String> manifestsToRewrite(Set<Snapshot>
validSnapshots, Set<Long> d
}
} catch (Exception e) {
+ LOG.error("Failed to read manifests for snapshot {} at {}",
+ snapshotId, manifestListLocation, e);
throw new RuntimeException(
"Failed to read manifests for snapshot " + snapshotId, e);
} finally {
@@ -460,4 +483,193 @@ private Set<Snapshot> deltaSnapshots(TableMetadata
startMetadata, Set<Snapshot>
.collect(Collectors.toSet());
}
}
+
+  /**
+   * Aggregated result of rewriting content files (data and delete manifests).
+   *
+   * <p>Accumulates the copy plan (source/target path pairs) and the set of content files that
+   * still need rewriting, merged from the per-manifest results. Every {@code append*} method
+   * adds into this instance's own sets in place and returns {@code this} for chaining.
+   */
+  static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
+    @Override
+    public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+      return appendAll(r1);
+    }
+
+    /** Merges the result of rewriting a data manifest into this result. */
+    RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) {
+      return appendAll(r1);
+    }
+
+    /** Merges the result of rewriting a delete manifest into this result. */
+    RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
+      return appendAll(r1);
+    }
+
+    /**
+     * Adds every copy-plan entry and file-to-rewrite of {@code other} into this result.
+     * Accepts any content-file subtype so the typed entry points above share one implementation.
+     */
+    private RewriteContentFileResult appendAll(RewriteResult<? extends ContentFile<?>> other) {
+      copyPlan().addAll(other.copyPlan());
+      toRewrite().addAll(other.toRewrite());
+      return this;
+    }
+  }
+
+ /**
+ * Rewrites the given manifests in parallel on {@code executorService} and merges the
+ * per-manifest results.
+ *
+ * <p>At most {@code parallelism * MAX_INFLIGHT_MULTIPLIER} tasks are outstanding at once,
+ * enforced by a semaphore. Finished results are drained without blocking while submitting,
+ * and the remaining ones are awaited after every manifest has been submitted.
+ *
+ * <p>NOTE(review): per the TODO in rebuildMetadata, the returned copy plan may contain
+ * position-delete entries whose staging source is never written; callers currently add the
+ * whole copy plan — confirm before relying on it.
+ *
+ * @param deltaSnapshotIds snapshot ids forwarded to each per-manifest rewrite
+ * @param tableMetadata metadata whose format version is used for the rewritten manifests
+ * @param toRewrite manifests to rewrite; an empty set yields an empty result
+ * @return aggregated copy plan and files-to-rewrite across all manifests
+ * @throws RuntimeException if interrupted or if any manifest rewrite fails; in both cases
+ *     {@code executorService.shutdownNow()} is called first
+ */
+ private RewriteContentFileResult rewriteManifests(
+ Set<Long> deltaSnapshotIds, TableMetadata tableMetadata,
Set<ManifestFile> toRewrite) {
+ if (toRewrite.isEmpty()) {
+ return new RewriteContentFileResult();
+ }
+
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+ ExecutorCompletionService<RewriteContentFileResult> completionService =
+ new ExecutorCompletionService<>(executorService);
+
+ RewriteContentFileResult aggregatedResult = new RewriteContentFileResult();
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (ManifestFile manifestFile : toRewrite) {
+ // Blocks once maxInFlight tasks are outstanding; each task returns its permit when done.
+ semaphore.acquire();
+
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try {
+ return processManifest(
+ manifestFile,
+ table,
+ deltaSnapshotIds,
+ stagingDir,
+ tableMetadata.formatVersion(),
+ sourcePrefix,
+ targetPrefix);
+ } finally {
+ semaphore.release();
+ }
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ // If submit() threw, the task never runs and cannot release its permit; give it back here.
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
+
+ // Merge whatever has already finished without blocking the submission loop.
+ Future<RewriteContentFileResult> done;
+ while ((done = completionService.poll()) != null) {
+ aggregatedResult.append(done.get());
+ completedTasks++;
+ }
+ }
+
+ // Everything is submitted; block until the remaining in-flight tasks complete.
+ while (completedTasks < submittedTasks) {
+ aggregatedResult.append(completionService.take().get());
+ completedTasks++;
+ }
+
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag and stop outstanding tasks before failing.
+ Thread.currentThread().interrupt();
+ executorService.shutdownNow();
+ throw new RuntimeException("Interrupted while rewriting manifests", e);
+ } catch (ExecutionException e) {
+ // A task failed; stop the rest and surface the task's own exception as the cause.
+ executorService.shutdownNow();
+ throw new RuntimeException("Failed to rewrite manifest", e.getCause());
+ }
+
+ return aggregatedResult;
+ }
+
+  /**
+   * Rewrites a single manifest according to its content type and wraps the outcome in a fresh
+   * {@link RewriteContentFileResult}.
+   *
+   * @param manifestFile manifest to rewrite (data or delete)
+   * @param table table supplying the file IO and partition specs
+   * @param deltaSnapshotIds snapshot ids forwarded to the manifest rewrite
+   * @param stagingLocation staging directory for the rewritten manifest
+   * @param format table format version for the rewritten manifest
+   * @param sourcePrefix path prefix being replaced
+   * @param targetPrefix replacement path prefix
+   * @return result holding the copy plan and files-to-rewrite for this manifest
+   * @throws UnsupportedOperationException for any content type other than DATA or DELETES
+   */
+  private static RewriteContentFileResult processManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> deltaSnapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    switch (manifestFile.content()) {
+      case DATA: {
+        RewriteResult<DataFile> dataOutcome = writeDataManifest(manifestFile, table,
+            deltaSnapshotIds, stagingLocation, format, sourcePrefix, targetPrefix);
+        return new RewriteContentFileResult().appendDataFile(dataOutcome);
+      }
+      case DELETES: {
+        RewriteResult<DeleteFile> deleteOutcome = writeDeleteManifest(manifestFile, table,
+            deltaSnapshotIds, stagingLocation, format, sourcePrefix, targetPrefix);
+        return new RewriteContentFileResult().appendDeleteFile(deleteOutcome);
+      }
+      default:
+        LOG.error("Unsupported manifest type: {} for manifest: {}",
+            manifestFile.content(), manifestFile.path());
+        throw new UnsupportedOperationException(
+            "Unsupported manifest type: " + manifestFile.content());
+    }
+  }
+
+  /**
+   * Rewrites a data manifest into the staging area, replacing {@code sourcePrefix} with
+   * {@code targetPrefix} in the paths it references.
+   *
+   * @param manifestFile data manifest to rewrite
+   * @param table table providing the {@link FileIO} and partition specs
+   * @param snapshotIds snapshot ids forwarded to {@link RewriteTablePathUtil#rewriteDataManifest}
+   * @param stagingLocation staging directory the rewritten manifest is written under
+   * @param format table format version used for the rewritten manifest
+   * @param sourcePrefix path prefix to replace
+   * @param targetPrefix replacement path prefix
+   * @return copy plan and data files produced by the rewrite
+   * @throws RuntimeIOException if reading or writing the manifest fails; the message names it
+   */
+  private static RewriteResult<DataFile> writeDataManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> snapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      FileIO io = table.io();
+      OutputFile outputFile =
+          stagingManifestOutput(manifestFile, io, sourcePrefix, stagingLocation);
+      Map<Integer, PartitionSpec> specsById = table.specs();
+      return RewriteTablePathUtil.rewriteDataManifest(
+          manifestFile, snapshotIds, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
+    } catch (IOException e) {
+      LOG.error("Failed to rewrite data manifest: {}", manifestFile.path(), e);
+      throw new RuntimeIOException(e, "Failed to rewrite data manifest: %s", manifestFile.path());
+    }
+  }
+
+  /**
+   * Rewrites a delete manifest into the staging area, replacing {@code sourcePrefix} with
+   * {@code targetPrefix} in the paths it references.
+   *
+   * @param manifestFile delete manifest to rewrite
+   * @param table table providing the {@link FileIO} and partition specs
+   * @param snapshotIds snapshot ids forwarded to {@link RewriteTablePathUtil#rewriteDeleteManifest}
+   * @param stagingLocation staging directory; also forwarded to the delete-manifest rewrite
+   * @param format table format version used for the rewritten manifest
+   * @param sourcePrefix path prefix to replace
+   * @param targetPrefix replacement path prefix
+   * @return copy plan and delete files produced by the rewrite
+   * @throws RuntimeIOException if reading or writing the manifest fails; the message names it
+   */
+  private static RewriteResult<DeleteFile> writeDeleteManifest(
+      ManifestFile manifestFile,
+      Table table,
+      Set<Long> snapshotIds,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      FileIO io = table.io();
+      OutputFile outputFile =
+          stagingManifestOutput(manifestFile, io, sourcePrefix, stagingLocation);
+      Map<Integer, PartitionSpec> specsById = table.specs();
+      return RewriteTablePathUtil.rewriteDeleteManifest(
+          manifestFile,
+          snapshotIds,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix,
+          stagingLocation);
+    } catch (IOException e) {
+      LOG.error("Failed to rewrite delete manifest: {}", manifestFile.path(), e);
+      throw new RuntimeIOException(e, "Failed to rewrite delete manifest: %s", manifestFile.path());
+    }
+  }
+
+  /**
+   * Opens the output file for the rewritten copy of {@code manifestFile}, located at the path
+   * computed by {@link RewriteTablePathUtil#stagingPath} under {@code stagingLocation}.
+   */
+  private static OutputFile stagingManifestOutput(
+      ManifestFile manifestFile, FileIO io, String sourcePrefix, String stagingLocation) {
+    String stagingPath =
+        RewriteTablePathUtil.stagingPath(manifestFile.path(), sourcePrefix, stagingLocation);
+    return io.newOutputFile(stagingPath);
+  }
}
diff --git
a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
index 35bab0df55a..405740ab6ee 100644
---
a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
+++
b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
@@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
@@ -30,18 +31,23 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.Schema;
@@ -59,6 +65,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
/**
* Testing path rewrite of iceberg table metadata files.
@@ -106,8 +113,10 @@ void fullTablePathRewrite() throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
- assertTrue(actualTargets.containsAll(expectedTargets),
+ Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+ .filter(p -> p.endsWith(".metadata.json"))
+ .collect(Collectors.toSet());
+ assertEquals(expectedTargets, actualTargets,
"Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target.
@@ -136,8 +145,10 @@ void tablePathRewriteForStartAndNoEndVersionProvided()
throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
- assertTrue(actualTargets.containsAll(expectedTargets),
+ Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+ .filter(p -> p.endsWith(".metadata.json"))
+ .collect(Collectors.toSet());
+ assertEquals(expectedTargets, actualTargets,
"Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
@@ -166,8 +177,10 @@ void tablePathRewriteForOnlyEndVersionProvided() throws
Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
- assertTrue(actualTargets.containsAll(expectedTargets),
+ Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+ .filter(p -> p.endsWith(".metadata.json"))
+ .collect(Collectors.toSet());
+ assertEquals(expectedTargets, actualTargets,
"Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
@@ -198,14 +211,34 @@ void tablePathRewriteForStartAndEndVersionProvided()
throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
- assertTrue(actualTargets.containsAll(expectedTargets),
+ Set<String> actualTargets = csvPairs.stream().map(Pair::second)
+ .filter(p -> p.endsWith(".metadata.json"))
+ .collect(Collectors.toSet());
+ assertEquals(expectedTargets, actualTargets,
"Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
}
+  /**
+   * Without an explicit staging location, the action must pick one under the table's metadata
+   * directory, named with the {@code copy-table-staging-} marker and ending in a separator.
+   */
+  @Test
+  void defaultStagingDirIsUnderTableMetadataLocation() {
+    String expectedPrefix = RewriteTablePathOzoneUtils.getMetadataLocation(table);
+
+    // No stagingLocation(...) is configured, so the action has to generate one itself.
+    String stagingDir = new RewriteTablePathOzoneAction(table)
+        .rewriteLocationPrefix(sourcePrefix, targetPrefix)
+        .execute()
+        .stagingLocation();
+
+    assertTrue(stagingDir.startsWith(expectedPrefix),
+        "Auto-generated staging dir should be under the table's metadata location. Expected prefix: "
+            + expectedPrefix + ", actual: " + stagingDir);
+    assertTrue(stagingDir.contains("copy-table-staging-"),
+        "Auto-generated staging dir should contain 'copy-table-staging-': " + stagingDir);
+    assertTrue(stagingDir.endsWith(RewriteTablePathUtil.FILE_SEPARATOR),
+        "Auto-generated staging dir should end with FILE_SEPARATOR: " + stagingDir);
+  }
+
@Test
void statsFileCopyPlanReturnsEmptySetForEmptyStats() {
Set<Pair<String, String>> copyPlan =
@@ -250,7 +283,7 @@ void statsFileCopyPlanReturnsBeforeToAfterPathPairs() {
Pair.of("before-1.stats", "after-1.stats"),
Pair.of("before-2.stats", "after-2.stats")), copyPlan);
}
-
+
/**
* For every staged file in the CSV copy plan, asserts that internal paths
are rewritten
* to the target prefix:
@@ -272,7 +305,7 @@ private void
assertAllInternalPathsRewritten(Set<Pair<String, String>> csvPairs,
if (stagingPath.endsWith(".metadata.json")) {
assertMetadataFileRewritten(stagingPath, targetPath, target);
} else if
(RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) {
- assertManifestListRewritten(stagingPath, targetPath, target);
+ assertManifestListRewritten(stagingPath, targetPath, target, csvPairs);
} else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro"))
{
assertTrue(targetPath.startsWith(target),
"Manifest file target path should start with target prefix: " +
targetPath);
@@ -321,7 +354,8 @@ private void assertMetadataFileRewritten(String
stagingPath, String targetPath,
"Rewritten metadata file should reference the same manifest-lists as
the original");
}
- private void assertManifestListRewritten(String stagingPath, String
targetPath, String target) throws Exception {
+ private void assertManifestListRewritten(String stagingPath, String
targetPath, String target,
+ Set<Pair<String, String>> csvPairs)
throws Exception {
assertTrue(targetPath.startsWith(target),
"Manifest list target path should start with target prefix: " +
targetPath);
@@ -352,12 +386,75 @@ private void assertManifestListRewritten(String
stagingPath, String targetPath,
assertTrue(manifest.path().startsWith(target),
"Manifest path inside staged manifest list should start with
target prefix: " + manifest.path());
actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
+ Optional<String> manifestStagingPath = csvPairs.stream()
+ .filter(p -> p.second().equals(manifest.path()))
+ .map(Pair::first)
+ .findFirst();
+ if (manifestStagingPath.isPresent()) {
+ String originalPath = manifest.path().replace(targetPrefix,
sourcePrefix);
+ ManifestFile original = Mockito.spy(manifest);
+ Mockito.doReturn(originalPath).when(original).path();
+
+ ManifestFile staged = Mockito.spy(manifest);
+ Mockito.doReturn(manifestStagingPath.get()).when(staged).path();
+
+ if (manifest.content() == ManifestContent.DATA) {
+ assertDataManifestPathsRewritten(staged, original, target);
+ } else if (manifest.content() == ManifestContent.DELETES) {
+ assertDeleteManifestPathsRewritten(staged, original, target);
+ }
+ }
}
}
assertEquals(expectedManifests, actualManifests,
"Rewritten manifest list should reference the same manifest files as
the original");
}
+  /**
+   * Asserts that every data file referenced by the staged data manifest is located under
+   * {@code target}, and that the staged manifest references the same data file names as the
+   * original manifest.
+   *
+   * @param staged manifest whose path resolves to the staged (rewritten) copy
+   * @param original manifest whose path resolves to the source copy
+   * @param target expected prefix of every rewritten data file location
+   * @throws IOException if either manifest cannot be read or closed
+   */
+  private void assertDataManifestPathsRewritten(ManifestFile staged, ManifestFile original,
+      String target) throws IOException {
+    Set<String> expectedFileNames = new HashSet<>();
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(original, table.io())) {
+      for (DataFile dataFile : reader) {
+        expectedFileNames.add(RewriteTablePathUtil.fileName(dataFile.location()));
+      }
+    }
+
+    Set<String> actualFileNames = new HashSet<>();
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(staged, table.io())) {
+      for (DataFile dataFile : reader) {
+        String location = dataFile.location();
+        // Report just the location (not the DataFile object), matching the delete-manifest check.
+        assertTrue(location.startsWith(target),
+            "Data file path inside staged data manifest should start with target prefix: " + location);
+        actualFileNames.add(RewriteTablePathUtil.fileName(location));
+      }
+    }
+
+    assertEquals(expectedFileNames, actualFileNames,
+        "Rewritten data manifest should reference the same data files as the original");
+  }
+
+  /**
+   * Asserts that every delete file referenced by the staged delete manifest is located under
+   * {@code target}, and that the staged manifest references the same delete file names as the
+   * original manifest.
+   *
+   * @param staged manifest whose path resolves to the staged (rewritten) copy
+   * @param original manifest whose path resolves to the source copy
+   * @param target expected prefix of every rewritten delete file location
+   * @throws IOException if either manifest cannot be read or closed
+   */
+  private void assertDeleteManifestPathsRewritten(ManifestFile staged, ManifestFile original,
+      String target) throws IOException {
+    Set<String> namesInOriginal = new HashSet<>();
+    try (ManifestReader<DeleteFile> originalReader =
+        ManifestFiles.readDeleteManifest(original, table.io(), table.specs())) {
+      originalReader.forEach(
+          deleteFile -> namesInOriginal.add(RewriteTablePathUtil.fileName(deleteFile.location())));
+    }
+
+    Set<String> namesInStaged = new HashSet<>();
+    try (ManifestReader<DeleteFile> stagedReader =
+        ManifestFiles.readDeleteManifest(staged, table.io(), table.specs())) {
+      for (DeleteFile deleteFile : stagedReader) {
+        String location = deleteFile.location();
+        assertTrue(location.startsWith(target),
+            "Delete file path inside staged delete manifest should start with target prefix: "
+                + location);
+        namesInStaged.add(RewriteTablePathUtil.fileName(location));
+      }
+    }
+
+    assertEquals(namesInOriginal, namesInStaged,
+        "Rewritten delete manifest should reference the same delete files (by name) as the original");
+  }
+
private static List<String> metadataLogEntryPaths(Table tbl) {
TableMetadata meta = ((HasTableOperations) tbl).operations().current();
List<String> paths = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]