This is an automated email from the ASF dual-hosted git repository.
amogh-jahagirdar pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 57396d628c Core: Backport fix row ID assignment for EXISTING entry during a manifest merge to 1.10.x (#16304)
57396d628c is described below
commit 57396d628cb9f92e121f9c2919398475393f0a3a
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue May 12 18:48:22 2026 -0600
Core: Backport fix row ID assignment for EXISTING entry during a manifest merge to 1.10.x (#16304)
Co-authored-by: ChengJi <[email protected]>
Co-authored-by: Russell Spitzer <[email protected]>
---
.../java/org/apache/iceberg/ManifestFiles.java | 9 ++++++
.../org/apache/iceberg/ManifestMergeManager.java | 8 ++++-
.../java/org/apache/iceberg/ManifestReader.java | 24 +++++++++++++--
.../apache/iceberg/MergingSnapshotProducer.java | 8 ++++-
.../org/apache/iceberg/TestManifestReader.java | 36 ++++++++++++++++++++++
.../apache/iceberg/TestRowLineageAssignment.java | 27 ++++++++++++++++
6 files changed, 107 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 739f0be251..7cc16b2cbe 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -124,6 +124,14 @@ public class ManifestFiles {
*/
public static ManifestReader<DataFile> read(
ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById)
{
+ return read(manifest, io, specsById, true);
+ }
+
+ static ManifestReader<DataFile> read(
+ ManifestFile manifest,
+ FileIO io,
+ Map<Integer, PartitionSpec> specsById,
+ boolean isCommitted) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
@@ -136,6 +144,7 @@ public class ManifestFiles {
specsById,
inheritableMetadata,
manifest.firstRowId(),
+ isCommitted,
FileType.DATA_FILES);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
index 94eb8a1107..326257b2cd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -69,6 +69,10 @@ abstract class ManifestMergeManager<F extends ContentFile<F>> {
protected abstract ManifestReader<F> newManifestReader(ManifestFile
manifest);
+ protected ManifestReader<F> newManifestReader(ManifestFile manifest, boolean
isCommitted) {
+ return newManifestReader(manifest);
+ }
+
Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
Iterator<ManifestFile> manifestIter = manifests.iterator();
if (!mergeEnabled || !manifestIter.hasNext()) {
@@ -172,7 +176,9 @@ abstract class ManifestMergeManager<F extends ContentFile<F>> {
boolean threw = true;
try {
for (ManifestFile manifest : bin) {
- try (ManifestReader<F> reader = newManifestReader(manifest)) {
+ boolean isCommitted =
+ manifest.snapshotId() != null && snapshotId() !=
manifest.snapshotId();
+ try (ManifestReader<F> reader = newManifestReader(manifest,
isCommitted)) {
for (ManifestEntry<F> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted
by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 40ea17aaa5..ac1d06c382 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -82,6 +82,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
private final InputFile file;
private final InheritableMetadata inheritableMetadata;
private final Long firstRowId;
+ private final boolean isCommitted;
private final FileType content;
private final PartitionSpec spec;
private final Schema fileSchema;
@@ -115,12 +116,24 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
InheritableMetadata inheritableMetadata,
Long firstRowId,
FileType content) {
+ this(file, specId, specsById, inheritableMetadata, firstRowId, true,
content);
+ }
+
+ protected ManifestReader(
+ InputFile file,
+ int specId,
+ Map<Integer, PartitionSpec> specsById,
+ InheritableMetadata inheritableMetadata,
+ Long firstRowId,
+ boolean isCommitted,
+ FileType content) {
Preconditions.checkArgument(
firstRowId == null || content == FileType.DATA_FILES,
"First row ID is not valid for delete manifests");
this.file = file;
this.inheritableMetadata = inheritableMetadata;
this.firstRowId = firstRowId;
+ this.isCommitted = isCommitted;
this.content = content;
if (specsById != null) {
@@ -296,7 +309,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
CloseableIterable<ManifestEntry<F>> withMetadata =
CloseableIterable.transform(reader, inheritableMetadata::apply);
- return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
+ return CloseableIterable.transform(withMetadata, idAssigner(firstRowId,
isCommitted));
}
CloseableIterable<ManifestEntry<F>> liveEntries() {
@@ -386,7 +399,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
}
private static <F extends ContentFile<F>> Function<ManifestEntry<F>,
ManifestEntry<F>> idAssigner(
- Long firstRowId) {
+ Long firstRowId, boolean isCommitted) {
if (firstRowId != null) {
return new Function<>() {
private long nextRowId = firstRowId;
@@ -404,8 +417,13 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
return entry;
}
};
+ } else if (!isCommitted) {
+ // Preserve firstRowId for entries in uncommitted manifests, including
EXISTING entries that
+ // may be merged later
+ return Function.identity();
} else {
- // data file's first_row_id is null when the manifest's first_row_id is
null
+ // committed manifest with null manifest-level firstRowId (pre-v3
upgrade path)
+ // defensively set the first row ID for every entry to be null
return entry -> {
if (entry.file() instanceof BaseFile) {
((BaseFile<?>) entry.file()).setFirstRowId(null);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f2ebb72706..5256eadfad 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1231,7 +1231,13 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
@Override
protected ManifestReader<DataFile> newManifestReader(ManifestFile
manifest) {
- return MergingSnapshotProducer.this.newManifestReader(manifest);
+ return newManifestReader(manifest, true);
+ }
+
+ @Override
+ protected ManifestReader<DataFile> newManifestReader(
+ ManifestFile manifest, boolean isCommitted) {
+ return ManifestFiles.read(manifest, ops().io(),
ops().current().specsById(), isCommitted);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index fe4e4a74d1..fd408fa67e 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -217,6 +217,42 @@ public class TestManifestReader extends TestBase {
}
}
+ @TestTemplate
+ public void testReadCommitedManifestNullifiesEntryRowId() throws IOException
{
+ long firstRowId = 42L;
+ DataFile fileWithRowId =
+
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+ // the manifest has no manifest-level first_row_id (the v3+ entry still
carries one)
+ ManifestFile manifest =
+ writeManifest(1000L, manifestEntry(Status.EXISTING, 123L,
fileWithRowId));
+ assertThat(manifest.firstRowId()).isNull();
+
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest,
FILE_IO, table.specs())) {
+ assertThat(Iterables.getOnlyElement(reader).firstRowId()).isNull();
+ }
+ }
+
+ @TestTemplate
+ public void testReadUncommittedManifestPreservesEntryRowId() throws
IOException {
+ assumeThat(formatVersion)
+ .as("first_row_id is only written in v3+ manifests")
+ .isGreaterThanOrEqualTo(3);
+
+ long firstRowId = 42L;
+ DataFile fileWithRowId =
+
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+ // the manifest has no manifest-level first_row_id, but the entry has one
+ ManifestFile manifest =
+ writeManifest(1000L, manifestEntry(Status.EXISTING, 123L,
fileWithRowId));
+ assertThat(manifest.firstRowId()).isNull();
+ assertThat(fileWithRowId.firstRowId()).isEqualTo(firstRowId);
+
+ try (ManifestReader<DataFile> reader =
+ ManifestFiles.read(manifest, FILE_IO, table.specs(), false /*
isCommitted */)) {
+
assertThat(Iterables.getOnlyElement(reader).firstRowId()).isEqualTo(firstRowId);
+ }
+ }
+
@TestTemplate
public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
DataFile invalidOffset =
diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
index 5f027ccc88..91aec244d4 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
@@ -781,6 +781,33 @@ public class TestRowLineageAssignment {
FILE_C.recordCount() + FILE_B.recordCount());
}
+ @Test
+ public void testRewritePreservesExistingFileFirstRowIds() {
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // FILE_A gets firstRowId=0, FILE_B gets
firstRowId=FILE_A.recordCount()=125; assert before
+ // rewrite
+ ManifestFile preRewriteManifest =
+
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+ checkDataFileAssignment(table, preRewriteManifest, 0L,
FILE_A.recordCount());
+
+ // set low to trigger an internal manifest merge during the rewrite
+ table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"1").commit();
+ // FILE_A and FILE_C must be removed and added in the same operation.
Removing FILE_A creates
+ // an uncommitted manifest containing FILE_B as an existing entry. Adding
FILE_C triggers the
+ // internal manifest merge to read that uncommitted manifest before its
firstRowId is assigned.
+ table.newRewrite().deleteFile(FILE_A).addFile(FILE_C).commit();
+
+ // merged manifest live files: [FILE_C (added), FILE_B (existing)]
+ ManifestFile manifest =
+
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+ checkDataFileAssignment(
+ table,
+ manifest,
+ FILE_A.recordCount() + FILE_B.recordCount(), // FILE_C gets 225
+ FILE_A.recordCount()); // FILE_B must retain its original firstRowId
(125)
+ }
+
private static ManifestContent content(int ordinal) {
return ManifestContent.values()[ordinal];
}