This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new bbdb53f1 feat(metadata): row lineage tracking (#659)
bbdb53f1 is described below
commit bbdb53f1fa1b4aceb86cb0bc7c8bb78d3ebb990c
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue Dec 23 16:56:09 2025 +0100
feat(metadata): row lineage tracking (#659)
This implements only the metadata side of things; reading or writing tables
with row lineage is not supported.
---
table/metadata.go | 48 +++++++-
table/metadata_builder_internal_test.go | 204 ++++++++++++++++++++++++++++++++
table/snapshots.go | 25 +++-
table/snapshots_test.go | 128 ++++++++++++++++++++
4 files changed, 400 insertions(+), 5 deletions(-)
diff --git a/table/metadata.go b/table/metadata.go
index d47d27ec..5ddedf8d 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -36,9 +36,11 @@ import (
)
// Table-metadata constants.
const (
	// Partition field IDs are assigned starting from this value.
	partitionFieldStartID = 1000
	// Highest table format version this package supports.
	supportedTableFormatVersion = 3
	// Row lineage (first-row-id / added-rows / next-row-id) is enforced
	// from this format version onwards.
	minFormatVersionRowLineage = 3

	// next-row-id used when neither the builder nor its base metadata
	// provides one.
	initialRowID int64 = 0
	// One minute expressed in milliseconds.
	oneMinuteInMs int64 = 60_000
)
func generateSnapshotID() int64 {
@@ -427,6 +429,10 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) error {
snapshot.TimestampMs, maxTS)
}
+ if err := b.validateAndUpdateRowLineage(snapshot); err != nil {
+ return err
+ }
+
b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
b.lastUpdatedMS = snapshot.TimestampMs
b.lastSequenceNumber = &snapshot.SequenceNumber
@@ -435,6 +441,42 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) error {
return nil
}
+func (b *MetadataBuilder) validateAndUpdateRowLineage(snapshot *Snapshot)
error {
+ if b.formatVersion < minFormatVersionRowLineage {
+ return nil
+ }
+
+ if err := snapshot.ValidateRowLineage(); err != nil {
+ return err
+ }
+
+ if snapshot.FirstRowID == nil {
+ return fmt.Errorf("%w: first-row-id is required for v3
snapshots", ErrInvalidRowLineage)
+ }
+
+ nextRowID := b.currentNextRowID()
+ if *snapshot.FirstRowID < nextRowID {
+ return fmt.Errorf("%w: first-row-id %d is behind table
next-row-id %d",
+ ErrInvalidRowLineage, *snapshot.FirstRowID, nextRowID)
+ }
+
+ newNextRowID := nextRowID + *snapshot.AddedRows
+ b.nextRowID = &newNextRowID
+
+ return nil
+}
+
+func (b *MetadataBuilder) currentNextRowID() int64 {
+ if b.nextRowID != nil {
+ return *b.nextRowID
+ }
+ if b.base != nil {
+ return b.base.NextRowID()
+ }
+
+ return initialRowID
+}
+
func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
if b.currentSnapshotID != nil && slices.Contains(snapshotIds,
*b.currentSnapshotID) {
return errors.New("current snapshot cannot be removed")
diff --git a/table/metadata_builder_internal_test.go b/table/metadata_builder_internal_test.go
index 3a046953..8fe67123 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -1337,6 +1337,210 @@ func TestUnsupportedTypes(t *testing.T) {
}
}
+func TestAddSnapshotV3RequiresRowLineage(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.ErrorIs(t, err, ErrInvalidRowLineage)
+ require.ErrorContains(t, err, "first-row-id is required for v3
snapshots")
+}
+
+func TestAddSnapshotV3RejectsFirstRowIDBehindNextRowID(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ firstRowID := int64(0)
+ addedRows := int64(100)
+
+ snapshot1 := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID,
+ AddedRows: &addedRows,
+ }
+ require.NoError(t, builder.AddSnapshot(&snapshot1))
+
+ behindFirstRowID := int64(50)
+ snapshot2 := Snapshot{
+ SnapshotID: 2,
+ ParentSnapshotID: nil,
+ SequenceNumber: 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 2,
+ ManifestList: "/snap-2.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &behindFirstRowID,
+ AddedRows: &addedRows,
+ }
+
+ err := builder.AddSnapshot(&snapshot2)
+ require.ErrorIs(t, err, ErrInvalidRowLineage)
+ require.ErrorContains(t, err, "first-row-id 50 is behind table
next-row-id 100")
+}
+
+func TestAddSnapshotV3UpdatesNextRowID(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+
+ firstRowID1 := int64(0)
+ addedRows1 := int64(30)
+ snapshot1 := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID1,
+ AddedRows: &addedRows1,
+ }
+ require.NoError(t, builder.AddSnapshot(&snapshot1))
+ require.Equal(t, int64(30), *builder.nextRowID)
+
+ firstRowID2 := int64(30)
+ addedRows2 := int64(28)
+ snapshot2 := Snapshot{
+ SnapshotID: 2,
+ ParentSnapshotID: nil,
+ SequenceNumber: 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 2,
+ ManifestList: "/snap-2.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID2,
+ AddedRows: &addedRows2,
+ }
+ require.NoError(t, builder.AddSnapshot(&snapshot2))
+ require.Equal(t, int64(58), *builder.nextRowID)
+
+ meta, err := builder.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(58), meta.NextRowID())
+}
+
+func TestAddSnapshotV3ValidatesNegativeFirstRowID(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ negativeFirstRowID := int64(-1)
+ addedRows := int64(100)
+
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &negativeFirstRowID,
+ AddedRows: &addedRows,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.ErrorIs(t, err, ErrInvalidRowLineage)
+ require.ErrorContains(t, err, "first-row-id cannot be negative")
+}
+
+func TestAddSnapshotV3ValidatesNegativeAddedRows(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ firstRowID := int64(0)
+ negativeAddedRows := int64(-1)
+
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID,
+ AddedRows: &negativeAddedRows,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.ErrorIs(t, err, ErrInvalidRowLineage)
+ require.ErrorContains(t, err, "added-rows cannot be negative")
+}
+
+func TestAddSnapshotV3RequiresAddedRowsWhenFirstRowIDSet(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ firstRowID := int64(0)
+
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID,
+ AddedRows: nil,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.ErrorIs(t, err, ErrInvalidRowLineage)
+ require.ErrorContains(t, err, "added-rows is required when first-row-id
is set")
+}
+
+func TestAddSnapshotV2DoesNotRequireRowLineage(t *testing.T) {
+ builder := builderWithoutChanges(2)
+ schemaID := 0
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.NoError(t, err)
+}
+
+func TestAddSnapshotV3AcceptsFirstRowIDEqualToNextRowID(t *testing.T) {
+ builder := builderWithoutChanges(3)
+ schemaID := 0
+ firstRowID := int64(0)
+ addedRows := int64(100)
+
+ snapshot := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 0,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{Operation: OpAppend},
+ SchemaID: &schemaID,
+ FirstRowID: &firstRowID,
+ AddedRows: &addedRows,
+ }
+
+ err := builder.AddSnapshot(&snapshot)
+ require.NoError(t, err)
+ require.Equal(t, int64(100), *builder.nextRowID)
+}
+
func generateTypeSchema(typ iceberg.Type) *iceberg.Schema {
sc := iceberg.NewSchema(0,
iceberg.NestedField{
diff --git a/table/snapshots.go b/table/snapshots.go
index 6c14c100..fbdd47de 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -42,8 +42,9 @@ const (
)
// Sentinel errors for snapshot validation; match them with errors.Is.
var (
	// ErrInvalidOperation is the sentinel for an invalid operation value.
	ErrInvalidOperation = errors.New("invalid operation value")
	// ErrMissingOperation is the sentinel for a missing operation key.
	ErrMissingOperation = errors.New("missing operation key")
	// ErrInvalidRowLineage is wrapped by every row-lineage validation failure:
	// inconsistent or negative first-row-id / added-rows, or a first-row-id
	// that is behind the table's next-row-id.
	ErrInvalidRowLineage = errors.New("invalid row lineage")
)
// ValidOperation ensures that a given string is one of the valid operation
@@ -255,6 +256,7 @@ type Snapshot struct {
Summary *Summary `json:"summary,omitempty"`
SchemaID *int `json:"schema-id,omitempty"`
FirstRowID *int64 `json:"first-row-id,omitempty"` // V3:
Starting row ID for this snapshot
+ AddedRows *int64 `json:"added-rows,omitempty"` // V3: Number
of rows added by this snapshot
}
func (s Snapshot) String() string {
@@ -287,6 +289,10 @@ func (s Snapshot) Equals(other Snapshot) bool {
case s.FirstRowID == nil && other.FirstRowID != nil:
fallthrough
case s.FirstRowID != nil && other.FirstRowID == nil:
+ fallthrough
+ case s.AddedRows == nil && other.AddedRows != nil:
+ fallthrough
+ case s.AddedRows != nil && other.AddedRows == nil:
return false
}
@@ -294,12 +300,27 @@ func (s Snapshot) Equals(other Snapshot) bool {
((s.ParentSnapshotID == other.ParentSnapshotID) ||
(*s.ParentSnapshotID == *other.ParentSnapshotID)) &&
((s.SchemaID == other.SchemaID) || (*s.SchemaID ==
*other.SchemaID)) &&
((s.FirstRowID == other.FirstRowID) || (*s.FirstRowID ==
*other.FirstRowID)) &&
+ ((s.AddedRows == other.AddedRows) || (*s.AddedRows ==
*other.AddedRows)) &&
s.SequenceNumber == other.SequenceNumber &&
s.TimestampMs == other.TimestampMs &&
s.ManifestList == other.ManifestList &&
s.Summary.Equals(other.Summary)
}
+func (s Snapshot) ValidateRowLineage() error {
+ if s.FirstRowID != nil && s.AddedRows == nil {
+ return fmt.Errorf("%w: added-rows is required when first-row-id
is set", ErrInvalidRowLineage)
+ }
+ if s.AddedRows != nil && *s.AddedRows < 0 {
+ return fmt.Errorf("%w: added-rows cannot be negative: %d",
ErrInvalidRowLineage, *s.AddedRows)
+ }
+ if s.FirstRowID != nil && *s.FirstRowID < 0 {
+ return fmt.Errorf("%w: first-row-id cannot be negative: %d",
ErrInvalidRowLineage, *s.FirstRowID)
+ }
+
+ return nil
+}
+
func (s Snapshot) Manifests(fio iceio.IO) (_ []iceberg.ManifestFile, err
error) {
if s.ManifestList != "" {
f, err := fio.Open(s.ManifestList)
diff --git a/table/snapshots_test.go b/table/snapshots_test.go
index 5be91b22..0e4d3b7e 100644
--- a/table/snapshots_test.go
+++ b/table/snapshots_test.go
@@ -115,3 +115,131 @@ func TestSnapshotString(t *testing.T) {
assert.Equal(t, `append, {"foo":"bar"}: id=25, parent_id=19,
schema_id=3, sequence_number=200, timestamp_ms=1602638573590,
manifest_list=s3:/a/b/c.avro`,
snapshot.String())
}
+
+func TestSerializeSnapshotWithRowLineage(t *testing.T) {
+ parentID := int64(19)
+ manifest, schemaid := "s3:/a/b/c.avro", 3
+ firstRowID := int64(0)
+ addedRows := int64(100)
+
+ snapshot := table.Snapshot{
+ SnapshotID: 25,
+ ParentSnapshotID: &parentID,
+ SequenceNumber: 200,
+ TimestampMs: 1602638573590,
+ ManifestList: manifest,
+ SchemaID: &schemaid,
+ FirstRowID: &firstRowID,
+ AddedRows: &addedRows,
+ Summary: &table.Summary{
+ Operation: table.OpAppend,
+ },
+ }
+
+ data, err := json.Marshal(snapshot)
+ require.NoError(t, err)
+
+ assert.JSONEq(t, `{
+ "snapshot-id": 25,
+ "parent-snapshot-id": 19,
+ "sequence-number": 200,
+ "timestamp-ms": 1602638573590,
+ "manifest-list": "s3:/a/b/c.avro",
+ "summary": {"operation": "append"},
+ "schema-id": 3,
+ "first-row-id": 0,
+ "added-rows": 100
+ }`, string(data))
+}
+
+func TestDeserializeSnapshotWithRowLineage(t *testing.T) {
+ jsonData := `{
+ "snapshot-id": 25,
+ "parent-snapshot-id": 19,
+ "sequence-number": 200,
+ "timestamp-ms": 1602638573590,
+ "manifest-list": "s3:/a/b/c.avro",
+ "summary": {"operation": "append"},
+ "schema-id": 3,
+ "first-row-id": 0,
+ "added-rows": 100
+ }`
+
+ var snapshot table.Snapshot
+ err := json.Unmarshal([]byte(jsonData), &snapshot)
+ require.NoError(t, err)
+
+ assert.Equal(t, int64(25), snapshot.SnapshotID)
+ require.NotNil(t, snapshot.FirstRowID)
+ assert.Equal(t, int64(0), *snapshot.FirstRowID)
+ require.NotNil(t, snapshot.AddedRows)
+ assert.Equal(t, int64(100), *snapshot.AddedRows)
+}
+
+func TestValidateRowLineage(t *testing.T) {
+ tests := []struct {
+ name string
+ firstRowID *int64
+ addedRows *int64
+ wantErr string
+ }{
+ {
+ name: "valid: both nil",
+ firstRowID: nil,
+ addedRows: nil,
+ wantErr: "",
+ },
+ {
+ name: "valid: both set",
+ firstRowID: ptr(int64(0)),
+ addedRows: ptr(int64(100)),
+ wantErr: "",
+ },
+ {
+ name: "valid: zero added rows",
+ firstRowID: ptr(int64(30)),
+ addedRows: ptr(int64(0)),
+ wantErr: "",
+ },
+ {
+ name: "invalid: first-row-id set but added-rows
nil",
+ firstRowID: ptr(int64(0)),
+ addedRows: nil,
+ wantErr: "added-rows is required when first-row-id
is set",
+ },
+ {
+ name: "invalid: negative added-rows",
+ firstRowID: ptr(int64(0)),
+ addedRows: ptr(int64(-1)),
+ wantErr: "added-rows cannot be negative: -1",
+ },
+ {
+ name: "invalid: negative first-row-id",
+ firstRowID: ptr(int64(-1)),
+ addedRows: ptr(int64(100)),
+ wantErr: "first-row-id cannot be negative: -1",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ snapshot := table.Snapshot{
+ SnapshotID: 1,
+ FirstRowID: tt.firstRowID,
+ AddedRows: tt.addedRows,
+ }
+
+ err := snapshot.ValidateRowLineage()
+ if tt.wantErr == "" {
+ assert.NoError(t, err)
+ } else {
+ assert.ErrorIs(t, err,
table.ErrInvalidRowLineage)
+ assert.ErrorContains(t, err, tt.wantErr)
+ }
+ })
+ }
+}
+
// ptr returns a pointer to a fresh copy of v. It lets tests fill optional
// pointer fields inline. Each call yields a distinct pointer.
func ptr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}