This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3a7fe957 feat(table): Table Metadata V3 support (#585)
3a7fe957 is described below
commit 3a7fe957b0ff7a1366c77cd8a38172872df3f865
Author: Dao Thanh Tung <[email protected]>
AuthorDate: Tue Oct 21 16:30:28 2025 +0100
feat(table): Table Metadata V3 support (#585)
List of changes implemented
- Add `NextRowID` field to `commonMetadata`
- Add `FirstRowID` field to `Snapshot` struct
- Add v3 metadata parsing/validation
---------
Signed-off-by: dttung2905 <[email protected]>
---
table/metadata.go | 143 +++++++++++++++++++++++++++++++--
table/metadata_internal_test.go | 170 ++++++++++++++++++++++++++++++++++++++++
table/snapshots.go | 6 ++
3 files changed, 314 insertions(+), 5 deletions(-)
diff --git a/table/metadata.go b/table/metadata.go
index 12374bde..f116525e 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -37,7 +37,7 @@ import (
const (
partitionFieldStartID = 1000
- supportedTableFormatVersion = 2
+ supportedTableFormatVersion = 3
oneMinuteInMs int64 = 60_000
)
@@ -136,6 +136,9 @@ type Metadata interface {
NameMapping() iceberg.NameMapping
LastSequenceNumber() int64
+ // NextRowID returns the next available row ID for v3 tables.
+ // Returns 0 for v1/v2 tables or if not set.
+ NextRowID() int64
// Statistics returns an optional list of table statistics.
// Table statistics files are valid Puffin files.
// StatisticsFile are informational. A reader can choose to ignore statistics information.
@@ -178,6 +181,8 @@ type MetadataBuilder struct {
previousFileEntry *MetadataLogEntry
// >v1 specific
lastSequenceNumber *int64
+ // >v2 specific (V3)
+ nextRowID *int64
// update tracking
lastAddedSchemaID *int
lastAddedPartitionID *int
@@ -235,6 +240,11 @@ func MetadataBuilderFromBase(metadata Metadata,
currentFileLocation string) (*Me
b.lastSequenceNumber = &seq
}
+ if metadata.Version() >= 3 {
+ nextRowID := metadata.NextRowID()
+ b.nextRowID = &nextRowID
+ }
+
if metadata.CurrentSnapshot() != nil {
b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
}
@@ -961,6 +971,24 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
commonMetadata: *common,
}, nil
+ case 3:
+ var lastSequenceNumber int64
+ var nextRowID int64
+
+ if b.lastSequenceNumber != nil {
+ lastSequenceNumber = *b.lastSequenceNumber
+ }
+
+ if b.nextRowID != nil {
+ nextRowID = *b.nextRowID
+ }
+
+ return &metadataV3{
+ LastSeqNum: lastSequenceNumber,
+ NextRowIDValue: nextRowID,
+ commonMetadata: *common,
+ }, nil
+
default:
return nil, fmt.Errorf("%w: unknown format version %d",
ErrInvalidMetadata, b.formatVersion)
}
@@ -1112,6 +1140,8 @@ func ParseMetadataBytes(b []byte) (Metadata, error) {
ret = initMetadataV1Deser()
case 2:
ret = initMetadataV2Deser()
+ case 3:
+ ret = initMetadataV3Deser()
default:
return nil, ErrInvalidMetadataFormatVersion
}
@@ -1147,6 +1177,10 @@ type commonMetadata struct {
SnapshotRefs map[string]SnapshotRef `json:"refs,omitempty"`
StatisticsList []StatisticsFile `json:"statistics,omitempty"`
PartitionStatsList []PartitionStatisticsFile `json:"partition-statistics,omitempty"`
+ // V2+ fields
+ LastSequenceNumber *int64 `json:"last-sequence-number,omitempty"`
+ // V3+ fields
+ NextRowID *int64 `json:"next-row-id,omitempty"` // V3: Next available row ID
}
func initCommonMetadataForDeserialization() commonMetadata {
@@ -1545,6 +1579,7 @@ func initMetadataV1Deser() *metadataV1 {
}
func (m *metadataV1) LastSequenceNumber() int64 { return 0 }
+func (m *metadataV1) NextRowID() int64 { return 0 } // V1 doesn't support row lineage
func (m *metadataV1) Equals(other Metadata) bool {
rhs, ok := other.(*metadataV1)
@@ -1644,6 +1679,7 @@ func initMetadataV2Deser() *metadataV2 {
}
func (m *metadataV2) LastSequenceNumber() int64 { return m.LastSeqNum }
+func (m *metadataV2) NextRowID() int64 { return 0 } // V2 doesn't support row lineage
func (m *metadataV2) Equals(other Metadata) bool {
rhs, ok := other.(*metadataV2)
@@ -1676,7 +1712,7 @@ func (m *metadataV2) UnmarshalJSON(b []byte) error {
}
func (m *metadataV2) validate() error {
- if err := m.checkLastSequenceNumber(); err != nil {
+ if err := checkLastSequenceNumber(m, m.SnapshotList); err != nil {
return err
}
@@ -1687,11 +1723,108 @@ func (m *metadataV2) validate() error {
return nil
}
-func (m *metadataV2) checkLastSequenceNumber() error {
+// metadataV3 represents table metadata for format version 3
+type metadataV3 struct {
+ LastSeqNum int64 `json:"last-sequence-number"`
+ NextRowIDValue int64 `json:"next-row-id"`
+
+ commonMetadata
+}
+
+func initMetadataV3Deser() *metadataV3 {
+ return &metadataV3{
+ LastSeqNum: -1,
+ NextRowIDValue: -1,
+ commonMetadata: initCommonMetadataForDeserialization(),
+ }
+}
+
+func (m *metadataV3) LastSequenceNumber() int64 { return m.LastSeqNum }
+func (m *metadataV3) NextRowID() int64 {
+ if m.NextRowIDValue == -1 {
+ return 0 // For v1/v2 compatibility when field is missing
+ }
+
+ return max(0, m.NextRowIDValue)
+}
+
+func (m *metadataV3) Equals(other Metadata) bool {
+ rhs, ok := other.(*metadataV3)
+ if !ok {
+ return false
+ }
+
+ if m == rhs {
+ return true
+ }
+
+ return m.LastSeqNum == rhs.LastSeqNum &&
+ m.NextRowIDValue == rhs.NextRowIDValue &&
+ m.commonMetadata.Equals(&rhs.commonMetadata)
+}
+
+func (m *metadataV3) UnmarshalJSON(b []byte) error {
+ type Alias metadataV3
+ aux := (*Alias)(m)
+
+ // Set LastColumnId to -1 to indicate that it is not set as LastColumnId = 0 is a valid value for when no schema is present
+ aux.LastColumnId = -1
+
+ if err := json.Unmarshal(b, aux); err != nil {
+ return err
+ }
+
+ m.preValidate()
+
+ return m.validate()
+}
+
+func (m *metadataV3) validate() error {
+ if err := checkLastSequenceNumber(m, m.SnapshotList); err != nil {
+ return err
+ }
+
+ if err := m.checkNextRowID(); err != nil {
+ return err
+ }
+
+ if err := m.commonMetadata.validate(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *metadataV3) checkNextRowID() error {
+ if m.NextRowIDValue < 0 {
+ if m.NextRowIDValue == -1 {
+ return fmt.Errorf("%w: next-row-id is required for v3 tables", ErrInvalidMetadata)
+ }
+
+ return fmt.Errorf("%w: next-row-id must be non-negative, got %d", ErrInvalidMetadata, m.NextRowIDValue)
+ }
+
for _, snap := range m.SnapshotList {
- if snap.SequenceNumber > m.LastSequenceNumber() {
+ if snap.FirstRowID != nil && *snap.FirstRowID < 0 {
+ return fmt.Errorf("%w: snapshot %d has invalid first-row-id %d",
+ ErrInvalidMetadata, snap.SnapshotID, *snap.FirstRowID)
+ }
+ }
+
+ return nil
+}
+
+// SequenceNumberValidator defines an interface for types that can validate sequence numbers
+type SequenceNumberValidator interface {
+ LastSequenceNumber() int64
+}
+
+// checkLastSequenceNumber validates that all snapshots have sequence numbers <= the last sequence number
+func checkLastSequenceNumber(validator SequenceNumberValidator, snapshotList []Snapshot) error {
+ for _, snap := range snapshotList {
+ if snap.SequenceNumber > validator.LastSequenceNumber() {
return fmt.Errorf("%w: snapshot %d has sequence number %d which is greater than last-sequence-number %d",
- ErrInvalidMetadata, snap.SnapshotID, snap.SequenceNumber, m.LastSequenceNumber())
+ ErrInvalidMetadata, snap.SnapshotID, snap.SequenceNumber, validator.LastSequenceNumber())
}
}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 3401780f..5dd27098 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -95,6 +95,71 @@ const ExampleTableMetadataV2 = `{
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag",
"max-ref-age-ms": 10000000}}
}`
+const ExampleTableMetadataV3 = `{
+ "format-version": 3,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 34,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 3,
+ "next-row-id": 1000,
+ "current-schema-id": 1,
+ "schemas": [
+ {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x",
"required": true, "type": "long"}]},
+ {
+ "type": "struct",
+ "schema-id": 1,
+ "identifier-field-ids": [1, 2],
+ "fields": [
+ {"id": 1, "name": "x", "required": true, "type": "long"},
+ {"id": 2, "name": "y", "required": true, "type": "long",
"doc": "comment"},
+ {"id": 3, "name": "z", "required": true, "type": "long"}
+ ]
+ }
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform":
"identity", "source-id": 1, "field-id": 1000}]}],
+ "last-partition-id": 1000,
+ "default-sort-order-id": 3,
+ "sort-orders": [
+ {
+ "order-id": 3,
+ "fields": [
+ {"transform": "identity", "source-id": 2, "direction": "asc",
"null-order": "nulls-first"},
+ {"transform": "bucket[4]", "source-id": 3, "direction":
"desc", "null-order": "nulls-last"}
+ ]
+ }
+ ],
+ "properties": {"read.split.target.size": "134217728"},
+ "current-snapshot-id": 3055729675574597004,
+ "snapshots": [
+ {
+ "snapshot-id": 3051729675574597004,
+ "timestamp-ms": 1515100955770,
+ "sequence-number": 0,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/b/1.avro",
+ "first-row-id": 1000
+ },
+ {
+ "snapshot-id": 3055729675574597004,
+ "parent-snapshot-id": 3051729675574597004,
+ "timestamp-ms": 1555100955770,
+ "sequence-number": 1,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/b/2.avro",
+ "schema-id": 1,
+ "first-row-id": 2000
+ }
+ ],
+ "snapshot-log": [
+ {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
+ {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770}
+ ],
+ "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100}],
+ "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag",
"max-ref-age-ms": 10000000}}
+}`
+
const ExampleTableMetadataV1 = `{
"format-version": 1,
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
@@ -195,6 +260,111 @@ func TestMetadataV2Parsing(t *testing.T) {
assert.EqualValues(t, "134217728",
meta.Properties()["read.split.target.size"])
}
+func TestMetadataV3Parsing(t *testing.T) {
+ meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV3))
+ require.NoError(t, err)
+ require.NotNil(t, meta)
+
+ assert.IsType(t, (*metadataV3)(nil), meta)
+ assert.Equal(t, 3, meta.Version())
+
+ data := meta.(*metadataV3)
+ assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"),
data.UUID)
+ assert.Equal(t, "s3://bucket/test/location", data.Location())
+ assert.Equal(t, int64(34), data.LastSeqNum)
+ assert.Equal(t, int64(1000), data.NextRowIDValue)
+ assert.Equal(t, int64(1602638573590), data.LastUpdatedMS)
+ assert.Equal(t, 3, data.LastColumnId)
+ assert.Equal(t, 0, data.SchemaList[0].ID)
+ assert.Equal(t, 1, data.CurrentSchemaID)
+ assert.Equal(t, 0, data.Specs[0].ID())
+ assert.Equal(t, 0, data.DefaultSpecID)
+ assert.Equal(t, 1000, *data.LastPartitionID)
+ assert.EqualValues(t, "134217728", data.Props["read.split.target.size"])
+ assert.EqualValues(t, 3055729675574597004, *data.CurrentSnapshotID)
+ assert.EqualValues(t, 3051729675574597004,
data.SnapshotList[0].SnapshotID)
+ assert.Equal(t, int64(1515100955770), data.SnapshotLog[0].TimestampMs)
+ assert.Equal(t, 3, data.SortOrderList[0].OrderID())
+ assert.Equal(t, 3, data.DefaultSortOrderID)
+
+ // Test v3 specific fields
+ assert.Equal(t, int64(1000), meta.NextRowID())
+ assert.Equal(t, int64(34), meta.LastSequenceNumber())
+
+ // Test snapshot first-row-id
+ assert.Len(t, meta.Snapshots(), 2)
+ assert.Equal(t, data.SnapshotList[1], *meta.CurrentSnapshot())
+ assert.Equal(t, data.SnapshotList[0], *meta.SnapshotByName("test"))
+ assert.EqualValues(t, "134217728",
meta.Properties()["read.split.target.size"])
+
+ firstSnapshot := data.SnapshotList[0]
+ assert.NotNil(t, firstSnapshot.FirstRowID)
+ assert.Equal(t, int64(1000), *firstSnapshot.FirstRowID)
+
+ secondSnapshot := data.SnapshotList[1]
+ assert.NotNil(t, secondSnapshot.FirstRowID)
+ assert.Equal(t, int64(2000), *secondSnapshot.FirstRowID)
+}
+
+func TestMetadataV3Builder(t *testing.T) {
+ // Test creating v3 metadata with builder
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+
+ builder, err := NewMetadataBuilder(3)
+ require.NoError(t, err)
+ builder.SetUUID(uuid.New())
+ builder.SetLoc("s3://test/location")
+ builder.SetLastUpdatedMS()
+
+ require.NoError(t, builder.AddSchema(schema))
+ require.NoError(t, builder.SetCurrentSchemaID(0))
+
+ spec := iceberg.NewPartitionSpec()
+ require.NoError(t, builder.AddPartitionSpec(&spec, true))
+ require.NoError(t, builder.SetDefaultSpecID(0))
+
+ sortOrder := UnsortedSortOrder
+ require.NoError(t, builder.AddSortOrder(&sortOrder))
+ require.NoError(t, builder.SetDefaultSortOrderID(0))
+
+ metadata, err := builder.Build()
+ require.NoError(t, err)
+
+ assert.Equal(t, 3, metadata.Version())
+ assert.Equal(t, int64(0), metadata.NextRowID())
+}
+
+func TestSnapshotV3FirstRowID(t *testing.T) {
+ // Test snapshot with first-row-id
+ firstRowID := int64(1000)
+ snapshot := Snapshot{
+ SnapshotID: 12345,
+ SequenceNumber: 1,
+ TimestampMs: 1602638573590,
+ ManifestList: "s3://test/manifest-list.avro",
+ FirstRowID: &firstRowID,
+ }
+
+ // Test equals with first-row-id
+ otherSnapshot := Snapshot{
+ SnapshotID: 12345,
+ SequenceNumber: 1,
+ TimestampMs: 1602638573590,
+ ManifestList: "s3://test/manifest-list.avro",
+ FirstRowID: &firstRowID,
+ }
+
+ assert.True(t, snapshot.Equals(otherSnapshot), "Snapshots with same first-row-id should be equal")
+
+ // Test with different first-row-id
+ differentFirstRowID := int64(2000)
+ otherSnapshot.FirstRowID = &differentFirstRowID
+
+ assert.False(t, snapshot.Equals(otherSnapshot), "Snapshots with different first-row-id should not be equal")
+}
+
func TestParsingCorrectTypes(t *testing.T) {
var meta metadataV2
require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2),
&meta))
diff --git a/table/snapshots.go b/table/snapshots.go
index 0bc2249c..6c14c100 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -254,6 +254,7 @@ type Snapshot struct {
ManifestList string `json:"manifest-list,omitempty"`
Summary *Summary `json:"summary,omitempty"`
SchemaID *int `json:"schema-id,omitempty"`
+ FirstRowID *int64 `json:"first-row-id,omitempty"` // V3: Starting row ID for this snapshot
}
func (s Snapshot) String() string {
@@ -282,12 +283,17 @@ func (s Snapshot) Equals(other Snapshot) bool {
case s.SchemaID == nil && other.SchemaID != nil:
fallthrough
case s.SchemaID != nil && other.SchemaID == nil:
+ fallthrough
+ case s.FirstRowID == nil && other.FirstRowID != nil:
+ fallthrough
+ case s.FirstRowID != nil && other.FirstRowID == nil:
return false
}
return s.SnapshotID == other.SnapshotID &&
((s.ParentSnapshotID == other.ParentSnapshotID) || (*s.ParentSnapshotID == *other.ParentSnapshotID)) &&
((s.SchemaID == other.SchemaID) || (*s.SchemaID == *other.SchemaID)) &&
+ ((s.FirstRowID == other.FirstRowID) || (*s.FirstRowID == *other.FirstRowID)) &&
s.SequenceNumber == other.SequenceNumber &&
s.TimestampMs == other.TimestampMs &&
s.ManifestList == other.ManifestList &&