nastra commented on code in PR #3: URL: https://github.com/apache/iceberg-go/pull/3#discussion_r1326996787
########## manifest.go: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "io" + "sync" + + iceio "github.com/apache/iceberg-go/io" + + "github.com/hamba/avro/v2/ocf" +) + +// ManifestContent indicates the type of data inside of the files +// described by a manifest. This will indicate whether the data files +// contain active data or deleted rows. +type ManifestContent int32 + +const ( + ManifestContentData ManifestContent = 0 + ManifestContentDeletes ManifestContent = 1 +) + +type fieldSummary struct { + ContainsNull bool `avro:"contains_null"` + ContainsNaN *bool `avro:"contains_nan"` + LowerBound *[]byte `avro:"lower_bound"` + UpperBound *[]byte `avro:"upper_bound"` +} + +type manifestFileV1 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + AddedSnapshotID *int64 `avro:"added_snapshot_id"` + AddedFilesCount *int32 `avro:"added_data_files_count"` + ExistingFilesCount *int32 `avro:"existing_data_files_count"` + DeletedFilesCount *int32 `avro:"deleted_data_files_count"` + AddedRowsCount *int64 `avro:"added_rows_count"` + ExistingRowsCount *int64 `avro:"existing_rows_count"` + DeletedRowsCount *int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV1) Version() int { return 1 } +func (m *manifestFileV1) FilePath() string { return m.Path } +func (m *manifestFileV1) Length() int64 { return m.Len } +func (m *manifestFileV1) PartitionID() int32 { return m.PartitionSpecID } +func (m *manifestFileV1) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV1) SnapshotID() int64 { + if m.AddedSnapshotID == nil { + return 0 + } + return *m.AddedSnapshotID +} + +func (m *manifestFileV1) AddedDataFiles() int32 { + if m.AddedFilesCount == nil { + return 0 + } + return *m.AddedFilesCount +} + +func (m *manifestFileV1) ExistingDataFiles() int32 { + if m.ExistingFilesCount == nil { + return 0 + } + return *m.ExistingFilesCount +} + +func (m *manifestFileV1) DeletedDataFiles() int32 { + if m.DeletedFilesCount == nil { + return 0 + } + return *m.DeletedFilesCount +} + +func (m *manifestFileV1) AddedRows() int64 { + if m.AddedRowsCount == nil { + return 0 + } + return *m.AddedRowsCount +} + +func (m *manifestFileV1) ExistingRows() int64 { + if m.ExistingRowsCount == nil { + return 0 + } + return *m.ExistingRowsCount +} + +func (m *manifestFileV1) DeletedRows() int64 { + if m.DeletedRowsCount == nil { + return 0 + } + return *m.DeletedRowsCount +} + +func (m *manifestFileV1) HasAddedFiles() bool { + return m.AddedFilesCount == nil || *m.AddedFilesCount > 0 +} + +func (m *manifestFileV1) HasExistingFiles() bool { + return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0 +} + +func (m *manifestFileV1) SequenceNum() int64 { return 0 } +func (m *manifestFileV1) MinSequenceNum() int64 { return 0 } +func (m *manifestFileV1) Metadata() []byte { return m.KeyMetadata } +func (m *manifestFileV1) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +type manifestFileV2 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + SeqNumber int64 `avro:"sequence_number"` + MinSeqNumber int64 `avro:"min_sequence_number"` + AddedSnapshotID int64 `avro:"added_snapshot_id"` + AddedFilesCount int32 `avro:"added_files_count"` + ExistingFilesCount int32 `avro:"existing_files_count"` + DeletedFilesCount int32 `avro:"deleted_files_count"` + AddedRowsCount int64 `avro:"added_rows_count"` + ExistingRowsCount int64 `avro:"existing_rows_count"` + DeletedRowsCount int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV2) Version() int { return 2 } + +func (m *manifestFileV2) FilePath() string { return m.Path } +func (m *manifestFileV2) Length() int64 { return m.Len } +func (m *manifestFileV2) PartitionID() int32 { return m.PartitionSpecID } +func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV2) SnapshotID() int64 { + return m.AddedSnapshotID +} + +func (m *manifestFileV2) AddedDataFiles() int32 { + return m.AddedFilesCount +} + +func (m *manifestFileV2) ExistingDataFiles() int32 { + return m.ExistingFilesCount +} + +func (m *manifestFileV2) DeletedDataFiles() int32 { + return m.DeletedFilesCount +} + +func (m *manifestFileV2) AddedRows() int64 { + return m.AddedRowsCount +} + +func (m *manifestFileV2) ExistingRows() int64 { + return m.ExistingRowsCount +} + +func (m *manifestFileV2) DeletedRows() int64 { + return m.DeletedRowsCount +} + +func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber } +func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber } +func (m *manifestFileV2) Metadata() []byte { return m.KeyMetadata } + +func (m *manifestFileV2) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV2) HasAddedFiles() bool { + return m.AddedFilesCount > 0 +} + +func (m *manifestFileV2) HasExistingFiles() bool { + return m.ExistingFilesCount > 0 +} + +func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + f, err := fs.Open(m.FilePath()) + if err != nil { + return nil, err + } + defer f.Close() + + dec, err := ocf.NewDecoder(f) + if err != nil { + return nil, err + } + + metadata := dec.Metadata() + isVer1 := true + if string(metadata["format-version"]) == "2" { + isVer1 = false + } + + results := make([]ManifestEntry, 0) + for dec.HasNext() { + var tmp ManifestEntry + if isVer1 { + tmp = &manifestEntryV1{} + } else { + tmp = &manifestEntryV2{} + } + + if err := dec.Decode(tmp); err != nil { + return nil, err + } + + if !discardDeleted || tmp.Status() != EntryStatusDELETED { + tmp.inheritSeqNum(m) + results = append(results, tmp) + } + } + + return results, dec.Error() +} + +// ManifestFile is the interface which covers both V1 and V2 manifest files. +type ManifestFile interface { + // Version returns the version number of this manifest file. + // It should be 1 or 2. + Version() int + // FilePath is the location URI of this manifest file. + FilePath() string + // Length is the length in bytes of the manifest file. + Length() int64 + // PartitionID is the ID of the partition spec used to write + // this manifest. It must be listed in the table metadata + // partition-specs. + PartitionID() int32 + // ManifestContent is the type of files tracked by this manifest, + // either data or delete files. All v1 manifests track data files. + ManifestContent() ManifestContent + // SnapshotID is the ID of the snapshot where this manifest file + // was added. + SnapshotID() int64 + // AddedDataFiles returns the number of entries in the manifest that + // have the status of EntryStatusADDED. + AddedDataFiles() int32 + // ExistingDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusEXISTING. + ExistingDataFiles() int32 + // DeletedDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusDELETED. + DeletedDataFiles() int32 + // AddedRows returns the number of rows in all files of the manifest + // that have status EntryStatusADDED. + AddedRows() int64 + // ExistingRows returns the number of rows in all files of the manifest + // which have status EntryStatusEXISTING. + ExistingRows() int64 + // DeletedRows returns the number of rows in all files of the manifest + // which have status EntryStatusDELETED. + DeletedRows() int64 + // SequenceNum returns the sequence number when this manifest was + // added to the table. Will be 0 for v1 manifest lists. + SequenceNum() int64 + // MinSequenceNum is the minimum data sequence number of all live data + // or delete files in the manifest. Will be 0 for v1 manifest lists. + MinSequenceNum() int64 + // Metadata returns implementation-specific key metadata for encryption + // if it exists in the manifest list. + Metadata() []byte + // PartitionList returns a list of field summaries for each partition + // field in the spec. Each field in the list corresponds to a field in + // the manifest file's partition spec. + PartitionList() []fieldSummary + + // HasAddedFiles returns true if AddedDataFiles > 0 or if it was null. + HasAddedFiles() bool + // HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null. + HasExistingFiles() bool + // FetchEntries reads the manifest list file to fetch the list of + // manifest entries using the provided file system IO interface. + // If discardDeleted is true, entries for files containing deleted rows + // will be skipped. + FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) +} + +// ReadManifestList reads in an avro manifest list file and returns a slice +// of manifest files or an error if one is encountered. +func ReadManifestList(in io.Reader) ([]ManifestFile, error) { + dec, err := ocf.NewDecoder(in) + if err != nil { + return nil, err + } + + out := make([]ManifestFile, 0) + + for dec.HasNext() { + var file ManifestFile + if string(dec.Metadata()["format-version"]) == "2" { + file = &manifestFileV2{} + } else { + file = &manifestFileV1{} + } + + if err := dec.Decode(file); err != nil { + return nil, err + } + out = append(out, file) + } + + return out, dec.Error() +} + +// ManifestEntryStatus defines constants for the entry status of +// existing, added or deleted. +type ManifestEntryStatus int8 + +const ( + EntryStatusEXISTING ManifestEntryStatus = 0 + EntryStatusADDED ManifestEntryStatus = 1 + EntryStatusDELETED ManifestEntryStatus = 2 +) + +// ManifestEntryContent defines constants for the type of file contents +// in the file entries. Data, Position based deletes and equality based +// deletes. +type ManifestEntryContent int8 + +const ( + EntryContentData ManifestEntryContent = 0 + EntryContentPosDeletes ManifestEntryContent = 1 + EntryContentEqDeletes ManifestEntryContent = 2 +) + +// FileFormat defines constants for the format of data files. +type FileFormat string + +const ( + AvroFile FileFormat = "AVRO" + OrcFile FileFormat = "ORC" + ParquetFile FileFormat = "PARQUET" +) + +type colMap[K, V any] struct { + Key K `avro:"key"` + Value V `avro:"value"` +} + +func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V { + if c == nil { + return nil + } + + out := make(map[K]V) + for _, data := range *c { + out[data.Key] = data.Value + } + return out +} + +type dataFile struct { + Content ManifestEntryContent `avro:"content"` + Path string `avro:"file_path"` + Format FileFormat `avro:"file_format"` + PartitionData map[string]any `avro:"partition"` + RecordCount int64 `avro:"record_count"` + FileSize int64 `avro:"file_size_in_bytes"` + BlockSizeInBytes int64 `avro:"block_size_in_bytes"` Review Comment: I think this shouldn't be written for V2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
