This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new beca2e69 fix(manifest): correct v3 manifest-list first row id assignment for row-lineage (#741)
beca2e69 is described below

commit beca2e693c85ba12b44b4a8561c0f5562ecd0c7a
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Feb 19 20:45:52 2026 +0100

    fix(manifest): correct v3 manifest-list first row id assignment for row-lineage (#741)
    
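    Each manifest's FirstRowId pointer aliased the writer's nextRowID, which
    is then advanced before the manifest is encoded, so the persisted start
    of each row-id range became its end. Now that snapshot lineage is wired
    from the writer delta (#728), this could cause row-id overlaps or gaps
    between commits.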
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 manifest.go                      |  3 +-
 manifest_test.go                 | 33 ++++++++++++++
 table/snapshot_producers_test.go | 94 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 127 insertions(+), 3 deletions(-)

diff --git a/manifest.go b/manifest.go
index 48bae7a6..68a7234a 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1406,7 +1406,8 @@ func (m *ManifestListWriter) AddManifests(files 
[]ManifestFile) error {
                                // Ref: 
https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
                                if wrapped.Content == ManifestContentData && 
wrapped.FirstRowId == nil {
                                        if m.nextRowID != nil {
-                                               wrapped.FirstRowId = m.nextRowID
+                                               firstRowID := *m.nextRowID
+                                               wrapped.FirstRowId = &firstRowID
                                                *m.nextRowID += 
wrapped.ExistingRowsCount + wrapped.AddedRowsCount
                                        }
                                }
diff --git a/manifest_test.go b/manifest_test.go
index 87dc4efc..b94b47ae 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1448,6 +1448,39 @@ func (m *ManifestTestSuite) 
TestV3ManifestListWriterDeltaIgnoresNonDataManifests
        m.Require().NoError(writer.Close())
 }
 
+func (m *ManifestTestSuite) 
TestV3ManifestListWriterPersistsPerManifestFirstRowIDStart() {
+       // Persisted first_row_id per manifest must be the start of each 
assigned row-id range.
+       var buf bytes.Buffer
+       commitSnapID := int64(100)
+       firstRowID := int64(5000)
+       sequenceNum := int64(1)
+
+       writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, 
firstRowID, nil)
+       m.Require().NoError(err)
+
+       manifests := []ManifestFile{
+               NewManifestFile(3, "m1.avro", 10, 1, 
commitSnapID).AddedRows(10).ExistingRows(5).Build(), // delta = 15
+               NewManifestFile(3, "m2.avro", 10, 1, 
commitSnapID).AddedRows(7).Build(),                  // delta = 7
+       }
+       m.Require().NoError(writer.AddManifests(manifests))
+       m.Require().NoError(writer.Close())
+
+       list, err := ReadManifestList(bytes.NewReader(buf.Bytes()))
+       m.Require().NoError(err)
+       m.Require().Len(list, 2)
+
+       firstManifest, ok := list[0].(*manifestFile)
+       m.Require().True(ok, "expected v3 manifest file type")
+       secondManifest, ok := list[1].(*manifestFile)
+       m.Require().True(ok, "expected v3 manifest file type")
+       m.Require().NotNil(firstManifest.FirstRowId)
+       m.Require().NotNil(secondManifest.FirstRowId)
+
+       m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
+       m.EqualValues(5015, *secondManifest.FirstRowId)
+       m.EqualValues(5022, *writer.NextRowID())
+}
+
 func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
        // Test v3writerImpl.prepareEntry sequence number validation logic
        v3Writer := v3writerImpl{}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index f2e55631..025e0b59 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -20,6 +20,7 @@ package table
 import (
        "bytes"
        "context"
+       "encoding/json"
        "errors"
        "io"
        "io/fs"
@@ -135,6 +136,10 @@ func manifestSize(t *testing.T, version int, spec 
iceberg.PartitionSpec, schema
 }
 
 func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, 
partition map[int]any) iceberg.DataFile {
+       return newTestDataFileWithCount(t, spec, path, partition, 1)
+}
+
+func newTestDataFileWithCount(t *testing.T, spec iceberg.PartitionSpec, path 
string, partition map[int]any, count int64) iceberg.DataFile {
        t.Helper()
 
        builder, err := iceberg.NewDataFileBuilder(
@@ -145,8 +150,8 @@ func newTestDataFile(t *testing.T, spec 
iceberg.PartitionSpec, path string, part
                partition,
                nil,
                nil,
-               1,
-               1,
+               count,
+               count,
        )
        require.NoError(t, err, "new data file builder")
 
@@ -299,6 +304,91 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t 
*testing.T) {
        require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = 
first-row-id + assigned delta (1+2)")
 }
 
+func readManifestListFromPath(t *testing.T, fs iceio.IO, path string) 
[]iceberg.ManifestFile {
+       t.Helper()
+
+       f, err := fs.Open(path)
+       require.NoError(t, err, "open manifest list: %s", path)
+       defer f.Close()
+
+       list, err := iceberg.ReadManifestList(f)
+       require.NoError(t, err, "read manifest list: %s", path)
+
+       return list
+}
+
+func manifestFirstRowIDForSnapshot(t *testing.T, manifests 
[]iceberg.ManifestFile, snapshotID int64) int64 {
+       t.Helper()
+
+       type manifestRowLineage struct {
+               AddedSnapshotID int64  `json:"AddedSnapshotID"`
+               FirstRowID      *int64 `json:"FirstRowId"`
+       }
+
+       for _, manifest := range manifests {
+               raw, err := json.Marshal(manifest)
+               require.NoError(t, err, "marshal manifest")
+
+               var decoded manifestRowLineage
+               require.NoError(t, json.Unmarshal(raw, &decoded), "unmarshal 
manifest row-lineage fields")
+
+               if decoded.AddedSnapshotID == snapshotID {
+                       require.NotNil(t, decoded.FirstRowID, "first_row_id 
must be persisted for v3 data manifests")
+
+                       return *decoded.FirstRowID
+               }
+       }
+
+       require.Failf(t, "missing manifest for snapshot", "snapshot-id=%d", 
snapshotID)
+
+       return 0
+}
+
+// TestCommitV3RowLineagePersistsManifestFirstRowID verifies that snapshot 
producer
+// writes first_row_id to manifest list entries using the snapshot's start 
row-id.
+func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       ident := Identifier{"db", "tbl"}
+       txn, memIO := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       // Use multi-row files to make row-range starts obvious.
+       sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+       sp1.appendDataFile(newTestDataFileWithCount(t, spec, 
"file://data-1.parquet", nil, 3))
+       updates1, reqs1, err := sp1.commit()
+       require.NoError(t, err, "first commit should succeed")
+       addSnap1, ok := updates1[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+       require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "snapshot 
first-row-id for commit 1")
+
+       manifests1 := readManifestListFromPath(t, memIO, 
addSnap1.Snapshot.ManifestList)
+       currentManifestFirstRowID1 := manifestFirstRowIDForSnapshot(t, 
manifests1, addSnap1.Snapshot.SnapshotID)
+       require.Equal(t, *addSnap1.Snapshot.FirstRowID, 
currentManifestFirstRowID1,
+               "persisted manifest first_row_id must match snapshot 
first-row-id for current commit")
+
+       err = txn.apply(updates1, reqs1)
+       require.NoError(t, err, "first apply should succeed")
+       meta1, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(3), meta1.NextRowID())
+
+       tbl2 := New(ident, meta1, "metadata.json", func(context.Context) 
(iceio.IO, error) { return memIO, nil }, nil)
+       txn2 := tbl2.NewTransaction()
+       txn2.meta.formatVersion = 3
+       sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+       sp2.appendDataFile(newTestDataFileWithCount(t, spec, 
"file://data-2.parquet", nil, 5))
+       updates2, _, err := sp2.commit()
+       require.NoError(t, err, "second commit should succeed")
+       addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+       require.Equal(t, int64(3), *addSnap2.Snapshot.FirstRowID, "snapshot 
first-row-id for commit 2")
+
+       manifests2 := readManifestListFromPath(t, memIO, 
addSnap2.Snapshot.ManifestList)
+       currentManifestFirstRowID2 := manifestFirstRowIDForSnapshot(t, 
manifests2, addSnap2.Snapshot.SnapshotID)
+       require.Equal(t, *addSnap2.Snapshot.FirstRowID, 
currentManifestFirstRowID2,
+               "persisted manifest first_row_id must match snapshot 
first-row-id for current commit")
+}
+
 func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
        spec := partitionedSpec()
        schema := simpleSchema()

Reply via email to