nastra commented on code in PR #3: URL: https://github.com/apache/iceberg-go/pull/3#discussion_r1326977977
########## manifest.go: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "io" + "sync" + + iceio "github.com/apache/iceberg-go/io" + + "github.com/hamba/avro/v2/ocf" +) + +// ManifestContent indicates the type of data inside of the files +// described by a manifest. This will indicate whether the data files +// contain active data or deleted rows. +type ManifestContent int32 + +const ( + ManifestContentData ManifestContent = 0 + ManifestContentDeletes ManifestContent = 1 +) + +type fieldSummary struct { + ContainsNull bool `avro:"contains_null"` + ContainsNaN *bool `avro:"contains_nan"` + LowerBound *[]byte `avro:"lower_bound"` + UpperBound *[]byte `avro:"upper_bound"` +} + +type manifestFileV1 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + AddedSnapshotID *int64 `avro:"added_snapshot_id"` + AddedFilesCount *int32 `avro:"added_data_files_count"` + ExistingFilesCount *int32 `avro:"existing_data_files_count"` + DeletedFilesCount *int32 `avro:"deleted_data_files_count"` + AddedRowsCount *int64 `avro:"added_rows_count"` + ExistingRowsCount *int64 `avro:"existing_rows_count"` + DeletedRowsCount *int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV1) Version() int { return 1 } +func (m *manifestFileV1) FilePath() string { return m.Path } +func (m *manifestFileV1) Length() int64 { return m.Len } +func (m *manifestFileV1) PartitionID() int32 { return m.PartitionSpecID } +func (m *manifestFileV1) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV1) SnapshotID() int64 { + if m.AddedSnapshotID == nil { + return 0 + } + return *m.AddedSnapshotID +} + +func (m *manifestFileV1) AddedDataFiles() int32 { + if m.AddedFilesCount == nil { + return 0 + } + return *m.AddedFilesCount +} + +func (m *manifestFileV1) ExistingDataFiles() int32 { + if m.ExistingFilesCount == nil { + return 0 + } + return *m.ExistingFilesCount +} + +func (m *manifestFileV1) DeletedDataFiles() int32 { + if m.DeletedFilesCount == nil { + return 0 + } + return *m.DeletedFilesCount +} + +func (m *manifestFileV1) AddedRows() int64 { + if m.AddedRowsCount == nil { + return 0 + } + return *m.AddedRowsCount +} + +func (m *manifestFileV1) ExistingRows() int64 { + if m.ExistingRowsCount == nil { + return 0 + } + return *m.ExistingRowsCount +} + +func (m *manifestFileV1) DeletedRows() int64 { + if m.DeletedRowsCount == nil { + return 0 + } + return *m.DeletedRowsCount +} + +func (m *manifestFileV1) HasAddedFiles() bool { + return m.AddedFilesCount == nil || *m.AddedFilesCount > 0 +} + +func (m *manifestFileV1) HasExistingFiles() bool { + return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0 +} + +func (m *manifestFileV1) SequenceNum() int64 { return 0 } +func (m *manifestFileV1) MinSequenceNum() int64 { return 0 } +func (m *manifestFileV1) Metadata() []byte { return m.KeyMetadata } +func (m *manifestFileV1) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +type manifestFileV2 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + PartitionSpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + SeqNumber int64 `avro:"sequence_number"` + MinSeqNumber int64 `avro:"min_sequence_number"` + AddedSnapshotID int64 `avro:"added_snapshot_id"` + AddedFilesCount int32 `avro:"added_files_count"` + ExistingFilesCount int32 `avro:"existing_files_count"` + DeletedFilesCount int32 `avro:"deleted_files_count"` + AddedRowsCount int64 `avro:"added_rows_count"` + ExistingRowsCount int64 `avro:"existing_rows_count"` + DeletedRowsCount int64 `avro:"deleted_rows_count"` + Partitions *[]fieldSummary `avro:"partitions"` + KeyMetadata []byte `avro:"key_metadata"` +} + +func (*manifestFileV2) Version() int { return 2 } + +func (m *manifestFileV2) FilePath() string { return m.Path } +func (m *manifestFileV2) Length() int64 { return m.Len } +func (m *manifestFileV2) PartitionID() int32 { return m.PartitionSpecID } +func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV2) SnapshotID() int64 { + return m.AddedSnapshotID +} + +func (m *manifestFileV2) AddedDataFiles() int32 { + return m.AddedFilesCount +} + +func (m *manifestFileV2) ExistingDataFiles() int32 { + return m.ExistingFilesCount +} + +func (m *manifestFileV2) DeletedDataFiles() int32 { + return m.DeletedFilesCount +} + +func (m *manifestFileV2) AddedRows() int64 { + return m.AddedRowsCount +} + +func (m *manifestFileV2) ExistingRows() int64 { + return m.ExistingRowsCount +} + +func (m *manifestFileV2) DeletedRows() int64 { + return m.DeletedRowsCount +} + +func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber } +func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber } +func (m *manifestFileV2) Metadata() []byte { return m.KeyMetadata } + +func (m *manifestFileV2) PartitionList() []fieldSummary { + if m.Partitions == nil { + return nil + } + return *m.Partitions +} + +func (m *manifestFileV2) HasAddedFiles() bool { + return m.AddedFilesCount > 0 +} + +func (m *manifestFileV2) HasExistingFiles() bool { + return m.ExistingFilesCount > 0 +} + +func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + f, err := fs.Open(m.FilePath()) + if err != nil { + return nil, err + } + defer f.Close() + + dec, err := ocf.NewDecoder(f) + if err != nil { + return nil, err + } + + metadata := dec.Metadata() + isVer1 := true + if string(metadata["format-version"]) == "2" { + isVer1 = false + } + + results := make([]ManifestEntry, 0) + for dec.HasNext() { + var tmp ManifestEntry + if isVer1 { + tmp = &manifestEntryV1{} + } else { + tmp = &manifestEntryV2{} + } + + if err := dec.Decode(tmp); err != nil { + return nil, err + } + + if !discardDeleted || tmp.Status() != EntryStatusDELETED { + tmp.inheritSeqNum(m) + results = append(results, tmp) + } + } + + return results, dec.Error() +} + +// ManifestFile is the interface which covers both V1 and V2 manifest files. +type ManifestFile interface { + // Version returns the version number of this manifest file. + // It should be 1 or 2. + Version() int + // FilePath is the location URI of this manifest file. + FilePath() string + // Length is the length in bytes of the manifest file. + Length() int64 + // PartitionID is the ID of the partition spec used to write + // this manifest. It must be listed in the table metadata + // partition-specs. + PartitionID() int32 Review Comment: I think `PartitionSpecID()` is slightly better -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
