This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new bbdb53f1 feat(metadata): row lineage tracking (#659)
bbdb53f1 is described below

commit bbdb53f1fa1b4aceb86cb0bc7c8bb78d3ebb990c
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue Dec 23 16:56:09 2025 +0100

    feat(metadata): row lineage tracking (#659)
    
    This implements only the metadata side of things; there is no support
    for reading or writing tables with row lineage.
---
 table/metadata.go                       |  48 +++++++-
 table/metadata_builder_internal_test.go | 204 ++++++++++++++++++++++++++++++++
 table/snapshots.go                      |  25 +++-
 table/snapshots_test.go                 | 128 ++++++++++++++++++++
 4 files changed, 400 insertions(+), 5 deletions(-)

diff --git a/table/metadata.go b/table/metadata.go
index d47d27ec..5ddedf8d 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -36,9 +36,11 @@ import (
 )
 
 const (
-       partitionFieldStartID             = 1000
-       supportedTableFormatVersion       = 3
-       oneMinuteInMs               int64 = 60_000
+       partitionFieldStartID       = 1000
+       supportedTableFormatVersion = 3
+       minFormatVersionRowLineage  = 3
+       initialRowID                = int64(0)
+       oneMinuteInMs               = int64(60_000)
 )
 
 func generateSnapshotID() int64 {
@@ -427,6 +429,10 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) 
error {
                        snapshot.TimestampMs, maxTS)
        }
 
+       if err := b.validateAndUpdateRowLineage(snapshot); err != nil {
+               return err
+       }
+
        b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
        b.lastUpdatedMS = snapshot.TimestampMs
        b.lastSequenceNumber = &snapshot.SequenceNumber
@@ -435,6 +441,42 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) 
error {
        return nil
 }
 
