This is an automated email from the ASF dual-hosted git repository.

amogh-jahagirdar pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 57396d628c Core: Backport fix row ID assignment for EXISTING entry during a manifest merge to 1.10.x (#16304)
57396d628c is described below

commit 57396d628cb9f92e121f9c2919398475393f0a3a
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue May 12 18:48:22 2026 -0600

    Core: Backport fix row ID assignment for EXISTING entry during a manifest merge to 1.10.x (#16304)
    
    Co-authored-by: ChengJi <[email protected]>
    Co-authored-by: Russell Spitzer <[email protected]>
---
 .../java/org/apache/iceberg/ManifestFiles.java     |  9 ++++++
 .../org/apache/iceberg/ManifestMergeManager.java   |  8 ++++-
 .../java/org/apache/iceberg/ManifestReader.java    | 24 +++++++++++++--
 .../apache/iceberg/MergingSnapshotProducer.java    |  8 ++++-
 .../org/apache/iceberg/TestManifestReader.java     | 36 ++++++++++++++++++++++
 .../apache/iceberg/TestRowLineageAssignment.java   | 27 ++++++++++++++++
 6 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java 
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 739f0be251..7cc16b2cbe 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -124,6 +124,14 @@ public class ManifestFiles {
    */
   public static ManifestReader<DataFile> read(
       ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) 
{
+    return read(manifest, io, specsById, true);
+  }
+
+  static ManifestReader<DataFile> read(
+      ManifestFile manifest,
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean isCommitted) {
     Preconditions.checkArgument(
         manifest.content() == ManifestContent.DATA,
         "Cannot read a delete manifest with a ManifestReader: %s",
@@ -136,6 +144,7 @@ public class ManifestFiles {
         specsById,
         inheritableMetadata,
         manifest.firstRowId(),
+        isCommitted,
         FileType.DATA_FILES);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java 
b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
index 94eb8a1107..326257b2cd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -69,6 +69,10 @@ abstract class ManifestMergeManager<F extends 
ContentFile<F>> {
 
   protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
 
+  protected ManifestReader<F> newManifestReader(ManifestFile manifest, boolean 
isCommitted) {
+    return newManifestReader(manifest);
+  }
+
   Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
     Iterator<ManifestFile> manifestIter = manifests.iterator();
     if (!mergeEnabled || !manifestIter.hasNext()) {
@@ -172,7 +176,9 @@ abstract class ManifestMergeManager<F extends 
ContentFile<F>> {
     boolean threw = true;
     try {
       for (ManifestFile manifest : bin) {
-        try (ManifestReader<F> reader = newManifestReader(manifest)) {
+        boolean isCommitted =
+            manifest.snapshotId() != null && snapshotId() != 
manifest.snapshotId();
+        try (ManifestReader<F> reader = newManifestReader(manifest, 
isCommitted)) {
           for (ManifestEntry<F> entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted 
by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java 
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 40ea17aaa5..ac1d06c382 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -82,6 +82,7 @@ public class ManifestReader<F extends ContentFile<F>> extends 
CloseableGroup
   private final InputFile file;
   private final InheritableMetadata inheritableMetadata;
   private final Long firstRowId;
+  private final boolean isCommitted;
   private final FileType content;
   private final PartitionSpec spec;
   private final Schema fileSchema;
@@ -115,12 +116,24 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
       InheritableMetadata inheritableMetadata,
       Long firstRowId,
       FileType content) {
+    this(file, specId, specsById, inheritableMetadata, firstRowId, true, 
content);
+  }
+
+  protected ManifestReader(
+      InputFile file,
+      int specId,
+      Map<Integer, PartitionSpec> specsById,
+      InheritableMetadata inheritableMetadata,
+      Long firstRowId,
+      boolean isCommitted,
+      FileType content) {
     Preconditions.checkArgument(
         firstRowId == null || content == FileType.DATA_FILES,
         "First row ID is not valid for delete manifests");
     this.file = file;
     this.inheritableMetadata = inheritableMetadata;
     this.firstRowId = firstRowId;
+    this.isCommitted = isCommitted;
     this.content = content;
 
     if (specsById != null) {
@@ -296,7 +309,7 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
 
     CloseableIterable<ManifestEntry<F>> withMetadata =
         CloseableIterable.transform(reader, inheritableMetadata::apply);
-    return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
+    return CloseableIterable.transform(withMetadata, idAssigner(firstRowId, 
isCommitted));
   }
 
   CloseableIterable<ManifestEntry<F>> liveEntries() {
@@ -386,7 +399,7 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
   }
 
   private static <F extends ContentFile<F>> Function<ManifestEntry<F>, 
ManifestEntry<F>> idAssigner(
-      Long firstRowId) {
+      Long firstRowId, boolean isCommitted) {
     if (firstRowId != null) {
       return new Function<>() {
         private long nextRowId = firstRowId;
@@ -404,8 +417,13 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
           return entry;
         }
       };
+    } else if (!isCommitted) {
+      // Preserve firstRowId for entries in uncommitted manifests, including 
EXISTING entries that
+      // may be merged later
+      return Function.identity();
     } else {
-      // data file's first_row_id is null when the manifest's first_row_id is 
null
+      // committed manifest with null manifest-level firstRowId (pre-v3 
upgrade path)
+      // defensively set the first row ID for every entry to be null
       return entry -> {
         if (entry.file() instanceof BaseFile) {
           ((BaseFile<?>) entry.file()).setFirstRowId(null);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f2ebb72706..5256eadfad 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1231,7 +1231,13 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
 
     @Override
     protected ManifestReader<DataFile> newManifestReader(ManifestFile 
manifest) {
-      return MergingSnapshotProducer.this.newManifestReader(manifest);
+      return newManifestReader(manifest, true);
+    }
+
+    @Override
+    protected ManifestReader<DataFile> newManifestReader(
+        ManifestFile manifest, boolean isCommitted) {
+      return ManifestFiles.read(manifest, ops().io(), 
ops().current().specsById(), isCommitted);
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java 
b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index fe4e4a74d1..fd408fa67e 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -217,6 +217,42 @@ public class TestManifestReader extends TestBase {
     }
   }
 
+  @TestTemplate
+  public void testReadCommitedManifestNullifiesEntryRowId() throws IOException 
{
+    long firstRowId = 42L;
+    DataFile fileWithRowId =
+        
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+    // the manifest has no manifest-level first_row_id (the v3+ entry still 
carries one)
+    ManifestFile manifest =
+        writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, 
fileWithRowId));
+    assertThat(manifest.firstRowId()).isNull();
+
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, 
FILE_IO, table.specs())) {
+      assertThat(Iterables.getOnlyElement(reader).firstRowId()).isNull();
+    }
+  }
+
+  @TestTemplate
+  public void testReadUncommittedManifestPreservesEntryRowId() throws 
IOException {
+    assumeThat(formatVersion)
+        .as("first_row_id is only written in v3+ manifests")
+        .isGreaterThanOrEqualTo(3);
+
+    long firstRowId = 42L;
+    DataFile fileWithRowId =
+        
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+    // the manifest has no manifest-level first_row_id, but the entry has one
+    ManifestFile manifest =
+        writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, 
fileWithRowId));
+    assertThat(manifest.firstRowId()).isNull();
+    assertThat(fileWithRowId.firstRowId()).isEqualTo(firstRowId);
+
+    try (ManifestReader<DataFile> reader =
+        ManifestFiles.read(manifest, FILE_IO, table.specs(), false /* 
isCommitted */)) {
+      
assertThat(Iterables.getOnlyElement(reader).firstRowId()).isEqualTo(firstRowId);
+    }
+  }
+
   @TestTemplate
   public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
     DataFile invalidOffset =
diff --git 
a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java 
b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
index 5f027ccc88..91aec244d4 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
@@ -781,6 +781,33 @@ public class TestRowLineageAssignment {
         FILE_C.recordCount() + FILE_B.recordCount());
   }
 
+  @Test
+  public void testRewritePreservesExistingFileFirstRowIds() {
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // FILE_A gets firstRowId=0, FILE_B gets 
firstRowId=FILE_A.recordCount()=125; assert before
+    // rewrite
+    ManifestFile preRewriteManifest =
+        
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+    checkDataFileAssignment(table, preRewriteManifest, 0L, 
FILE_A.recordCount());
+
+    // set low to trigger an internal manifest merge during the rewrite
+    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, 
"1").commit();
+    // FILE_A and FILE_C must be removed and added in the same operation. 
Removing FILE_A creates
+    // an uncommitted manifest containing FILE_B as an existing entry. Adding 
FILE_C triggers the
+    // internal manifest merge to read that uncommitted manifest before its 
firstRowId is assigned.
+    table.newRewrite().deleteFile(FILE_A).addFile(FILE_C).commit();
+
+    // merged manifest live files: [FILE_C (added), FILE_B (existing)]
+    ManifestFile manifest =
+        
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+    checkDataFileAssignment(
+        table,
+        manifest,
+        FILE_A.recordCount() + FILE_B.recordCount(), // FILE_C gets 225
+        FILE_A.recordCount()); // FILE_B must retain its original firstRowId 
(125)
+  }
+
   private static ManifestContent content(int ordinal) {
     return ManifestContent.values()[ordinal];
   }

Reply via email to