This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 498d83f6 feat(manifest): Extend manifest to support Iceberg v3 format (#560)
498d83f6 is described below

commit 498d83f67b2519ade8fb3b749b45924659f12cf2
Author: Dao Thanh Tung <ttdao.2...@accountancy.smu.edu.sg>
AuthorDate: Wed Sep 17 21:38:47 2025 +0100

    feat(manifest): Extend manifest to support Iceberg v3 format (#560)
    
    Hi team,
    This is the first of many PRs adding support for Iceberg v3. In this PR,
    I try to outline some structural changes to the interface without making
    it too big and hard to review. More PRs will follow as we implement the
    manifest list writer for v3.
    
    ---------
    
    Signed-off-by: dttung2905 <ttdao.2...@accountancy.smu.edu.sg>
---
 internal/avro_schemas.go    | 162 +++++++++++++++++++++++++++++++++++++++++++-
 manifest.go                 | 118 ++++++++++++++++++++++++++------
 manifest_test.go            |   6 +-
 table/evaluators_test.go    |   4 ++
 table/snapshot_producers.go |   4 +-
 5 files changed, 265 insertions(+), 29 deletions(-)
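
For callers, the user-facing change in this patch is the extra firstRowId argument on
WriteManifestList plus a version 3 branch that is accepted but not yet functional. Below
is a minimal sketch of the updated call sites, assuming the module's usual import path
(github.com/apache/iceberg-go); the helper name and values are illustrative:

    package example

    import (
        "bytes"
        "errors"
        "fmt"

        "github.com/apache/iceberg-go"
    )

    func writeLists(files []iceberg.ManifestFile) error {
        var buf bytes.Buffer
        seq := int64(1)

        // Existing v2 call sites gain a firstRowId argument; passing 0 keeps the
        // previous behaviour, since the value is only consumed on the v3 path.
        if err := iceberg.WriteManifestList(2, &buf, 1234, nil, &seq, 0, files); err != nil {
            return err
        }

        // Version 3 is now recognized by the version switch, but the writer is
        // still a stub and returns ErrNotImplemented until a follow-up PR.
        err := iceberg.WriteManifestList(3, &buf, 1234, nil, &seq, 0, files)
        if errors.Is(err, iceberg.ErrNotImplemented) {
            fmt.Println("manifest list writer for v3 is not implemented yet")
        }

        return nil
    }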

diff --git a/internal/avro_schemas.go b/internal/avro_schemas.go
index 6e33d634..e066bdaf 100644
--- a/internal/avro_schemas.go
+++ b/internal/avro_schemas.go
@@ -364,6 +364,164 @@ func init() {
                Must(avro.NewField("file_sequence_number", NullableLongSchema, 
WithFieldID(4))),
                // leave data_file for dynamic generation
        })))
+
+       AvroSchemaCache.Add("manifest_list_file_v3", Must(avro.NewRecordSchema("manifest_file", "", []*avro.Field{
+               Must(avro.NewField("manifest_path",
+                       StringSchema,
+                       avro.WithDoc("Location URI with FS scheme"),
+                       WithFieldID(500))),
+               Must(avro.NewField("manifest_length",
+                       LongSchema,
+                       avro.WithDoc("Total file size in bytes"),
+                       WithFieldID(501))),
+               Must(avro.NewField("partition_spec_id",
+                       IntSchema,
+                       avro.WithDoc("Spec ID used to write"),
+                       WithFieldID(502))),
+               Must(avro.NewField("content", IntSchema,
+                       avro.WithDoc("Content type"),
+                       avro.WithDefault(0),
+                       WithFieldID(517))),
+               Must(avro.NewField("sequence_number", LongSchema,
+                       avro.WithDoc("Sequence number"),
+                       avro.WithDefault(int64(0)),
+                       WithFieldID(515))),
+               Must(avro.NewField("min_sequence_number", LongSchema,
+                       avro.WithDoc("Minimum sequence number"),
+                       avro.WithDefault(int64(0)),
+                       WithFieldID(516))),
+               Must(avro.NewField("added_snapshot_id",
+                       LongSchema,
+                       avro.WithDoc("Snapshot ID that added the manifest"),
+                       WithFieldID(503))),
+               Must(avro.NewField("added_files_count",
+                       IntSchema,
+                       avro.WithDoc("Added entry count"),
+                       WithFieldID(504))),
+               Must(avro.NewField("existing_files_count",
+                       IntSchema,
+                       avro.WithDoc("Existing entry count"),
+                       WithFieldID(505))),
+               Must(avro.NewField("deleted_files_count",
+                       IntSchema,
+                       avro.WithDoc("Deleted entry count"),
+                       WithFieldID(506))),
+               Must(avro.NewField("partitions",
+                       NullableSchema(
+                               avro.NewArraySchema(AvroSchemaCache.Get("field_summary"),
+                                       WithElementID(508))),
+                       avro.WithDoc("Partition field summaries"),
+                       WithFieldID(507))),
+               Must(avro.NewField("added_rows_count",
+                       LongSchema,
+                       avro.WithDoc("Added row count"),
+                       WithFieldID(512))),
+               Must(avro.NewField("existing_rows_count",
+                       LongSchema,
+                       avro.WithDoc("Existing row count"),
+                       WithFieldID(513))),
+               Must(avro.NewField("deleted_rows_count",
+                       LongSchema,
+                       avro.WithDoc("Deleted row count"),
+                       WithFieldID(514))),
+               Must(avro.NewField("key_metadata", NullableBinarySchema,
+                       avro.WithDoc("Key metadata"),
+                       WithFieldID(519))),
+               Must(avro.NewField("first_row_id", NullableLongSchema,
+                       avro.WithDoc("First row ID"),
+                       WithFieldID(520))),
+       })))
+       AvroSchemaCache.Add("data_file_v3", Must(avro.NewRecordSchema("r2", "", []*avro.Field{
+               Must(avro.NewField("content", IntSchema,
+                       avro.WithDoc("Content type"),
+                       avro.WithDefault(0),
+                       WithFieldID(134))),
+               Must(avro.NewField("file_path",
+                       StringSchema,
+                       avro.WithDoc("Location URI with FS scheme"),
+                       WithFieldID(100))),
+               Must(avro.NewField("file_format",
+                       StringSchema,
+                       avro.WithDoc("File format name: avro, orc, parquet"),
+                       WithFieldID(101))),
+               // skip partition field, we'll add that dynamically as needed
+               Must(avro.NewField("record_count",
+                       LongSchema,
+                       avro.WithDoc("Number of records in the file"),
+                       WithFieldID(103))),
+               Must(avro.NewField("file_size_in_bytes",
+                       LongSchema,
+                       avro.WithDoc("Size of the file in bytes"),
+                       WithFieldID(104))),
+               Must(avro.NewField("column_sizes",
+                       NullableSchema(newMapSchema("k117_v118", IntSchema, LongSchema, 117, 118)),
+                       avro.WithDoc("map of column id to total size on disk"),
+                       WithFieldID(108))),
+               Must(avro.NewField("value_counts",
+                       NullableSchema(newMapSchema("k119_v120", IntSchema, LongSchema, 119, 120)),
+                       avro.WithDoc("map of value to count"),
+                       WithFieldID(109))),
+               Must(avro.NewField("null_value_counts",
+                       NullableSchema(newMapSchema("k121_v122", IntSchema, LongSchema, 121, 122)),
+                       avro.WithDoc("map of value to count"),
+                       WithFieldID(110))),
+               Must(avro.NewField("nan_value_counts",
+                       NullableSchema(newMapSchema("k138_v139", IntSchema, LongSchema, 138, 139)),
+                       avro.WithDoc("map of value to count"),
+                       WithFieldID(137))),
+               Must(avro.NewField("lower_bounds",
+                       NullableSchema(newMapSchema("k126_v127", IntSchema, BinarySchema, 126, 127)),
+                       avro.WithDoc("map of column id to lower bound"),
+                       WithFieldID(125))),
+               Must(avro.NewField("upper_bounds",
+                       NullableSchema(newMapSchema("k129_v130", IntSchema, BinarySchema, 129, 130)),
+                       avro.WithDoc("map of column id to upper bound"),
+                       WithFieldID(128))),
+               Must(avro.NewField("key_metadata", NullableBinarySchema,
+                       avro.WithDoc("Encryption Key Metadata Blob"),
+                       WithFieldID(131))),
+               Must(avro.NewField("split_offsets",
+                       NullableSchema(avro.NewArraySchema(LongSchema,
+                               WithElementID(133))),
+                       avro.WithDoc("splittable offsets"),
+                       WithFieldID(132))),
+               Must(avro.NewField("equality_ids",
+                       NullableSchema(avro.NewArraySchema(LongSchema,
+                               WithElementID(136))),
+                       avro.WithDoc("field ids used to determine row equality in equality delete files"),
+                       WithFieldID(135))),
+               Must(avro.NewField("sort_order_id",
+                       NullableIntSchema,
+                       avro.WithDoc("Sort order ID"),
+                       WithFieldID(140))),
+               Must(avro.NewField("first_row_id",
+                       NullableLongSchema,
+                       avro.WithDoc("The _row_id for the first row in the data file"),
+                       WithFieldID(142))),
+               Must(avro.NewField("referenced_data_file",
+                       NullableSchema(StringSchema),
+                       avro.WithDoc("Fully qualified location of a data file that all deletes reference"),
+                       WithFieldID(143))),
+               Must(avro.NewField("content_offset",
+                       NullableLongSchema,
+                       avro.WithDoc("The offset in the file where the content starts"),
+                       WithFieldID(144))),
+               Must(avro.NewField("content_size_in_bytes",
+                       NullableLongSchema,
+                       avro.WithDoc("The length of the referenced content stored in the file"),
+                       WithFieldID(145))),
+       })))
+       AvroSchemaCache.Add("manifest_entry_v3", Must(avro.NewRecordSchema("manifest_entry", "", []*avro.Field{
+               Must(avro.NewField("status", IntSchema, WithFieldID(0))),
+               Must(avro.NewField("snapshot_id", NullableLongSchema, WithFieldID(1))),
+               Must(avro.NewField("sequence_number", NullableLongSchema, WithFieldID(3))),
+               Must(avro.NewField("file_sequence_number", NullableLongSchema, WithFieldID(4))),
+               // leave data_file for dynamic generation
+       })))
 }
 
 func newDataFileSchema(partitionType avro.Schema, version int) (avro.Schema, error) {
@@ -382,7 +540,7 @@ func newDataFileSchema(partitionType avro.Schema, version int) (avro.Schema, err
 
 func NewManifestFileSchema(version int) (avro.Schema, error) {
        switch version {
-       case 1, 2:
+       case 1, 2, 3:
        default:
                return nil, fmt.Errorf("unsupported iceberg spec version: %d", 
version)
        }
@@ -394,7 +552,7 @@ func NewManifestFileSchema(version int) (avro.Schema, error) {
 
 func NewManifestEntrySchema(partitionType avro.Schema, version int) (avro.Schema, error) {
        switch version {
-       case 1, 2:
+       case 1, 2, 3:
        default:
                return nil, fmt.Errorf("unsupported iceberg spec version: %d", 
version)
        }
diff --git a/manifest.go b/manifest.go
index 4669726e..0f2483b0 100644
--- a/manifest.go
+++ b/manifest.go
@@ -321,6 +321,7 @@ type manifestFile struct {
        DeletedRowsCount   int64           `avro:"deleted_rows_count"`
        PartitionList      *[]FieldSummary `avro:"partitions"`
        Key                []byte          `avro:"key_metadata"`
+       FirstRowId         *int64          `avro:"first_row_id"`
 
        version int `avro:"-"`
 }
@@ -849,6 +850,14 @@ func (v2writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) (Manife
        return entry, nil
 }
 
+type v3writerImpl struct{}
+
+func (v3writerImpl) content() ManifestContent { return ManifestContentData }
+func (v3writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) (ManifestEntry, error) {
+       // TODO: implement this for v3
+       return entry, fmt.Errorf("%w: manifest writer for v3", ErrNotImplemented)
+}
+
 type fieldStats interface {
        toSummary() FieldSummary
        update(value any) error
@@ -1029,6 +1038,8 @@ func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema *S
                impl = v1writerImpl{}
        case 2:
                impl = v2writerImpl{}
+       case 3:
+               impl = v3writerImpl{}
        default:
                return nil, fmt.Errorf("unsupported manifest version: %d", 
version)
        }
@@ -1203,6 +1214,7 @@ type ManifestListWriter struct {
        commitSnapshotID int64
        sequenceNumber   int64
        writer           *ocf.Encoder
+       nextRowID        *int64
 }
 
 func NewManifestListWriterV1(out io.Writer, snapshotID int64, parentSnapshot *int64) (*ManifestListWriter, error) {
@@ -1246,6 +1258,11 @@ func NewManifestListWriterV2(out io.Writer, snapshotID, sequenceNumber int64, pa
        })
 }
 
+func NewManifestListWriterV3() (*ManifestListWriter, error) {
+       // TODO: Implement v3 writer
+       return nil, fmt.Errorf("%w: manifest list writer for v3", ErrNotImplemented)
+}
+
 func (m *ManifestListWriter) init(meta map[string][]byte) error {
        fileSchema, err := internal.NewManifestFileSchema(m.version)
        if err != nil {
@@ -1295,13 +1312,22 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
                        }
                }
 
-       case 2:
+       case 2, 3:
                for _, file := range files {
-                       if file.Version() != 2 {
-                               return fmt.Errorf("%w: ManifestListWriter only supports version 2 manifest files", ErrInvalidArgument)
+                       if file.Version() != m.version {
+                               return fmt.Errorf("%w: ManifestListWriter only supports version %d manifest files", ErrInvalidArgument, m.version)
                        }
 
                        wrapped := *(file.(*manifestFile))
+                       if m.version == 3 {
+                               // Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
+                               if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
+                                       if m.nextRowID != nil {
+                                               // copy the current counter so later increments don't
+                                               // alias the first_row_id assigned to this manifest
+                                               firstRowID := *m.nextRowID
+                                               wrapped.FirstRowId = &firstRowID
+                                               *m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
+                                       }
+                               }
+                       }
                        if wrapped.SeqNumber == -1 {
                                // if the sequence number is being assigned here,
                                // then the manifest must be created by the current
@@ -1335,7 +1361,7 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
 }
 
 // WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, files []ManifestFile) error {
+func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) error {
        var (
                writer *ManifestListWriter
                err    error
@@ -1349,6 +1375,13 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps
                        return errors.New("sequence number is required for V2 
tables")
                }
                writer, err = NewManifestListWriterV2(out, snapshotID, *sequenceNumber, parentSnapshotID)
+       case 3:
+               if sequenceNumber == nil {
+                       return errors.New("sequence number is required for V3 tables")
+               }
+               // TODO
+               return fmt.Errorf("%w: manifest list writer for v3", ErrNotImplemented)
+               // writer, err = NewManifestListWriterV3()
        default:
                return fmt.Errorf("unsupported manifest version: %d", version)
        }
@@ -1496,24 +1529,28 @@ func avroPartitionData(input map[int]any, logicalTypes map[int]avro.LogicalType)
 }
 
 type dataFile struct {
-       Content          ManifestEntryContent   `avro:"content"`
-       Path             string                 `avro:"file_path"`
-       Format           FileFormat             `avro:"file_format"`
-       PartitionData    map[string]any         `avro:"partition"`
-       RecordCount      int64                  `avro:"record_count"`
-       FileSize         int64                  `avro:"file_size_in_bytes"`
-       BlockSizeInBytes int64                  `avro:"block_size_in_bytes"`
-       ColSizes         *[]colMap[int, int64]  `avro:"column_sizes"`
-       ValCounts        *[]colMap[int, int64]  `avro:"value_counts"`
-       NullCounts       *[]colMap[int, int64]  `avro:"null_value_counts"`
-       NaNCounts        *[]colMap[int, int64]  `avro:"nan_value_counts"`
-       DistinctCounts   *[]colMap[int, int64]  `avro:"distinct_counts"`
-       LowerBounds      *[]colMap[int, []byte] `avro:"lower_bounds"`
-       UpperBounds      *[]colMap[int, []byte] `avro:"upper_bounds"`
-       Key              *[]byte                `avro:"key_metadata"`
-       Splits           *[]int64               `avro:"split_offsets"`
-       EqualityIDs      *[]int                 `avro:"equality_ids"`
-       SortOrder        *int                   `avro:"sort_order_id"`
+       Content                 ManifestEntryContent   `avro:"content"`
+       Path                    string                 `avro:"file_path"`
+       Format                  FileFormat             `avro:"file_format"`
+       PartitionData           map[string]any         `avro:"partition"`
+       RecordCount             int64                  `avro:"record_count"`
+       FileSize                int64                  `avro:"file_size_in_bytes"`
+       BlockSizeInBytes        int64                  `avro:"block_size_in_bytes"`
+       ColSizes                *[]colMap[int, int64]  `avro:"column_sizes"`
+       ValCounts               *[]colMap[int, int64]  `avro:"value_counts"`
+       NullCounts              *[]colMap[int, int64]  `avro:"null_value_counts"`
+       NaNCounts               *[]colMap[int, int64]  `avro:"nan_value_counts"`
+       DistinctCounts          *[]colMap[int, int64]  `avro:"distinct_counts"`
+       LowerBounds             *[]colMap[int, []byte] `avro:"lower_bounds"`
+       UpperBounds             *[]colMap[int, []byte] `avro:"upper_bounds"`
+       Key                     *[]byte                `avro:"key_metadata"`
+       Splits                  *[]int64               `avro:"split_offsets"`
+       EqualityIDs             *[]int                 `avro:"equality_ids"`
+       SortOrder               *int                   `avro:"sort_order_id"`
+       FirstRowIDField         *int64                 `avro:"first_row_id"`
+       ReferencedDataFileField *string                `avro:"referenced_data_file"`
+       ContentOffsetField      *int64                 `avro:"content_offset"`
+       ContentSizeInBytesField *int64                 `avro:"content_size_in_bytes"`
 
        colSizeMap     map[int]int64
        valCntMap      map[int]int64
@@ -1642,6 +1679,11 @@ func (d *dataFile) EqualityFieldIDs() []int {
 
 func (d *dataFile) SortOrderID() *int { return d.SortOrder }
 
+func (d *dataFile) FirstRowID() *int64          { return d.FirstRowIDField }
+func (d *dataFile) ReferencedDataFile() *string { return d.ReferencedDataFileField }
+func (d *dataFile) ContentSizeInBytes() *int64  { return d.ContentSizeInBytesField }
+func (d *dataFile) ContentOffset() *int64       { return d.ContentOffsetField }
+
 type ManifestEntryBuilder struct {
        m *manifestEntry
 }
@@ -1909,6 +1951,30 @@ func (b *DataFileBuilder) SortOrderID(id int) *DataFileBuilder {
        return b
 }
 
+func (b *DataFileBuilder) FirstRowID(id int64) *DataFileBuilder {
+       b.d.FirstRowIDField = &id
+
+       return b
+}
+
+func (b *DataFileBuilder) ReferencedDataFile(path string) *DataFileBuilder {
+       b.d.ReferencedDataFileField = &path
+
+       return b
+}
+
+func (b *DataFileBuilder) ContentOffset(offset int64) *DataFileBuilder {
+       b.d.ContentOffsetField = &offset
+
+       return b
+}
+
+func (b *DataFileBuilder) ContentSizeInBytes(size int64) *DataFileBuilder {
+       b.d.ContentSizeInBytesField = &size
+
+       return b
+}
+
 func (b *DataFileBuilder) Build() DataFile {
        return b.d
 }
@@ -1976,6 +2042,14 @@ type DataFile interface {
        // SpecID returns the partition spec id for this data file, inherited
        // from the manifest that the data file was read from
        SpecID() int32
+       // FirstRowID returns the first row ID for this data file (v3+ only)
+       FirstRowID() *int64
+       // ReferencedDataFile returns the location of the data file that the
+       // deletion vector references (v3+ only)
+       ReferencedDataFile() *string
+       // ContentOffset returns the offset in the file where the content starts (v3+ only)
+       ContentOffset() *int64
+       // ContentSizeInBytes returns the length of the referenced content stored in the file (v3+ only)
+       ContentSizeInBytes() *int64
 }
 
 // ManifestEntry is an interface for both v1 and v2 manifest entries.
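
The builder setters and interface accessors added above are symmetric, so v3
deletion-vector metadata set at build time can be read back from the resulting DataFile.
A sketch, assuming a *DataFileBuilder obtained through the existing construction path
(not part of this diff); the path and numbers are made up:

    import (
        "fmt"

        "github.com/apache/iceberg-go"
    )

    func withDeleteVectorMetadata(b *iceberg.DataFileBuilder) iceberg.DataFile {
        df := b.
            FirstRowID(0).
            ReferencedDataFile("s3://bucket/ns/table/data/part-00000.parquet").
            ContentOffset(4).
            ContentSizeInBytes(1024).
            Build()

        // The new accessors return nil for v1/v2 files, so callers should check
        // before dereferencing.
        if off := df.ContentOffset(); off != nil {
            fmt.Printf("referenced content starts at byte offset %d\n", *off)
        }

        return df
    }
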
diff --git a/manifest_test.go b/manifest_test.go
index cf78a6c2..3e58906b 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -406,9 +406,9 @@ type ManifestTestSuite struct {
 }
 
 func (m *ManifestTestSuite) writeManifestList() {
-       m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, manifestFileRecordsV1))
+       m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1))
        unassignedSequenceNum := int64(-1)
-       m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, manifestFileRecordsV2))
+       m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2))
 }
 
 func (m *ManifestTestSuite) writeManifestEntries() {
@@ -659,7 +659,7 @@ func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
        // any cache. (Note: if working correctly, this will have no such side effect.)
        var buf bytes.Buffer
        seqNum := int64(9876)
-       err := WriteManifestList(2, &buf, 1234, nil, &seqNum, []ManifestFile{
+       err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
                NewManifestFile(2, "s3://bucket/namespace/table/metadata/abcd-0123.avro", 99, 0, 1234).Build(),
        })
        m.NoError(err)
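
For reference, the v3 row-id bookkeeping in AddManifests (manifest.go above) reduces to:
each data manifest without a first_row_id receives the current counter, which then
advances by that manifest's existing plus added row counts. A standalone model of that
arithmetic, with made-up counts:

    // assignFirstRowIDs models the v3 first_row_id assignment: rowCounts holds
    // {existing, added} row counts per manifest, in the order they are added.
    func assignFirstRowIDs(rowCounts [][2]int64) []int64 {
        nextRowID := int64(0)
        firstRowIDs := make([]int64, len(rowCounts))
        for i, counts := range rowCounts {
            existing, added := counts[0], counts[1]
            firstRowIDs[i] = nextRowID
            nextRowID += existing + added
        }
        return firstRowIDs
    }

    // assignFirstRowIDs([][2]int64{{0, 100}, {100, 50}, {0, 10}}) => [0, 100, 250]
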
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index d6cfe06b..cec68b43 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -1103,6 +1103,10 @@ func (*mockDataFile) SplitOffsets() []int64                     { return nil }
 func (*mockDataFile) EqualityFieldIDs() []int                   { return nil }
 func (*mockDataFile) SortOrderID() *int                         { return nil }
+func (m *mockDataFile) SpecID() int32                           { return m.specid }
+func (*mockDataFile) FirstRowID() *int64                        { return nil }
+func (*mockDataFile) ReferencedDataFile() *string               { return nil }
+func (*mockDataFile) ContentOffset() *int64                     { return nil }
+func (*mockDataFile) ContentSizeInBytes() *int64                { return nil }
 
 type InclusiveMetricsTestSuite struct {
        suite.Suite
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 58721ed4..1cf4ecd5 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -682,9 +682,9 @@ func (sp *snapshotProducer) commit() ([]Update, []Requirement, error) {
                return nil, nil, err
        }
        defer out.Close()
-
+       // TODO: Implement v3 here
        err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
-               sp.snapshotID, parentSnapshot, &nextSequence, newManifests)
+               sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
        if err != nil {
                return nil, nil, err
        }