+func (b *MetadataBuilder) validateAndUpdateRowLineage(snapshot *Snapshot) 
error {
+       if b.formatVersion < minFormatVersionRowLineage {
+               return nil
+       }
+
+       if err := snapshot.ValidateRowLineage(); err != nil {
+               return err
+       }
+
+       if snapshot.FirstRowID == nil {
+               return fmt.Errorf("%w: first-row-id is required for v3 
snapshots", ErrInvalidRowLineage)
+       }
+
+       nextRowID := b.currentNextRowID()
+       if *snapshot.FirstRowID < nextRowID {
+               return fmt.Errorf("%w: first-row-id %d is behind table 
next-row-id %d",
+                       ErrInvalidRowLineage, *snapshot.FirstRowID, nextRowID)
+       }
+
+       newNextRowID := nextRowID + *snapshot.AddedRows
+       b.nextRowID = &newNextRowID
+
+       return nil
+}
+
+func (b *MetadataBuilder) currentNextRowID() int64 {
+       if b.nextRowID != nil {
+               return *b.nextRowID
+       }
+       if b.base != nil {
+               return b.base.NextRowID()
+       }
+
+       return initialRowID
+}
+
 func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
        if b.currentSnapshotID != nil && slices.Contains(snapshotIds, 
*b.currentSnapshotID) {
                return errors.New("current snapshot cannot be removed")
diff --git a/table/metadata_builder_internal_test.go 
b/table/metadata_builder_internal_test.go
index 3a046953..8fe67123 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -1337,6 +1337,210 @@ func TestUnsupportedTypes(t *testing.T) {
        }
 }
 
+func TestAddSnapshotV3RequiresRowLineage(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.ErrorIs(t, err, ErrInvalidRowLineage)
+       require.ErrorContains(t, err, "first-row-id is required for v3 
snapshots")
+}
+
+func TestAddSnapshotV3RejectsFirstRowIDBehindNextRowID(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       firstRowID := int64(0)
+       addedRows := int64(100)
+
+       snapshot1 := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID,
+               AddedRows:        &addedRows,
+       }
+       require.NoError(t, builder.AddSnapshot(&snapshot1))
+
+       behindFirstRowID := int64(50)
+       snapshot2 := Snapshot{
+               SnapshotID:       2,
+               ParentSnapshotID: nil,
+               SequenceNumber:   1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 2,
+               ManifestList:     "/snap-2.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &behindFirstRowID,
+               AddedRows:        &addedRows,
+       }
+
+       err := builder.AddSnapshot(&snapshot2)
+       require.ErrorIs(t, err, ErrInvalidRowLineage)
+       require.ErrorContains(t, err, "first-row-id 50 is behind table 
next-row-id 100")
+}
+
+func TestAddSnapshotV3UpdatesNextRowID(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+
+       firstRowID1 := int64(0)
+       addedRows1 := int64(30)
+       snapshot1 := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID1,
+               AddedRows:        &addedRows1,
+       }
+       require.NoError(t, builder.AddSnapshot(&snapshot1))
+       require.Equal(t, int64(30), *builder.nextRowID)
+
+       firstRowID2 := int64(30)
+       addedRows2 := int64(28)
+       snapshot2 := Snapshot{
+               SnapshotID:       2,
+               ParentSnapshotID: nil,
+               SequenceNumber:   1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 2,
+               ManifestList:     "/snap-2.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID2,
+               AddedRows:        &addedRows2,
+       }
+       require.NoError(t, builder.AddSnapshot(&snapshot2))
+       require.Equal(t, int64(58), *builder.nextRowID)
+
+       meta, err := builder.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(58), meta.NextRowID())
+}
+
+func TestAddSnapshotV3ValidatesNegativeFirstRowID(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       negativeFirstRowID := int64(-1)
+       addedRows := int64(100)
+
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &negativeFirstRowID,
+               AddedRows:        &addedRows,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.ErrorIs(t, err, ErrInvalidRowLineage)
+       require.ErrorContains(t, err, "first-row-id cannot be negative")
+}
+
+func TestAddSnapshotV3ValidatesNegativeAddedRows(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       firstRowID := int64(0)
+       negativeAddedRows := int64(-1)
+
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID,
+               AddedRows:        &negativeAddedRows,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.ErrorIs(t, err, ErrInvalidRowLineage)
+       require.ErrorContains(t, err, "added-rows cannot be negative")
+}
+
+func TestAddSnapshotV3RequiresAddedRowsWhenFirstRowIDSet(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       firstRowID := int64(0)
+
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID,
+               AddedRows:        nil,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.ErrorIs(t, err, ErrInvalidRowLineage)
+       require.ErrorContains(t, err, "added-rows is required when first-row-id 
is set")
+}
+
+func TestAddSnapshotV2DoesNotRequireRowLineage(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       schemaID := 0
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.NoError(t, err)
+}
+
+func TestAddSnapshotV3AcceptsFirstRowIDEqualToNextRowID(t *testing.T) {
+       builder := builderWithoutChanges(3)
+       schemaID := 0
+       firstRowID := int64(0)
+       addedRows := int64(100)
+
+       snapshot := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   0,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary:          &Summary{Operation: OpAppend},
+               SchemaID:         &schemaID,
+               FirstRowID:       &firstRowID,
+               AddedRows:        &addedRows,
+       }
+
+       err := builder.AddSnapshot(&snapshot)
+       require.NoError(t, err)
+       require.Equal(t, int64(100), *builder.nextRowID)
+}
+
 func generateTypeSchema(typ iceberg.Type) *iceberg.Schema {
        sc := iceberg.NewSchema(0,
                iceberg.NestedField{
diff --git a/table/snapshots.go b/table/snapshots.go
index 6c14c100..fbdd47de 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -42,8 +42,9 @@ const (
 )
 
 var (
-       ErrInvalidOperation = errors.New("invalid operation value")
-       ErrMissingOperation = errors.New("missing operation key")
+       ErrInvalidOperation  = errors.New("invalid operation value")
+       ErrMissingOperation  = errors.New("missing operation key")
+       ErrInvalidRowLineage = errors.New("invalid row lineage")
 )
 
 // ValidOperation ensures that a given string is one of the valid operation
@@ -255,6 +256,7 @@ type Snapshot struct {
        Summary          *Summary `json:"summary,omitempty"`
        SchemaID         *int     `json:"schema-id,omitempty"`
        FirstRowID       *int64   `json:"first-row-id,omitempty"` // V3: 
Starting row ID for this snapshot
+       AddedRows        *int64   `json:"added-rows,omitempty"`   // V3: Number 
of rows added by this snapshot
 }
 
 func (s Snapshot) String() string {
@@ -287,6 +289,10 @@ func (s Snapshot) Equals(other Snapshot) bool {
        case s.FirstRowID == nil && other.FirstRowID != nil:
                fallthrough
        case s.FirstRowID != nil && other.FirstRowID == nil:
+               fallthrough
+       case s.AddedRows == nil && other.AddedRows != nil:
+               fallthrough
+       case s.AddedRows != nil && other.AddedRows == nil:
                return false
        }
 
@@ -294,12 +300,27 @@ func (s Snapshot) Equals(other Snapshot) bool {
                ((s.ParentSnapshotID == other.ParentSnapshotID) || 
(*s.ParentSnapshotID == *other.ParentSnapshotID)) &&
                ((s.SchemaID == other.SchemaID) || (*s.SchemaID == 
*other.SchemaID)) &&
                ((s.FirstRowID == other.FirstRowID) || (*s.FirstRowID == 
*other.FirstRowID)) &&
+               ((s.AddedRows == other.AddedRows) || (*s.AddedRows == 
*other.AddedRows)) &&
                s.SequenceNumber == other.SequenceNumber &&
                s.TimestampMs == other.TimestampMs &&
                s.ManifestList == other.ManifestList &&
                s.Summary.Equals(other.Summary)
 }
 
+func (s Snapshot) ValidateRowLineage() error {
+       if s.FirstRowID != nil && s.AddedRows == nil {
+               return fmt.Errorf("%w: added-rows is required when first-row-id 
is set", ErrInvalidRowLineage)
+       }
+       if s.AddedRows != nil && *s.AddedRows < 0 {
+               return fmt.Errorf("%w: added-rows cannot be negative: %d", 
ErrInvalidRowLineage, *s.AddedRows)
+       }
+       if s.FirstRowID != nil && *s.FirstRowID < 0 {
+               return fmt.Errorf("%w: first-row-id cannot be negative: %d", 
ErrInvalidRowLineage, *s.FirstRowID)
+       }
+
+       return nil
+}
+
 func (s Snapshot) Manifests(fio iceio.IO) (_ []iceberg.ManifestFile, err 
error) {
        if s.ManifestList != "" {
                f, err := fio.Open(s.ManifestList)
diff --git a/table/snapshots_test.go b/table/snapshots_test.go
index 5be91b22..0e4d3b7e 100644
--- a/table/snapshots_test.go
+++ b/table/snapshots_test.go
@@ -115,3 +115,131 @@ func TestSnapshotString(t *testing.T) {
        assert.Equal(t, `append, {"foo":"bar"}: id=25, parent_id=19, 
schema_id=3, sequence_number=200, timestamp_ms=1602638573590, 
manifest_list=s3:/a/b/c.avro`,
                snapshot.String())
 }
+
+func TestSerializeSnapshotWithRowLineage(t *testing.T) {
+       parentID := int64(19)
+       manifest, schemaid := "s3:/a/b/c.avro", 3
+       firstRowID := int64(0)
+       addedRows := int64(100)
+
+       snapshot := table.Snapshot{
+               SnapshotID:       25,
+               ParentSnapshotID: &parentID,
+               SequenceNumber:   200,
+               TimestampMs:      1602638573590,
+               ManifestList:     manifest,
+               SchemaID:         &schemaid,
+               FirstRowID:       &firstRowID,
+               AddedRows:        &addedRows,
+               Summary: &table.Summary{
+                       Operation: table.OpAppend,
+               },
+       }
+
+       data, err := json.Marshal(snapshot)
+       require.NoError(t, err)
+
+       assert.JSONEq(t, `{
+               "snapshot-id": 25,
+               "parent-snapshot-id": 19,
+               "sequence-number": 200,
+               "timestamp-ms": 1602638573590,
+               "manifest-list": "s3:/a/b/c.avro",
+               "summary": {"operation": "append"},
+               "schema-id": 3,
+               "first-row-id": 0,
+               "added-rows": 100
+       }`, string(data))
+}
+
+func TestDeserializeSnapshotWithRowLineage(t *testing.T) {
+       jsonData := `{
+               "snapshot-id": 25,
+               "parent-snapshot-id": 19,
+               "sequence-number": 200,
+               "timestamp-ms": 1602638573590,
+               "manifest-list": "s3:/a/b/c.avro",
+               "summary": {"operation": "append"},
+               "schema-id": 3,
+               "first-row-id": 0,
+               "added-rows": 100
+       }`
+
+       var snapshot table.Snapshot
+       err := json.Unmarshal([]byte(jsonData), &snapshot)
+       require.NoError(t, err)
+
+       assert.Equal(t, int64(25), snapshot.SnapshotID)
+       require.NotNil(t, snapshot.FirstRowID)
+       assert.Equal(t, int64(0), *snapshot.FirstRowID)
+       require.NotNil(t, snapshot.AddedRows)
+       assert.Equal(t, int64(100), *snapshot.AddedRows)
+}
+
+func TestValidateRowLineage(t *testing.T) {
+       tests := []struct {
+               name       string
+               firstRowID *int64
+               addedRows  *int64
+               wantErr    string
+       }{
+               {
+                       name:       "valid: both nil",
+                       firstRowID: nil,
+                       addedRows:  nil,
+                       wantErr:    "",
+               },
+               {
+                       name:       "valid: both set",
+                       firstRowID: ptr(int64(0)),
+                       addedRows:  ptr(int64(100)),
+                       wantErr:    "",
+               },
+               {
+                       name:       "valid: zero added rows",
+                       firstRowID: ptr(int64(30)),
+                       addedRows:  ptr(int64(0)),
+                       wantErr:    "",
+               },
+               {
+                       name:       "invalid: first-row-id set but added-rows 
nil",
+                       firstRowID: ptr(int64(0)),
+                       addedRows:  nil,
+                       wantErr:    "added-rows is required when first-row-id 
is set",
+               },
+               {
+                       name:       "invalid: negative added-rows",
+                       firstRowID: ptr(int64(0)),
+                       addedRows:  ptr(int64(-1)),
+                       wantErr:    "added-rows cannot be negative: -1",
+               },
+               {
+                       name:       "invalid: negative first-row-id",
+                       firstRowID: ptr(int64(-1)),
+                       addedRows:  ptr(int64(100)),
+                       wantErr:    "first-row-id cannot be negative: -1",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       snapshot := table.Snapshot{
+                               SnapshotID: 1,
+                               FirstRowID: tt.firstRowID,
+                               AddedRows:  tt.addedRows,
+                       }
+
+                       err := snapshot.ValidateRowLineage()
+                       if tt.wantErr == "" {
+                               assert.NoError(t, err)
+                       } else {
+                               assert.ErrorIs(t, err, 
table.ErrInvalidRowLineage)
+                               assert.ErrorContains(t, err, tt.wantErr)
+                       }
+               })
+       }
+}
+
+func ptr[T any](v T) *T {
+       return &v
+}

Reply via email to