This is an automated email from the ASF dual-hosted git repository.
amogh-jahagirdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 1cea23eda5 Core: Fix row ID assignment for EXISTING entry during a
manifest merge (#16263)
1cea23eda5 is described below
commit 1cea23eda51c9b9ddcfb88dd499b1fd14f3bf3b3
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue May 12 14:57:47 2026 -0600
Core: Fix row ID assignment for EXISTING entry during a manifest merge (#16263)
Co-authored-by: ChengJi <[email protected]>
Co-authored-by: Russell Spitzer <[email protected]>
---
.../java/org/apache/iceberg/ManifestFiles.java | 9 ++++++
.../org/apache/iceberg/ManifestMergeManager.java | 8 ++++-
.../java/org/apache/iceberg/ManifestReader.java | 24 +++++++++++++--
.../apache/iceberg/MergingSnapshotProducer.java | 8 ++++-
.../org/apache/iceberg/TestManifestReader.java | 36 ++++++++++++++++++++++
.../apache/iceberg/TestRowLineageAssignment.java | 27 ++++++++++++++++
6 files changed, 107 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ffeff9c991..5ac55f0cf4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -151,6 +151,14 @@ public class ManifestFiles {
*/
public static ManifestReader<DataFile> read(
ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById)
{
+ return read(manifest, io, specsById, true);
+ }
+
+ static ManifestReader<DataFile> read(
+ ManifestFile manifest,
+ FileIO io,
+ Map<Integer, PartitionSpec> specsById,
+ boolean isCommitted) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
@@ -163,6 +171,7 @@ public class ManifestFiles {
specsById,
inheritableMetadata,
manifest.firstRowId(),
+ isCommitted,
FileType.DATA_FILES);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
index 410edcc068..0aec0ac69a 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -72,6 +72,10 @@ abstract class ManifestMergeManager<F extends
ContentFile<F>> {
protected abstract ManifestReader<F> newManifestReader(ManifestFile
manifest);
+ protected ManifestReader<F> newManifestReader(ManifestFile manifest, boolean
isCommitted) {
+ return newManifestReader(manifest);
+ }
+
Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
Iterator<ManifestFile> manifestIter = manifests.iterator();
if (!mergeEnabled || !manifestIter.hasNext()) {
@@ -192,7 +196,9 @@ abstract class ManifestMergeManager<F extends
ContentFile<F>> {
boolean threw = true;
try {
for (ManifestFile manifest : bin) {
- try (ManifestReader<F> reader = newManifestReader(manifest)) {
+ boolean isCommitted =
+ manifest.snapshotId() != null && snapshotId() !=
manifest.snapshotId();
+ try (ManifestReader<F> reader = newManifestReader(manifest,
isCommitted)) {
for (ManifestEntry<F> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted
by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 09bbe8b0cc..e3c2325ab7 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -92,6 +92,7 @@ public class ManifestReader<F extends ContentFile<F>> extends
CloseableGroup
private final InputFile file;
private final InheritableMetadata inheritableMetadata;
private final Long firstRowId;
+ private final boolean isCommitted;
private final FileType content;
private final PartitionSpec spec;
private final Schema fileSchema;
@@ -125,12 +126,24 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
InheritableMetadata inheritableMetadata,
Long firstRowId,
FileType content) {
+ this(file, specId, specsById, inheritableMetadata, firstRowId, true,
content);
+ }
+
+ protected ManifestReader(
+ InputFile file,
+ int specId,
+ Map<Integer, PartitionSpec> specsById,
+ InheritableMetadata inheritableMetadata,
+ Long firstRowId,
+ boolean isCommitted,
+ FileType content) {
Preconditions.checkArgument(
firstRowId == null || content == FileType.DATA_FILES,
"First row ID is not valid for delete manifests");
this.file = file;
this.inheritableMetadata = inheritableMetadata;
this.firstRowId = firstRowId;
+ this.isCommitted = isCommitted;
this.content = content;
if (specsById != null) {
@@ -308,7 +321,7 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
CloseableIterable<ManifestEntry<F>> withMetadata =
CloseableIterable.transform(reader, inheritableMetadata::apply);
- return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
+ return CloseableIterable.transform(withMetadata, idAssigner(firstRowId,
isCommitted));
}
CloseableIterable<ManifestEntry<F>> liveEntries() {
@@ -398,7 +411,7 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
}
private static <F extends ContentFile<F>> Function<ManifestEntry<F>,
ManifestEntry<F>> idAssigner(
- Long firstRowId) {
+ Long firstRowId, boolean isCommitted) {
if (firstRowId != null) {
return new Function<>() {
private long nextRowId = firstRowId;
@@ -416,8 +429,13 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
return entry;
}
};
+ } else if (!isCommitted) {
+ // Preserve firstRowId for entries in uncommitted manifests, including EXISTING entries that
+ // may be merged later
+ return Function.identity();
} else {
- // data file's first_row_id is null when the manifest's first_row_id is null
+ // committed manifest with null manifest-level firstRowId (pre-v3 upgrade path)
+ // defensively set the first row ID for every entry to be null
return entry -> {
if (entry.file() instanceof BaseFile) {
((BaseFile<?>) entry.file()).setFirstRowId(null);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e072382543..1a70b4f90b 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1250,7 +1250,13 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
@Override
protected ManifestReader<DataFile> newManifestReader(ManifestFile
manifest) {
- return MergingSnapshotProducer.this.newManifestReader(manifest);
+ return newManifestReader(manifest, true);
+ }
+
+ @Override
+ protected ManifestReader<DataFile> newManifestReader(
+ ManifestFile manifest, boolean isCommitted) {
+ return ManifestFiles.read(manifest, ops().io(),
ops().current().specsById(), isCommitted);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 0af0c87d35..6690a1483e 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -218,6 +218,42 @@ public class TestManifestReader extends TestBase {
}
}
+ @TestTemplate
+ public void testReadCommitedManifestNullifiesEntryRowId() throws IOException
{
+ long firstRowId = 42L;
+ DataFile fileWithRowId =
+
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+ // the manifest has no manifest-level first_row_id (the v3+ entry still carries one)
+ ManifestFile manifest =
+ writeManifest(1000L, manifestEntry(Status.EXISTING, 123L,
fileWithRowId));
+ assertThat(manifest.firstRowId()).isNull();
+
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest,
FILE_IO, table.specs())) {
+ assertThat(Iterables.getOnlyElement(reader).firstRowId()).isNull();
+ }
+ }
+
+ @TestTemplate
+ public void testReadUncommittedManifestPreservesEntryRowId() throws
IOException {
+ assumeThat(formatVersion)
+ .as("first_row_id is only written in v3+ manifests")
+ .isGreaterThanOrEqualTo(3);
+
+ long firstRowId = 42L;
+ DataFile fileWithRowId =
+
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
+ // the manifest has no manifest-level first_row_id, but the entry has one
+ ManifestFile manifest =
+ writeManifest(1000L, manifestEntry(Status.EXISTING, 123L,
fileWithRowId));
+ assertThat(manifest.firstRowId()).isNull();
+ assertThat(fileWithRowId.firstRowId()).isEqualTo(firstRowId);
+
+ try (ManifestReader<DataFile> reader =
+ ManifestFiles.read(manifest, FILE_IO, table.specs(), false /*
isCommitted */)) {
+
assertThat(Iterables.getOnlyElement(reader).firstRowId()).isEqualTo(firstRowId);
+ }
+ }
+
@TestTemplate
public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
DataFile invalidOffset =
diff --git
a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
index 870622bc98..42ee8d0157 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
@@ -781,6 +781,33 @@ public class TestRowLineageAssignment {
FILE_C.recordCount() + FILE_B.recordCount());
}
+ @Test
+ public void testRewritePreservesExistingFileFirstRowIds() {
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // FILE_A gets firstRowId=0, FILE_B gets firstRowId=FILE_A.recordCount()=125; assert before
+ // rewrite
+ ManifestFile preRewriteManifest =
+
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+ checkDataFileAssignment(table, preRewriteManifest, 0L,
FILE_A.recordCount());
+
+ // set low to trigger an internal manifest merge during the rewrite
+ table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"1").commit();
+ // FILE_A and FILE_C must be removed and added in the same operation. Removing FILE_A creates
+ // an uncommitted manifest containing FILE_B as an existing entry. Adding FILE_C triggers the
+ // internal manifest merge to read that uncommitted manifest before its firstRowId is assigned.
+ table.newRewrite().deleteFile(FILE_A).addFile(FILE_C).commit();
+
+ // merged manifest live files: [FILE_C (added), FILE_B (existing)]
+ ManifestFile manifest =
+
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+ checkDataFileAssignment(
+ table,
+ manifest,
+ FILE_A.recordCount() + FILE_B.recordCount(), // FILE_C gets 225
+ FILE_A.recordCount()); // FILE_B must retain its original firstRowId
(125)
+ }
+
private static ManifestContent content(int ordinal) {
return ManifestContent.values()[ordinal];
}