This is an automated email from the ASF dual-hosted git repository.
ashishkumar50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e136cb00bfb HDDS-14943. Implement rewrite logic for Iceberg's
manifest-list files for path migration (#10190)
e136cb00bfb is described below
commit e136cb00bfbea9f7f3aee7d3e6c3e05944dab7dc
Author: sreejasahithi <[email protected]>
AuthorDate: Mon May 11 13:59:42 2026 +0530
HDDS-14943. Implement rewrite logic for Iceberg's manifest-list files for
path migration (#10190)
---
.../ozone/iceberg/RewriteTablePathOzoneAction.java | 91 +++++++++++++-
.../iceberg/TestRewriteTablePathOzoneAction.java | 140 ++++++++++++++++-----
2 files changed, 200 insertions(+), 31 deletions(-)
diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index a3e9190469e..2e8b930bf26 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -211,7 +211,7 @@ private boolean versionInFilePath(String path, String version) {
}
private String rebuildMetadata() {
- //TODO need to implement rewrite of manifest list , manifest files and position delete files.
+ //TODO need to implement rewrite of manifest files and position delete files.
TableMetadata startMetadata = startVersionName != null
? new StaticTableOperations(startVersionName, table.io()).current()
: null;
@@ -227,12 +227,14 @@ private String rebuildMetadata() {
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Set<Snapshot> validSnapshots = new
HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
- //TODO: manifestsToRewrite will be used while re-write of manifest-list files.
Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots,
startMetadata != null ? deltaSnapshotIds : null);
+ RewriteResult<ManifestFile> rewriteManifestListResult =
+ rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite);
Set<Pair<String, String>> copyPlan = new HashSet<>();
copyPlan.addAll(rewriteVersionResult.copyPlan());
+ copyPlan.addAll(rewriteManifestListResult.copyPlan());
return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir,
table.io());
}
@@ -362,6 +364,91 @@ private Set<String> manifestsToRewrite(Set<Snapshot> validSnapshots, Set<Long> d
return manifestPaths;
}
+ private RewriteResult<ManifestFile> rewriteManifestList(
+ Snapshot snapshot, TableMetadata tableMetadata, Set<String>
manifestsToRewrite) {
+ RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+ String path = snapshot.manifestListLocation();
+ String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix,
stagingDir);
+ RewriteResult<ManifestFile> rewriteResult =
+ RewriteTablePathUtil.rewriteManifestList(
+ snapshot,
+ table.io(),
+ tableMetadata,
+ manifestsToRewrite,
+ sourcePrefix,
+ targetPrefix,
+ stagingDir,
+ outputPath);
+
+ result.append(rewriteResult);
+ result
+ .copyPlan()
+ .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path,
sourcePrefix, targetPrefix)));
+ return result;
+ }
+
+ private RewriteResult<ManifestFile> rewriteManifestLists(Set<Snapshot>
validSnapshots, TableMetadata endMetadata,
+ Set<String> manifestsToRewrite) {
+
+ if (validSnapshots.isEmpty()) {
+ return new RewriteResult<>();
+ }
+
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+ ExecutorCompletionService<RewriteResult<ManifestFile>> completionService =
+ new ExecutorCompletionService<>(executorService);
+
+ RewriteResult<ManifestFile> combined = new RewriteResult<>();
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (Snapshot snapshot : validSnapshots) {
+ semaphore.acquire();
+
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try {
+ return rewriteManifestList(snapshot, endMetadata,
manifestsToRewrite);
+ } finally {
+ semaphore.release();
+ }
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
+
+ Future<RewriteResult<ManifestFile>> done;
+ while ((done = completionService.poll()) != null) {
+ combined.append(done.get());
+ completedTasks++;
+ }
+ }
+
+ while (completedTasks < submittedTasks) {
+ combined.append(completionService.take().get());
+ completedTasks++;
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ executorService.shutdownNow();
+ throw new RuntimeException("Interrupted while rewriting manifest lists",
e);
+ } catch (ExecutionException e) {
+ executorService.shutdownNow();
+ throw new RuntimeException("Failed to rewrite manifest list",
e.getCause());
+ }
+
+ return combined;
+ }
+
private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata,
Set<Snapshot> allSnapshots) {
if (startMetadata == null) {
return allSnapshots;
diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
index 8678fe52301..56442a75063 100644
--- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java
@@ -23,6 +23,7 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,7 +34,11 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.Schema;
@@ -44,6 +49,7 @@
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.junit.jupiter.api.BeforeEach;
@@ -68,11 +74,11 @@ class TestRewriteTablePathOzoneAction {
private Table table = null;
@TempDir
- private java.nio.file.Path tableDir;
+ private Path tableDir;
@TempDir
- private java.nio.file.Path targetDir;
+ private Path targetDir;
@TempDir
- private java.nio.file.Path stagingDir;
+ private Path stagingDir;
@BeforeEach
public void setupTableLocation() {
@@ -96,7 +102,9 @@ void fullTablePathRewrite() throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- assertEquals(expectedTargets,
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+ Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+ assertTrue(actualTargets.containsAll(expectedTargets),
+ "Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target.
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -124,7 +132,9 @@ void tablePathRewriteForStartAndNoEndVersionProvided()
throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- assertEquals(expectedTargets,
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+ Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+ assertTrue(actualTargets.containsAll(expectedTargets),
+ "Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -152,7 +162,9 @@ void tablePathRewriteForOnlyEndVersionProvided() throws
Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- assertEquals(expectedTargets,
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+ Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+ assertTrue(actualTargets.containsAll(expectedTargets),
+ "Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
@@ -182,49 +194,119 @@ void tablePathRewriteForStartAndEndVersionProvided()
throws Exception {
}
Set<Pair<String, String>> csvPairs = readCsvPairs(table,
result.fileListLocation());
- assertEquals(expectedTargets,
csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
+ Set<String> actualTargets =
csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
+ assertTrue(actualTargets.containsAll(expectedTargets),
+ "Copy plan should contain all expected version file targets");
// Verify all internal paths inside each staged metadata file are
rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
}
/**
- * For every staged metadata JSON file in the CSV, parses the file and
asserts that:
- * - The table location starts with target
- * - Every metadata-log entry path starts with target
- * - Every snapshot's manifest-list path starts with target
- * - Every statistics file path starts with target
- * - None of the above contain the source prefix.
+ * For every staged file in the CSV copy plan, asserts that internal paths
are rewritten
+ * to the target prefix:
+ * <ul>
+ * <li><b>.metadata.json</b>: table location, metadata-log entries, and
snapshot
+ * manifest-list references all start with target.</li>
+ * <li><b>snap-*.avro (manifest-list)</b>: target path starts with target,
and every
+ * manifest entry path inside the staged file starts with target.</li>
+ * <li><b>*.avro (manifest)</b>: target path starts with target (content
rewrite
+ * is not yet implemented).</li>
+ * </ul>
*/
- private void assertAllInternalPathsRewritten(Set<Pair<String, String>>
csvPairs, String target) {
+ private void assertAllInternalPathsRewritten(Set<Pair<String, String>>
csvPairs, String target) throws Exception {
+
for (Pair<String, String> pair : csvPairs) {
String stagingPath = pair.first();
String targetPath = pair.second();
- // Only inspect .metadata.json files, manifest/data files and snapshots
are not yet rewritten
- if (!stagingPath.endsWith(".metadata.json")) {
- continue;
+ if (stagingPath.endsWith(".metadata.json")) {
+ assertMetadataFileRewritten(stagingPath, targetPath, target);
+ } else if
(RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) {
+ assertManifestListRewritten(stagingPath, targetPath, target);
+ } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro"))
{
+ assertTrue(targetPath.startsWith(target),
+ "Manifest file target path should start with target prefix: " +
targetPath);
}
+ }
+ }
- assertTrue(targetPath.startsWith(target),
- "Target path in CSV should start with target prefix: " + targetPath);
+ private void assertMetadataFileRewritten(String stagingPath, String
targetPath, String target) {
+
+ assertTrue(targetPath.startsWith(target),
+ "Target path in CSV should start with target prefix: " + targetPath);
+ assertEquals(RewriteTablePathUtil.fileName(stagingPath),
RewriteTablePathUtil.fileName(targetPath),
+ "original and target metadata file should have the same filename");
+
+ TableMetadata rewritten = new StaticTableOperations(stagingPath,
table.io()).current();
+ TableMetadata original = new StaticTableOperations(
+ targetPath.replace(targetPrefix, sourcePrefix), table.io()).current();
+ Set<String> expectedMetadata = original.previousFiles().stream()
+ .map(e -> RewriteTablePathUtil.fileName(e.file()))
+ .collect(Collectors.toSet());
+ Set<String> expectedManifestLists = original.snapshots().stream()
+ .map(s -> RewriteTablePathUtil.fileName(s.manifestListLocation()))
+ .collect(Collectors.toSet());
+ Set<String> actualMetadata = new HashSet<>();
+ Set<String> actualManifestLists = new HashSet<>();
+
+ assertTrue(rewritten.location().startsWith(target),
+ "Metadata location should start with target: " + rewritten.location());
+
+ for (MetadataLogEntry entry : rewritten.previousFiles()) {
+ assertTrue(entry.file().startsWith(target),
+ "Metadata log entry should start with target: " + entry.file());
+ actualMetadata.add(RewriteTablePathUtil.fileName(entry.file()));
+ }
- TableMetadata rewritten = new StaticTableOperations(stagingPath,
table.io()).current();
+ assertEquals(expectedMetadata, actualMetadata,
+ "Rewritten metadata file should reference the same metadata files as
the original");
- assertTrue(rewritten.location().startsWith(target),
- "Metadata location should start with target: " +
rewritten.location());
+ for (Snapshot snapshot : rewritten.snapshots()) {
+ String manifestList = snapshot.manifestListLocation();
+ assertTrue(manifestList.startsWith(target),
+ "Snapshot's manifest-list should start with target: " +
manifestList);
+ actualManifestLists.add(RewriteTablePathUtil.fileName(manifestList));
+ }
+ assertEquals(expectedManifestLists, actualManifestLists,
+ "Rewritten metadata file should reference the same manifest-lists as
the original");
+ }
- for (MetadataLogEntry entry : rewritten.previousFiles()) {
- assertTrue(entry.file().startsWith(target),
- "Metadata log entry should start with target: " + entry.file());
+ private void assertManifestListRewritten(String stagingPath, String
targetPath, String target) throws Exception {
+
+ assertTrue(targetPath.startsWith(target),
+ "Manifest list target path should start with target prefix: " +
targetPath);
+ assertEquals(RewriteTablePathUtil.fileName(stagingPath),
RewriteTablePathUtil.fileName(targetPath),
+ "original and target manifest list should have the same filename");
+
+ Set<String> expectedManifests = new HashSet<>();
+ Set<String> actualManifests = new HashSet<>();
+ for (Snapshot s : table.snapshots()) {
+ if
(RewriteTablePathUtil.fileName(s.manifestListLocation()).equals(RewriteTablePathUtil.fileName(stagingPath)))
{
+ expectedManifests = s.allManifests(table.io())
+ .stream()
+ .map(m -> RewriteTablePathUtil.fileName(m.path()))
+ .collect(Collectors.toSet());
+ break;
}
+ }
- for (Snapshot snapshot : rewritten.snapshots()) {
- String manifestList = snapshot.manifestListLocation();
- assertTrue(manifestList.startsWith(target),
- "Snapshot manifest-list should start with target: " +
manifestList);
+ try (CloseableIterable<ManifestFile> manifests =
+ InternalData.read(FileFormat.AVRO,
table.io().newInputFile(stagingPath))
+ .setRootType(GenericManifestFile.class)
+ .setCustomType(
+ ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+ GenericPartitionFieldSummary.class)
+ .project(ManifestFile.schema())
+ .build()) {
+ for (ManifestFile manifest : manifests) {
+ assertTrue(manifest.path().startsWith(target),
+ "Manifest path inside staged manifest list should start with
target prefix: " + manifest.path());
+ actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
}
}
+ assertEquals(expectedManifests, actualManifests,
+ "Rewritten manifest list should reference the same manifest files as
the original");
}
private static List<String> metadataLogEntryPaths(Table tbl) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]