nastra commented on code in PR #3: URL: https://github.com/apache/iceberg-go/pull/3#discussion_r1326981591
########## manifest.go: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "io" + "sync" + + iceio "github.com/apache/iceberg-go/io" + + "github.com/hamba/avro/v2/ocf" +) + +// ManifestContent indicates the type of data inside of the files +// described by a manifest. This will indicate whether the data files +// contain active data or deleted rows. 
+type ManifestContent int32 + +const ( + ManifestContentData ManifestContent = 0 + ManifestContentDeletes ManifestContent = 1 +) + +type fieldSummary struct { + ContainsNull bool `avro:"contains_null"` + ContainsNaN *bool `avro:"contains_nan"` + LowerBound *[]byte `avro:"lower_bound"` + UpperBound *[]byte `avro:"upper_bound"` +} + +type manifestFileV1 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + AddedSnapshotID *int64 `avro:"added_snapshot_id"` + AddedFilesCount *int32 `avro:"added_data_files_count"` + ExistingFilesCount *int32 `avro:"existing_data_files_count"` + DeletedFilesCount *int32 `avro:"deleted_data_files_count"` + AddedRowsCount *int64 `avro:"added_rows_count"` + ExistingRowsCount *int64 `avro:"existing_rows_count"` + DeletedRowsCount *int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV1) Version() int { return 1 } +func (m *manifestFileV1) FilePath() string { return m.Path } +func (m *manifestFileV1) Length() int64 { return m.Len } +func (m *manifestFileV1) PartitionID() int32 { return m.PartitionSpecID } +func (m *manifestFileV1) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV1) SnapshotID() int64 { + if m.AddedSnapshotID == nil { + return 0 + } + return *m.AddedSnapshotID +} + +func (m *manifestFileV1) AddedDataFiles() int32 { + if m.AddedFilesCount == nil { + return 0 + } + return *m.AddedFilesCount +} + +func (m *manifestFileV1) ExistingDataFiles() int32 { + if m.ExistingFilesCount == nil { + return 0 + } + return *m.ExistingFilesCount +} + +func (m *manifestFileV1) DeletedDataFiles() int32 { + if m.DeletedFilesCount == nil { + return 0 + } + return *m.DeletedFilesCount +} + +func (m *manifestFileV1) AddedRows() int64 { + if m.AddedRowsCount == nil { + return 0 + } + return 
*m.AddedRowsCount +} + +func (m *manifestFileV1) ExistingRows() int64 { + if m.ExistingRowsCount == nil { + return 0 + } + return *m.ExistingRowsCount +} + +func (m *manifestFileV1) DeletedRows() int64 { + if m.DeletedRowsCount == nil { + return 0 + } + return *m.DeletedRowsCount +} + +func (m *manifestFileV1) HasAddedFiles() bool { + return m.AddedFilesCount == nil || *m.AddedFilesCount > 0 +} + +func (m *manifestFileV1) HasExistingFiles() bool { + return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0 +} + +func (m *manifestFileV1) SequenceNum() int64 { return 0 } +func (m *manifestFileV1) MinSequenceNum() int64 { return 0 } +func (m *manifestFileV1) Metadata() []byte { return m.KeyMetadata } +func (m *manifestFileV1) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +type manifestFileV2 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + SeqNumber int64 `avro:"sequence_number"` + MinSeqNumber int64 `avro:"min_sequence_number"` + AddedSnapshotID int64 `avro:"added_snapshot_id"` + AddedFilesCount int32 `avro:"added_files_count"` + ExistingFilesCount int32 `avro:"existing_files_count"` + DeletedFilesCount int32 `avro:"deleted_files_count"` + AddedRowsCount int64 `avro:"added_rows_count"` + ExistingRowsCount int64 `avro:"existing_rows_count"` + DeletedRowsCount int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV2) Version() int { return 2 } + +func (m *manifestFileV2) FilePath() string { return m.Path } +func (m *manifestFileV2) Length() int64 { return m.Len } +func (m *manifestFileV2) PartitionID() int32 { return 
m.PartitionSpecID } +func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV2) SnapshotID() int64 { + return m.AddedSnapshotID +} + +func (m *manifestFileV2) AddedDataFiles() int32 { + return m.AddedFilesCount +} + +func (m *manifestFileV2) ExistingDataFiles() int32 { + return m.ExistingFilesCount +} + +func (m *manifestFileV2) DeletedDataFiles() int32 { + return m.DeletedFilesCount +} + +func (m *manifestFileV2) AddedRows() int64 { + return m.AddedRowsCount +} + +func (m *manifestFileV2) ExistingRows() int64 { + return m.ExistingRowsCount +} + +func (m *manifestFileV2) DeletedRows() int64 { + return m.DeletedRowsCount +} + +func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber } +func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber } +func (m *manifestFileV2) Metadata() []byte { return m.KeyMetadata } + +func (m *manifestFileV2) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV2) HasAddedFiles() bool { + return m.AddedFilesCount > 0 +} + +func (m *manifestFileV2) HasExistingFiles() bool { + return m.ExistingFilesCount > 0 +} + +func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + f, err := fs.Open(m.FilePath()) + if err != nil { + return nil, err + } + defer f.Close() + + dec, err := ocf.NewDecoder(f) + if err != nil { + return nil, err + } + + metadata := dec.Metadata() + isVer1 := true + if string(metadata["format-version"]) == "2" { + isVer1 = false + } + + results := make([]ManifestEntry, 0) + for dec.HasNext() { + var tmp ManifestEntry + if isVer1 { + tmp = &manifestEntryV1{} + } else { + tmp = &manifestEntryV2{} + } + + if err := dec.Decode(tmp); err != nil { + return nil, err + } + + if 
!discardDeleted || tmp.Status() != EntryStatusDELETED { + tmp.inheritSeqNum(m) + results = append(results, tmp) + } + } + + return results, dec.Error() +} + +// ManifestFile is the interface which covers both V1 and V2 manifest files. +type ManifestFile interface { + // Version returns the version number of this manifest file. + // It should be 1 or 2. + Version() int + // FilePath is the location URI of this manifest file. + FilePath() string + // Length is the length in bytes of the manifest file. + Length() int64 + // PartitionID is the ID of the partition spec used to write + // this manifest. It must be listed in the table metadata + // partition-specs. + PartitionID() int32 + // ManifestContent is the type of files tracked by this manifest, + // either data or delete files. All v1 manifests track data files. + ManifestContent() ManifestContent + // SnapshotID is the ID of the snapshot where this manifest file + // was added. + SnapshotID() int64 + // AddedDataFiles returns the number of entries in the manifest that + // have the status of EntryStatusADDED. + AddedDataFiles() int32 + // ExistingDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusEXISTING. + ExistingDataFiles() int32 + // DeletedDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusDELETED. + DeletedDataFiles() int32 + // AddedRows returns the number of rows in all files of the manifest + // that have status EntryStatusADDED. + AddedRows() int64 + // ExistingRows returns the number of rows in all files of the manifest + // which have status EntryStatusEXISTING. + ExistingRows() int64 + // DeletedRows returns the number of rows in all files of the manifest + // which have status EntryStatusDELETED. + DeletedRows() int64 + // SequenceNum returns the sequence number when this manifest was + // added to the table. Will be 0 for v1 manifest lists. 
+ SequenceNum() int64 + // MinSequenceNum is the minimum data sequence number of all live data + // or delete files in the manifest. Will be 0 for v1 manifest lists. + MinSequenceNum() int64 + // Metadata returns implementation-specific key metadata for encryption + // if it exists in the manifest list. + Metadata() []byte Review Comment: I'd rather name this `KeyMetadata()` so that it is clearly distinguished from table metadata later on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
