This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 913941c feat(manifest): Add ManifestReader, which provides access to
manifest metadata (#417)
913941c is described below
commit 913941cea8f08937a12f873f0c95981c1b7c7862
Author: Joshua Humphries <[email protected]>
AuthorDate: Tue May 6 15:31:51 2025 -0400
feat(manifest): Add ManifestReader, which provides access to manifest
metadata (#417)
This is an alternative to #415.
This closes #386.
This also exports the `ReadManifest` helper (previously named
`readManifestEntries`), so the read flow has symmetry with the existing
`NewManifestWriter` and `WriteManifest` for the write flow.
---
manifest.go | 264 +++++++++++++++++++++++++++++++++++++++++++------------
manifest_test.go | 39 +++++---
2 files changed, 235 insertions(+), 68 deletions(-)
diff --git a/manifest.go b/manifest.go
index e09e892..e741734 100644
--- a/manifest.go
+++ b/manifest.go
@@ -450,62 +450,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO,
discardDeleted bool) ([]M
}
defer f.Close()
- return readManifestEntries(m, f, discardDeleted)
-}
-
-func readManifestEntries(m ManifestFile, f io.Reader, discardDeleted bool)
([]ManifestEntry, error) {
- dec, err := ocf.NewDecoder(f,
ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
- if err != nil {
- return nil, err
- }
-
- metadata := dec.Metadata()
- sc := dec.Schema()
-
- fieldNameToID, fieldIDToLogicalType := getFieldIDMap(sc)
- isFallback := false
- if string(metadata["format-version"]) == "1" {
- for _, f := range sc.(*avro.RecordSchema).Fields() {
- if f.Name() == "snapshot_id" {
- if f.Type().Type() != avro.Union {
- isFallback = true
- }
-
- break
- }
- }
- }
-
- results := make([]ManifestEntry, 0)
- for dec.HasNext() {
- var tmp ManifestEntry
- if isFallback {
- tmp = &fallbackManifestEntry{
- manifestEntry: manifestEntry{Data: &dataFile{}},
- }
- } else {
- tmp = &manifestEntry{Data: &dataFile{}}
- }
-
- if err := dec.Decode(tmp); err != nil {
- return nil, err
- }
-
- if isFallback {
- tmp = tmp.(*fallbackManifestEntry).toEntry()
- }
-
- if !discardDeleted || tmp.Status() != EntryStatusDELETED {
- tmp.inherit(m)
- if fieldToIDMap, ok :=
tmp.DataFile().(hasFieldToIDMap); ok {
- fieldToIDMap.setFieldNameToIDMap(fieldNameToID)
-
fieldToIDMap.setFieldIDToLogicalTypeMap(fieldIDToLogicalType)
- }
- results = append(results, tmp)
- }
- }
-
- return results, dec.Error()
+ return ReadManifest(m, f, discardDeleted)
}
// ManifestFile is the interface which covers both V1 and V2 manifest files.
@@ -614,6 +559,213 @@ func decodeManifests[I interface {
return results, dec.Error()
}
+// ManifestReader reads the metadata and data from an avro manifest file.
+// Values are created with [NewManifestReader]. A ManifestReader is not
+// thread-safe: its methods must not be called from multiple goroutines.
+type ManifestReader struct {
+	// Populated eagerly by NewManifestReader.
+	dec           *ocf.Decoder // avro OCF decoder over the manifest file
+	file          ManifestFile // manifest list entry describing this file
+	formatVersion int
+	content       ManifestContent
+	isFallback    bool // entries are decoded with the fallback entry layout
+	fieldNameToID map[string]int
+	fieldIDToType map[int]avro.LogicalType
+
+	// Populated lazily, on demand, by Schema and PartitionSpec. Most readers
+	// will likely only try to load the entries.
+	schema              Schema
+	partitionSpec       PartitionSpec
+	schemaLoaded        bool
+	partitionSpecLoaded bool
+}
+
+// NewManifestReader returns a value that can read the contents of an avro manifest
+// file. file is the manifest list entry describing the manifest; the file's
+// "format-version" and "content" metadata must agree with it. If the caller is
+// interested in the manifest entries in the file, it must call
+// [ManifestReader.ReadEntry] (until it returns io.EOF) before closing the
+// provided reader.
+//
+// Version 1 manifests are not required to record "format-version" or
+// "content" metadata. When "format-version" is absent the file is read as
+// version 1 (an error is returned if file reports another version), and when
+// "content" is absent from a version 1 file it is treated as data, since
+// version 1 has no delete manifests.
+func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error) {
+	dec, err := ocf.NewDecoder(in, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
+	if err != nil {
+		return nil, err
+	}
+
+	metadata := dec.Metadata()
+	sc := dec.Schema()
+
+	formatVersion := 1
+	if raw := metadata["format-version"]; len(raw) > 0 {
+		if formatVersion, err = strconv.Atoi(string(raw)); err != nil {
+			return nil, fmt.Errorf("manifest file's 'format-version' metadata is invalid: %w", err)
+		}
+	} else if file.Version() != 1 {
+		return nil, fmt.Errorf("manifest file's 'format-version' metadata is missing, but entry from manifest list indicates version %d",
+			file.Version())
+	}
+	if formatVersion != file.Version() {
+		return nil, fmt.Errorf("manifest file's 'format-version' metadata indicates version %d, but entry from manifest list indicates version %d",
+			formatVersion, file.Version())
+	}
+
+	var content ManifestContent
+	switch contentStr := string(metadata["content"]); {
+	case contentStr == "data":
+		content = ManifestContentData
+	case contentStr == "deletes":
+		content = ManifestContentDeletes
+	case contentStr == "" && formatVersion == 1:
+		// v1 writers may omit "content"; v1 manifests only track data files.
+		content = ManifestContentData
+	default:
+		return nil, fmt.Errorf("manifest file's 'content' metadata is invalid, should be \"data\" or \"deletes\" but instead is %q",
+			contentStr)
+	}
+	if content != file.ManifestContent() {
+		return nil, fmt.Errorf("manifest file's 'content' metadata indicates %q, but entry from manifest list indicates %q",
+			content.String(), file.ManifestContent().String())
+	}
+
+	isFallback := false
+	if formatVersion == 1 {
+		rec, ok := sc.(*avro.RecordSchema)
+		if !ok {
+			return nil, fmt.Errorf("manifest file's avro schema should be a record but is %s", sc.Type())
+		}
+		// v1 files whose snapshot_id field is not a union (i.e. not nullable)
+		// are decoded with the fallback entry layout.
+		for _, f := range rec.Fields() {
+			if f.Name() == "snapshot_id" {
+				isFallback = f.Type().Type() != avro.Union
+
+				break
+			}
+		}
+	}
+	fieldNameToID, fieldIDToType := getFieldIDMap(sc)
+
+	return &ManifestReader{
+		dec:           dec,
+		file:          file,
+		formatVersion: formatVersion,
+		isFallback:    isFallback,
+		content:       content,
+		fieldNameToID: fieldNameToID,
+		fieldIDToType: fieldIDToType,
+	}, nil
+}
+
+// Version returns the manifest file's format version, as determined when
+// the reader was created.
+func (c *ManifestReader) Version() int {
+	return c.formatVersion
+}
+
+// ManifestContent reports whether the manifest file tracks data files or
+// delete files, as determined when the reader was created.
+func (c *ManifestReader) ManifestContent() ManifestContent {
+	return c.content
+}
+
+// SchemaID parses and returns the "schema-id" value stored in the avro
+// file's metadata. A missing or non-integer value yields an error.
+func (c *ManifestReader) SchemaID() (int, error) {
+	raw := string(c.dec.Metadata()["schema-id"])
+	id, err := strconv.Atoi(raw)
+	if err != nil {
+		return 0, fmt.Errorf("manifest file's 'schema-id' metadata is invalid: %w", err)
+	}
+
+	return id, nil
+}
+
+// Schema returns the schema encoded in the avro file's "schema" metadata.
+// Its ID is taken from the "schema-id" metadata. That key is optional for
+// version 1 manifests; when it is absent from a v1 file, the ID decoded from
+// the schema JSON itself is kept. The schema is decoded on the first
+// successful call and cached; later calls return a pointer to the same value.
+func (c *ManifestReader) Schema() (*Schema, error) {
+	if c.schemaLoaded {
+		return &c.schema, nil
+	}
+
+	hasSchemaID := c.formatVersion != 1 || len(c.dec.Metadata()["schema-id"]) > 0
+	var schemaID int
+	if hasSchemaID {
+		id, err := c.SchemaID()
+		if err != nil {
+			return nil, err
+		}
+		schemaID = id
+	}
+
+	// Start from the zero value so a previously failed decode cannot leave
+	// stale fields behind for this attempt to build on.
+	c.schema = Schema{}
+	if err := json.Unmarshal(c.dec.Metadata()["schema"], &c.schema); err != nil {
+		return nil, fmt.Errorf("manifest file's 'schema' metadata is invalid: %w", err)
+	}
+	if hasSchemaID {
+		c.schema.ID = schemaID
+	}
+	c.schemaLoaded = true
+
+	return &c.schema, nil
+}
+
+// PartitionSpecID returns the partition spec ID encoded in the avro file's
+// "partition-spec-id" metadata, after checking that it matches the partition
+// spec ID recorded for this manifest in the manifest list. The key is optional
+// for version 1 manifests; when it is absent from a v1 file, the ID from the
+// manifest list entry is returned.
+func (c *ManifestReader) PartitionSpecID() (int, error) {
+	raw := c.dec.Metadata()["partition-spec-id"]
+	if len(raw) == 0 && c.formatVersion == 1 {
+		return int(c.file.PartitionSpecID()), nil
+	}
+
+	id, err := strconv.Atoi(string(raw))
+	if err != nil {
+		return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata is invalid: %w", err)
+	}
+	if id != int(c.file.PartitionSpecID()) {
+		return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata indicates %d, but entry from manifest list indicates %d",
+			id, c.file.PartitionSpecID())
+	}
+
+	return id, nil
+}
+
+// PartitionSpec returns the partition spec whose fields are encoded in the
+// avro file's "partition-spec" metadata, tagged with the ID reported by
+// PartitionSpecID. The spec is decoded on the first successful call and
+// cached; later calls return a pointer to the same value.
+func (c *ManifestReader) PartitionSpec() (*PartitionSpec, error) {
+	if c.partitionSpecLoaded {
+		return &c.partitionSpec, nil
+	}
+
+	specID, err := c.PartitionSpecID()
+	if err != nil {
+		return nil, err
+	}
+	raw := c.dec.Metadata()["partition-spec"]
+	if err := json.Unmarshal(raw, &c.partitionSpec.fields); err != nil {
+		return nil, fmt.Errorf("manifest file's 'partition-spec' metadata is invalid: %w", err)
+	}
+	c.partitionSpec.id = specID
+	c.partitionSpec.initialize()
+	c.partitionSpecLoaded = true
+
+	return &c.partitionSpec, nil
+}
+
+// ReadEntry reads the next manifest entry in the avro file's data.
+// It returns io.EOF once every entry has been read. Any decoder error,
+// including one hit while loading the next block (which ends iteration
+// early), is returned instead of io.EOF, so a truncated or corrupt manifest
+// is not mistaken for a complete one.
+//
+// Each returned entry has inherit applied with the manifest list entry given
+// to [NewManifestReader]. If its data file supports it, the data file also
+// receives the field-name and logical-type maps derived from the file's avro
+// schema.
+func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
+	if err := c.dec.Error(); err != nil {
+		return nil, err
+	}
+	if !c.dec.HasNext() {
+		// HasNext also reports false when reading the next block fails, so
+		// surface any decoder error before signalling a clean end of data.
+		if err := c.dec.Error(); err != nil {
+			return nil, err
+		}
+
+		return nil, io.EOF
+	}
+
+	var tmp ManifestEntry
+	if c.isFallback {
+		tmp = &fallbackManifestEntry{
+			manifestEntry: manifestEntry{Data: &dataFile{}},
+		}
+	} else {
+		tmp = &manifestEntry{Data: &dataFile{}}
+	}
+
+	if err := c.dec.Decode(tmp); err != nil {
+		return nil, err
+	}
+	if c.isFallback {
+		tmp = tmp.(*fallbackManifestEntry).toEntry()
+	}
+	tmp.inherit(c.file)
+	if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
+		fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
+		fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
+	}
+
+	return tmp, nil
+}
+
+// ReadManifest reads in an avro manifest file and returns a slice of its
+// manifest entries, or an error if one is encountered (in which case no
+// entries are returned). m is the manifest list entry describing the file; it
+// is used to validate the file's metadata and is passed to each entry's
+// inherit. If discardDeleted is true, the returned slice omits entries whose
+// status is "deleted".
+func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) {
+	reader, err := NewManifestReader(m, f)
+	if err != nil {
+		return nil, err
+	}
+
+	var results []ManifestEntry
+	for {
+		entry, err := reader.ReadEntry()
+		switch {
+		case errors.Is(err, io.EOF):
+			return results, nil
+		case err != nil:
+			return nil, err
+		}
+		if discardDeleted && entry.Status() == EntryStatusDELETED {
+			continue
+		}
+		results = append(results, entry)
+	}
+}
+
// ReadManifestList reads in an avro manifest list file and returns a slice
// of manifest files or an error if one is encountered.
func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
diff --git a/manifest_test.go b/manifest_test.go
index aeda897..66bc07f 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -832,7 +832,7 @@ func (m *ManifestTestSuite)
TestReadManifestIncompleteSchema() {
)
m.NoError(err)
- entries, err := readManifestEntries(file, &buf, false)
+ entries, err := ReadManifest(file, &buf, false)
m.NoError(err)
m.Len(entries, 1)
@@ -907,26 +907,43 @@ func (m *ManifestTestSuite)
TestReadManifestIncompleteSchema() {
}
// This should fail because the file's schema is incomplete.
- _, err = readManifestEntries(file, &buf, false)
+ _, err = ReadManifest(file, &buf, false)
m.ErrorContains(err, "unknown type: r2")
}
func (m *ManifestTestSuite) TestManifestEntriesV2() {
- var mockfs internal.MockFS
manifest := manifestFile{
version: 2,
+ SpecID: 1,
Path: manifestFileRecordsV2[0].FilePath(),
}
- mockfs.Test(m.T())
- mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{
+ partitionSpec := NewPartitionSpecID(1,
+ PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID",
Transform: IdentityTransform{}},
+ PartitionField{FieldID: 1001, SourceID: 2, Name:
"tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+ mockedFile := &internal.MockFile{
Contents: bytes.NewReader(m.v2ManifestEntries.Bytes()),
- }, nil)
- defer mockfs.AssertExpectations(m.T())
- entries, err := manifest.FetchEntries(&mockfs, false)
+ }
+ manifestReader, err := NewManifestReader(&manifest, mockedFile)
m.Require().NoError(err)
- m.Len(entries, 2)
- m.Zero(manifest.PartitionSpecID())
+ m.Equal(2, manifestReader.Version())
+ m.Equal(ManifestContentData, manifestReader.ManifestContent())
+ loadedSchema, err := manifestReader.Schema()
+ m.Require().NoError(err)
+ m.True(loadedSchema.Equals(testSchema))
+ loadedPartitionSpec, err := manifestReader.PartitionSpec()
+ m.Require().NoError(err)
+ m.True(loadedPartitionSpec.Equals(partitionSpec))
+
+ entry1, err := manifestReader.ReadEntry()
+ m.Require().NoError(err)
+ _, err = manifestReader.ReadEntry()
+ m.Require().NoError(err)
+ _, err = manifestReader.ReadEntry()
+ m.Require().ErrorIs(err, io.EOF)
+
+ m.Equal(int32(1), manifest.PartitionSpecID())
m.Zero(manifest.SnapshotID())
m.Zero(manifest.AddedDataFiles())
m.Zero(manifest.ExistingDataFiles())
@@ -935,8 +952,6 @@ func (m *ManifestTestSuite) TestManifestEntriesV2() {
m.Zero(manifest.DeletedRows())
m.Zero(manifest.AddedRows())
- entry1 := entries[0]
-
m.Equal(EntryStatusADDED, entry1.Status())
m.Equal(entrySnapshotID, entry1.SnapshotID())
m.Zero(entry1.SequenceNum())