This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
     new 498d83f6  feat(manifest): Extend manifest to support iceberg v3 format (#560)
498d83f6 is described below

commit 498d83f67b2519ade8fb3b749b45924659f12cf2
Author: Dao Thanh Tung <ttdao.2...@accountancy.smu.edu.sg>
AuthorDate: Wed Sep 17 21:38:47 2025 +0100

    feat(manifest): Extend manifest to support iceberg v3 format (#560)

    Hi team,

    This is the first of many PRs adding support for Iceberg v3. In this PR,
    I try to outline some structural changes to the interface without making
    it too big and hard to review. More PRs to come as we implement the
    manifest list writer for v3.

    ---------

    Signed-off-by: dttung2905 <ttdao.2...@accountancy.smu.edu.sg>
---
 internal/avro_schemas.go    | 162 +++++++++++++++++++++++++++++++++++++++++++-
 manifest.go                 | 118 ++++++++++++++++++++++++++------
 manifest_test.go            |   6 +-
 table/evaluators_test.go    |   4 ++
 table/snapshot_producers.go |   4 +-
 5 files changed, 265 insertions(+), 29 deletions(-)

diff --git a/internal/avro_schemas.go b/internal/avro_schemas.go
index 6e33d634..e066bdaf 100644
--- a/internal/avro_schemas.go
+++ b/internal/avro_schemas.go
@@ -364,6 +364,164 @@ func init() {
 			Must(avro.NewField("file_sequence_number", NullableLongSchema, WithFieldID(4))),
 			// leave data_file for dynamic generation
 		})))
+
+	AvroSchemaCache.Add("manifest_list_file_v3", Must(avro.NewRecordSchema("manifest_file", "", []*avro.Field{
+		Must(avro.NewField("manifest_path",
+			StringSchema,
+			avro.WithDoc("Location URI with FS scheme"),
+			WithFieldID(500))),
+		Must(avro.NewField("manifest_length",
+			LongSchema,
+			avro.WithDoc("Total file size in bytes"),
+			WithFieldID(501))),
+		Must(avro.NewField("partition_spec_id",
+			IntSchema,
+			avro.WithDoc("Spec ID used to write"),
+			WithFieldID(502))),
+		Must(avro.NewField("content", IntSchema,
+			avro.WithDoc("Content type"),
+			avro.WithDefault(0),
+			WithFieldID(517))),
+		Must(avro.NewField("sequence_number", LongSchema,
+			avro.WithDoc("Sequence number"),
+			avro.WithDefault(int64(0)),
+			WithFieldID(515))),
+		Must(avro.NewField("min_sequence_number", LongSchema,
+			avro.WithDoc("Minimum sequence number"),
+			avro.WithDefault(int64(0)),
+			WithFieldID(516))),
+		Must(avro.NewField("added_snapshot_id",
+			LongSchema,
+			avro.WithDoc("Snapshot ID that added the manifest"),
+			WithFieldID(503))),
+		Must(avro.NewField("added_files_count",
+			IntSchema,
+			avro.WithDoc("Added entry count"),
+			WithFieldID(504))),
+		Must(avro.NewField("existing_files_count",
+			IntSchema,
+			avro.WithDoc("Existing entry count"),
+			WithFieldID(505))),
+		Must(avro.NewField("deleted_files_count",
+			IntSchema,
+			avro.WithDoc("Deleted entry count"),
+			WithFieldID(506))),
+		Must(avro.NewField("partitions",
+			NullableSchema(
+				avro.NewArraySchema(AvroSchemaCache.Get("field_summary"),
+					WithElementID(508))),
+			avro.WithDoc("Partition field summaries"),
+			WithFieldID(507))),
+		Must(avro.NewField("added_rows_count",
+			LongSchema,
+			avro.WithDoc("Added row count"),
+			WithFieldID(512))),
+		Must(avro.NewField("existing_rows_count",
+			LongSchema,
+			avro.WithDoc("Existing row count"),
+			WithFieldID(513))),
+		Must(avro.NewField("deleted_rows_count",
+			LongSchema,
+			avro.WithDoc("Deleted row count"),
+			WithFieldID(514))),
+		Must(avro.NewField("key_metadata", NullableBinarySchema,
+			avro.WithDoc("Key metadata"),
+			WithFieldID(519))),
+		Must(avro.NewField("first_row_id", NullableLongSchema,
+			avro.WithDoc("First row ID"),
+			WithFieldID(520))),
+	})))
+	AvroSchemaCache.Add("data_file_v3", Must(avro.NewRecordSchema("r2", "", []*avro.Field{
Must(avro.NewField("content", IntSchema, + avro.WithDoc("Content type"), + avro.WithDefault(0), + WithFieldID(134))), + Must(avro.NewField("file_path", + StringSchema, + avro.WithDoc("Location URI with FS scheme"), + WithFieldID(100))), + Must(avro.NewField("file_format", + StringSchema, + avro.WithDoc("File format name: avro, orc, parquet"), + WithFieldID(101))), + // skip partition field, we'll add that dynamically as needed + Must(avro.NewField("record_count", + LongSchema, + avro.WithDoc("Number of records in the file"), + WithFieldID(103))), + Must(avro.NewField("file_size_in_bytes", + LongSchema, + avro.WithDoc("Size of the file in bytes"), + WithFieldID(104))), + Must(avro.NewField("column_sizes", + NullableSchema(newMapSchema("k117_v118", IntSchema, LongSchema, 117, 118)), + avro.WithDoc("map of column id to total size on disk"), + WithFieldID(108))), + Must(avro.NewField("value_counts", + NullableSchema(newMapSchema("k119_v120", IntSchema, LongSchema, 119, 120)), + avro.WithDoc("map of value to count"), + WithFieldID(109))), + Must(avro.NewField("null_value_counts", + NullableSchema(newMapSchema("k121_v122", IntSchema, LongSchema, 121, 122)), + avro.WithDoc("map of value to count"), + WithFieldID(110))), + Must(avro.NewField("nan_value_counts", + NullableSchema(newMapSchema("k138_v139", IntSchema, LongSchema, 138, 139)), + avro.WithDoc("map of value to count"), + WithFieldID(137))), + Must(avro.NewField("lower_bounds", + NullableSchema(newMapSchema("k126_v127", IntSchema, BinarySchema, 126, 127)), + avro.WithDoc("map of column id to lower bound"), + WithFieldID(125))), + Must(avro.NewField("upper_bounds", + NullableSchema(newMapSchema("k129_v130", IntSchema, BinarySchema, 129, 130)), + avro.WithDoc("map of column id to upper bound"), + WithFieldID(128))), + Must(avro.NewField("key_metadata", NullableBinarySchema, + avro.WithDoc("Encryption Key Metadata Blob"), + WithFieldID(131))), + Must(avro.NewField("split_offsets", + NullableSchema(avro.NewArraySchema(LongSchema, + WithElementID(133))), + avro.WithDoc("splitable offsets"), + WithFieldID(132))), + Must(avro.NewField("equality_ids", + NullableSchema(avro.NewArraySchema(LongSchema, + WithElementID(136))), + avro.WithDoc("field ids used to determine row equality in equality delete files"), + WithFieldID(135))), + Must(avro.NewField("sort_order_id", + NullableIntSchema, + avro.WithDoc("Sort order ID"), + WithFieldID(140))), + Must(avro.NewField("first_row_id", + NullableLongSchema, + avro.WithDoc("The _row_id for the first row in the data file"), + WithFieldID(142))), + Must(avro.NewField("first_row_id", + NullableIntSchema, + avro.WithDoc("The _row_id for the first row in the data file"), + WithFieldID(142))), + Must(avro.NewField("referenced_data_file", + NullableSchema(StringSchema), + avro.WithDoc("Fully qualified location of a data file that all deletes reference"), + WithFieldID(143))), + Must(avro.NewField("content_offset", + NullableLongSchema, + avro.WithDoc("The offset in the file where the content starts"), + WithFieldID(144))), + Must(avro.NewField("content_size_in_bytes", + NullableLongSchema, + avro.WithDoc("The length of the referenced content stored in the file"), + WithFieldID(145))), + }))) + AvroSchemaCache.Add("manifest_entry_v3", Must(avro.NewRecordSchema("manifest_entry", "", []*avro.Field{ + Must(avro.NewField("status", IntSchema, WithFieldID(0))), + Must(avro.NewField("snapshot_id", NullableLongSchema, WithFieldID(1))), + Must(avro.NewField("sequence_number", NullableLongSchema, WithFieldID(3))), + 
Must(avro.NewField("file_sequence_number", NullableLongSchema, WithFieldID(4))), + // leave data_file for dynamic generation + }))) } func newDataFileSchema(partitionType avro.Schema, version int) (avro.Schema, error) { @@ -382,7 +540,7 @@ func newDataFileSchema(partitionType avro.Schema, version int) (avro.Schema, err func NewManifestFileSchema(version int) (avro.Schema, error) { switch version { - case 1, 2: + case 1, 2, 3: default: return nil, fmt.Errorf("unsupported iceberg spec version: %d", version) } @@ -394,7 +552,7 @@ func NewManifestFileSchema(version int) (avro.Schema, error) { func NewManifestEntrySchema(partitionType avro.Schema, version int) (avro.Schema, error) { switch version { - case 1, 2: + case 1, 2, 3: default: return nil, fmt.Errorf("unsupported iceberg spec version: %d", version) } diff --git a/manifest.go b/manifest.go index 4669726e..0f2483b0 100644 --- a/manifest.go +++ b/manifest.go @@ -321,6 +321,7 @@ type manifestFile struct { DeletedRowsCount int64 `avro:"deleted_rows_count"` PartitionList *[]FieldSummary `avro:"partitions"` Key []byte `avro:"key_metadata"` + FirstRowId *int64 `avro:"first_row_id"` version int `avro:"-"` } @@ -849,6 +850,14 @@ func (v2writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) (Manife return entry, nil } +type v3writerImpl struct{} + +func (v3writerImpl) content() ManifestContent { return ManifestContentData } +func (v3writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) (ManifestEntry, error) { + // TODO : implement this for v3 + return entry, fmt.Errorf("%w: manifest list writer for v3", ErrNotImplemented) +} + type fieldStats interface { toSummary() FieldSummary update(value any) error @@ -1029,6 +1038,8 @@ func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema *S impl = v1writerImpl{} case 2: impl = v2writerImpl{} + case 3: + impl = v3writerImpl{} default: return nil, fmt.Errorf("unsupported manifest version: %d", version) } @@ -1203,6 +1214,7 @@ type ManifestListWriter struct { commitSnapshotID int64 sequenceNumber int64 writer *ocf.Encoder + nextRowID *int64 } func NewManifestListWriterV1(out io.Writer, snapshotID int64, parentSnapshot *int64) (*ManifestListWriter, error) { @@ -1246,6 +1258,11 @@ func NewManifestListWriterV2(out io.Writer, snapshotID, sequenceNumber int64, pa }) } +func NewManifestListWriterV3() (*ManifestListWriter, error) { + // TODO: Implement v3 writer + return nil, fmt.Errorf("%w: manifest list writer for v3", ErrNotImplemented) +} + func (m *ManifestListWriter) init(meta map[string][]byte) error { fileSchema, err := internal.NewManifestFileSchema(m.version) if err != nil { @@ -1295,13 +1312,22 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error { } } - case 2: + case 2, 3: for _, file := range files { - if file.Version() != 2 { - return fmt.Errorf("%w: ManifestListWriter only supports version 2 manifest files", ErrInvalidArgument) + if file.Version() != m.version { + return fmt.Errorf("%w: ManifestListWriter only supports version %d manifest files", ErrInvalidArgument, m.version) } wrapped := *(file.(*manifestFile)) + if m.version == 3 { + // Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168 + if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil { + if m.nextRowID != nil { + wrapped.FirstRowId = m.nextRowID + *m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount + } + } + } if wrapped.SeqNumber == -1 { 
 				// if the sequence number is being assigned here,
 				// then the manifest must be created by the current
@@ -1335,7 +1361,7 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
 }
 
 // WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, files []ManifestFile) error {
+func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) error {
 	var (
 		writer *ManifestListWriter
 		err    error
@@ -1349,6 +1375,13 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps
 			return errors.New("sequence number is required for V2 tables")
 		}
 		writer, err = NewManifestListWriterV2(out, snapshotID, *sequenceNumber, parentSnapshotID)
+	case 3:
+		if sequenceNumber == nil {
+			return errors.New("sequence number is required for V3 tables")
+		}
+		// TODO
+		return fmt.Errorf("%w: manifest list writer for v3", ErrNotImplemented)
+		// writer, err = NewManifestListWriterV3()
 	default:
 		return fmt.Errorf("unsupported manifest version: %d", version)
 	}
@@ -1496,24 +1529,28 @@ func avroPartitionData(input map[int]any, logicalTypes map[int]avro.LogicalType)
 }
 
 type dataFile struct {
-	Content          ManifestEntryContent   `avro:"content"`
-	Path             string                 `avro:"file_path"`
-	Format           FileFormat             `avro:"file_format"`
-	PartitionData    map[string]any         `avro:"partition"`
-	RecordCount      int64                  `avro:"record_count"`
-	FileSize         int64                  `avro:"file_size_in_bytes"`
-	BlockSizeInBytes int64                  `avro:"block_size_in_bytes"`
-	ColSizes         *[]colMap[int, int64]  `avro:"column_sizes"`
-	ValCounts        *[]colMap[int, int64]  `avro:"value_counts"`
-	NullCounts       *[]colMap[int, int64]  `avro:"null_value_counts"`
-	NaNCounts        *[]colMap[int, int64]  `avro:"nan_value_counts"`
-	DistinctCounts   *[]colMap[int, int64]  `avro:"distinct_counts"`
-	LowerBounds      *[]colMap[int, []byte] `avro:"lower_bounds"`
-	UpperBounds      *[]colMap[int, []byte] `avro:"upper_bounds"`
-	Key              *[]byte                `avro:"key_metadata"`
-	Splits           *[]int64               `avro:"split_offsets"`
-	EqualityIDs      *[]int                 `avro:"equality_ids"`
-	SortOrder        *int                   `avro:"sort_order_id"`
+	Content                 ManifestEntryContent   `avro:"content"`
+	Path                    string                 `avro:"file_path"`
+	Format                  FileFormat             `avro:"file_format"`
+	PartitionData           map[string]any         `avro:"partition"`
+	RecordCount             int64                  `avro:"record_count"`
+	FileSize                int64                  `avro:"file_size_in_bytes"`
+	BlockSizeInBytes        int64                  `avro:"block_size_in_bytes"`
+	ColSizes                *[]colMap[int, int64]  `avro:"column_sizes"`
+	ValCounts               *[]colMap[int, int64]  `avro:"value_counts"`
+	NullCounts              *[]colMap[int, int64]  `avro:"null_value_counts"`
+	NaNCounts               *[]colMap[int, int64]  `avro:"nan_value_counts"`
+	DistinctCounts          *[]colMap[int, int64]  `avro:"distinct_counts"`
+	LowerBounds             *[]colMap[int, []byte] `avro:"lower_bounds"`
+	UpperBounds             *[]colMap[int, []byte] `avro:"upper_bounds"`
+	Key                     *[]byte                `avro:"key_metadata"`
+	Splits                  *[]int64               `avro:"split_offsets"`
+	EqualityIDs             *[]int                 `avro:"equality_ids"`
+	SortOrder               *int                   `avro:"sort_order_id"`
+	FirstRowIDField         *int64                 `avro:"first_row_id"`
+	ReferencedDataFileField *string                `avro:"referenced_data_file"`
+	ContentOffsetField      *int64                 `avro:"content_offset"`
+	ContentSizeInBytesField *int64                 `avro:"content_size_in_bytes"`
 
 	colSizeMap map[int]int64
 	valCntMap  map[int]int64
@@ -1642,6 +1679,11 @@ func (d *dataFile) EqualityFieldIDs() []int {
 
 func (d *dataFile) SortOrderID() *int { return d.SortOrder }
 
+func (d *dataFile) FirstRowID() *int64 { return d.FirstRowIDField }
+func (d *dataFile) ReferencedDataFile() *string { return d.ReferencedDataFileField }
+func (d *dataFile) ContentSizeInBytes() *int64 { return d.ContentSizeInBytesField }
+func (d *dataFile) ContentOffset() *int64 { return d.ContentOffsetField }
+
 type ManifestEntryBuilder struct {
 	m *manifestEntry
 }
@@ -1909,6 +1951,30 @@ func (b *DataFileBuilder) SortOrderID(id int) *DataFileBuilder {
 	return b
 }
 
+func (b *DataFileBuilder) FirstRowID(id int64) *DataFileBuilder {
+	b.d.FirstRowIDField = &id
+
+	return b
+}
+
+func (b *DataFileBuilder) ReferencedDataFile(path string) *DataFileBuilder {
+	b.d.ReferencedDataFileField = &path
+
+	return b
+}
+
+func (b *DataFileBuilder) ContentOffset(offset int64) *DataFileBuilder {
+	b.d.ContentOffsetField = &offset
+
+	return b
+}
+
+func (b *DataFileBuilder) ContentSizeInBytes(size int64) *DataFileBuilder {
+	b.d.ContentSizeInBytesField = &size
+
+	return b
+}
+
 func (b *DataFileBuilder) Build() DataFile {
 	return b.d
 }
@@ -1976,6 +2042,14 @@ type DataFile interface {
 	// SpecID returns the partition spec id for this data file, inherited
 	// from the manifest that the data file was read from
 	SpecID() int32
+	// FirstRowID returns the first row ID for this data file (v3+ only)
+	FirstRowID() *int64
+	// ReferencedDataFile returns the location of the data file that the deletion vector references
+	ReferencedDataFile() *string
+	// ContentOffset returns the offset in the file where the content starts (v3+ only)
+	ContentOffset() *int64
+	// ContentSizeInBytes returns the length of the referenced content stored in the file (v3+ only)
+	ContentSizeInBytes() *int64
 }
 
 // ManifestEntry is an interface for both v1 and v2 manifest entries.
diff --git a/manifest_test.go b/manifest_test.go
index cf78a6c2..3e58906b 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -406,9 +406,9 @@ type ManifestTestSuite struct {
 }
 
 func (m *ManifestTestSuite) writeManifestList() {
-	m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, manifestFileRecordsV1))
+	m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1))
 	unassignedSequenceNum := int64(-1)
-	m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, manifestFileRecordsV2))
+	m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2))
 }
 
 func (m *ManifestTestSuite) writeManifestEntries() {
@@ -659,7 +659,7 @@ func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
 		// any cache. (Note: if working correctly, this will have no such side effect.)
 		var buf bytes.Buffer
 		seqNum := int64(9876)
-		err := WriteManifestList(2, &buf, 1234, nil, &seqNum, []ManifestFile{
+		err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
 			NewManifestFile(2, "s3://bucket/namespace/table/metadata/abcd-0123.avro", 99, 0, 1234).Build(),
 		})
 		m.NoError(err)
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index d6cfe06b..cec68b43 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -1103,6 +1103,10 @@ func (*mockDataFile) SplitOffsets() []int64 { return nil }
 func (*mockDataFile) EqualityFieldIDs() []int { return nil }
 func (*mockDataFile) SortOrderID() *int { return nil }
 func (m *mockDataFile) SpecID() int32 { return m.specid }
+func (*mockDataFile) FirstRowID() *int64 { return nil }
+func (*mockDataFile) ReferencedDataFile() *string { return nil }
+func (*mockDataFile) ContentOffset() *int64 { return nil }
+func (*mockDataFile) ContentSizeInBytes() *int64 { return nil }
 
 type InclusiveMetricsTestSuite struct {
 	suite.Suite
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 58721ed4..1cf4ecd5 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -682,9 +682,9 @@ func (sp *snapshotProducer) commit() ([]Update, []Requirement, error) {
 		return nil, nil, err
 	}
 	defer out.Close()
-
+	// TODO: Implement v3 here
 	err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
-		sp.snapshotID, parentSnapshot, &nextSequence, newManifests)
+		sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
 	if err != nil {
 		return nil, nil, err
 	}
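For anyone wanting to try the new surface area, below is a minimal sketch of how the v3 builder setters and DataFile accessors added in this commit could be exercised. It assumes a *iceberg.DataFileBuilder obtained through the existing builder construction path (not shown in this diff); the helper names (applyV3Fields, describeV3Fields), the object-store path, and the numeric values are illustrative only, and writing a v3 manifest list via WriteManifestList still returns ErrNotImplemented at this stage.

package example // illustrative package, not part of the commit

import (
	"fmt"

	iceberg "github.com/apache/iceberg-go"
)

// applyV3Fields is a hypothetical helper: it only uses the setters added in
// this commit and assumes `b` was created elsewhere.
func applyV3Fields(b *iceberg.DataFileBuilder) iceberg.DataFile {
	return b.
		FirstRowID(0). // _row_id assigned to the first row (v3+)
		ReferencedDataFile("s3://bucket/ns/table/data/00000-0.parquet"). // placeholder path
		ContentOffset(128).       // offset where the referenced content starts
		ContentSizeInBytes(4096). // length of the referenced content
		Build()
}

// describeV3Fields reads the values back through the new DataFile accessors,
// which return nil pointers for files written before v3.
func describeV3Fields(df iceberg.DataFile) {
	if id := df.FirstRowID(); id != nil {
		fmt.Println("first_row_id:", *id)
	}
	if ref := df.ReferencedDataFile(); ref != nil {
		fmt.Println("referenced_data_file:", *ref)
	}
	if off, size := df.ContentOffset(), df.ContentSizeInBytes(); off != nil && size != nil {
		fmt.Println("content range:", *off, *size)
	}
}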