This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 913941c  feat(manifest): Add ManifestReader, which provides access to 
manifest metadata (#417)
913941c is described below

commit 913941cea8f08937a12f873f0c95981c1b7c7862
Author: Joshua Humphries <[email protected]>
AuthorDate: Tue May 6 15:31:51 2025 -0400

    feat(manifest): Add ManifestReader, which provides access to manifest 
metadata (#417)
    
    This is an alternative to #415.
    
    This closes #386.
    
    This also exports the `ReadManifest` helper (previously named
    `readManifestEntries`), so the read flow has symmetry with the existing
    `NewManifestWriter` and `WriteManifest` for the write flow.
---
 manifest.go      | 264 +++++++++++++++++++++++++++++++++++++++++++------------
 manifest_test.go |  39 +++++---
 2 files changed, 235 insertions(+), 68 deletions(-)

diff --git a/manifest.go b/manifest.go
index e09e892..e741734 100644
--- a/manifest.go
+++ b/manifest.go
@@ -450,62 +450,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, 
discardDeleted bool) ([]M
        }
        defer f.Close()
 
-       return readManifestEntries(m, f, discardDeleted)
-}
-
-func readManifestEntries(m ManifestFile, f io.Reader, discardDeleted bool) 
([]ManifestEntry, error) {
-       dec, err := ocf.NewDecoder(f, 
ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
-       if err != nil {
-               return nil, err
-       }
-
-       metadata := dec.Metadata()
-       sc := dec.Schema()
-
-       fieldNameToID, fieldIDToLogicalType := getFieldIDMap(sc)
-       isFallback := false
-       if string(metadata["format-version"]) == "1" {
-               for _, f := range sc.(*avro.RecordSchema).Fields() {
-                       if f.Name() == "snapshot_id" {
-                               if f.Type().Type() != avro.Union {
-                                       isFallback = true
-                               }
-
-                               break
-                       }
-               }
-       }
-
-       results := make([]ManifestEntry, 0)
-       for dec.HasNext() {
-               var tmp ManifestEntry
-               if isFallback {
-                       tmp = &fallbackManifestEntry{
-                               manifestEntry: manifestEntry{Data: &dataFile{}},
-                       }
-               } else {
-                       tmp = &manifestEntry{Data: &dataFile{}}
-               }
-
-               if err := dec.Decode(tmp); err != nil {
-                       return nil, err
-               }
-
-               if isFallback {
-                       tmp = tmp.(*fallbackManifestEntry).toEntry()
-               }
-
-               if !discardDeleted || tmp.Status() != EntryStatusDELETED {
-                       tmp.inherit(m)
-                       if fieldToIDMap, ok := 
tmp.DataFile().(hasFieldToIDMap); ok {
-                               fieldToIDMap.setFieldNameToIDMap(fieldNameToID)
-                               
fieldToIDMap.setFieldIDToLogicalTypeMap(fieldIDToLogicalType)
-                       }
-                       results = append(results, tmp)
-               }
-       }
-
-       return results, dec.Error()
+       return ReadManifest(m, f, discardDeleted)
 }
 
 // ManifestFile is the interface which covers both V1 and V2 manifest files.
@@ -614,6 +559,213 @@ func decodeManifests[I interface {
        return results, dec.Error()
 }
 
+// ManifestReader reads the metadata and data from an avro manifest file.
+// Create one with [NewManifestReader].
+//
+// This type is not thread-safe; its methods should not be called from
+// multiple goroutines concurrently. It holds no locks, and reading entries
+// or lazily loading the schema / partition spec mutates its state.
+type ManifestReader struct {
+       // dec is the avro OCF decoder supplying both the file's metadata and
+       // its encoded entries.
+       dec           *ocf.Decoder
+       // file is the manifest list entry for this manifest; metadata is
+       // validated against it and decoded entries inherit values from it.
+       file          ManifestFile
+       formatVersion int
+       // isFallback is set for v1 files whose snapshot_id field is not an
+       // avro union; entries are then decoded via fallbackManifestEntry.
+       isFallback    bool
+       content       ManifestContent
+       // fieldNameToID and fieldIDToType are derived from the avro schema
+       // and attached to decoded data files that accept them.
+       fieldNameToID map[string]int
+       fieldIDToType map[int]avro.LogicalType
+
+       // The rest are lazily populated, on demand. Most readers
+       // will likely only try to load the entries.
+       schema              Schema
+       schemaLoaded        bool
+       partitionSpec       PartitionSpec
+       partitionSpecLoaded bool
+}
+
+// NewManifestReader returns a value that can read the contents of an avro manifest
+// file. If the caller is interested in the manifest entries in the file, it must call
+// [ManifestReader.ReadEntry] until it returns [io.EOF] before closing the provided
+// reader.
+//
+// The file's avro metadata is checked against file, the entry for this manifest in
+// the manifest list: their format versions and content types must agree, otherwise
+// an error is returned. The Iceberg spec only requires the 'format-version' and
+// 'content' metadata keys from format v2 onward, so a manifest without
+// 'format-version' is read as v1, and a v1 manifest without 'content' is read as a
+// data manifest.
+func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error) {
+       dec, err := ocf.NewDecoder(in, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
+       if err != nil {
+               return nil, err
+       }
+
+       metadata := dec.Metadata()
+       sc := dec.Schema()
+
+       formatVersion := 1
+       if rawVersion, ok := metadata["format-version"]; ok {
+               formatVersion, err = strconv.Atoi(string(rawVersion))
+               if err != nil {
+                       return nil, fmt.Errorf("manifest file's 'format-version' metadata is invalid: %w", err)
+               }
+       }
+       if formatVersion != file.Version() {
+               return nil, fmt.Errorf("manifest file's 'format-version' metadata indicates version %d, but entry from manifest list indicates version %d",
+                       formatVersion, file.Version())
+       }
+
+       contentStr := string(metadata["content"])
+       if contentStr == "" && formatVersion == 1 {
+               // Delete files only exist from format v2 on, so a v1 manifest
+               // that omits 'content' can only be tracking data files.
+               contentStr = "data"
+       }
+       var content ManifestContent
+       switch contentStr {
+       case "data":
+               content = ManifestContentData
+       case "deletes":
+               content = ManifestContentDeletes
+       default:
+               return nil, fmt.Errorf("manifest file's 'content' metadata is invalid, should be \"data\" or \"deletes\" but instead is %q",
+                       contentStr)
+       }
+       if content != file.ManifestContent() {
+               return nil, fmt.Errorf("manifest file's 'content' metadata indicates %q, but entry from manifest list indicates %q",
+                       content.String(), file.ManifestContent().String())
+       }
+
+       // A v1 file whose snapshot_id field is not an avro union is decoded
+       // with the fallback entry layout.
+       isFallback := false
+       if formatVersion == 1 {
+               rec, ok := sc.(*avro.RecordSchema)
+               if !ok {
+                       return nil, fmt.Errorf("manifest file's avro schema should be a record, but instead is %s", sc.Type())
+               }
+               for _, f := range rec.Fields() {
+                       if f.Name() == "snapshot_id" {
+                               isFallback = f.Type().Type() != avro.Union
+
+                               break
+                       }
+               }
+       }
+       fieldNameToID, fieldIDToType := getFieldIDMap(sc)
+
+       return &ManifestReader{
+               dec:           dec,
+               file:          file,
+               formatVersion: formatVersion,
+               isFallback:    isFallback,
+               content:       content,
+               fieldNameToID: fieldNameToID,
+               fieldIDToType: fieldIDToType,
+       }, nil
+}
+
+// Version reports the manifest's format version, which was confirmed to
+// match the manifest list entry's version when the reader was created.
+func (c *ManifestReader) Version() int {
+       return c.formatVersion
+}
+
+// ManifestContent reports whether this manifest tracks data files or delete
+// files, as confirmed against the manifest list entry when the reader was
+// created.
+func (c *ManifestReader) ManifestContent() ManifestContent {
+       return c.content
+}
+
+// SchemaID parses the 'schema-id' entry of the avro file's metadata as an
+// integer, failing if the entry is missing or not a valid integer.
+func (c *ManifestReader) SchemaID() (int, error) {
+       raw := c.dec.Metadata()["schema-id"]
+       schemaID, err := strconv.Atoi(string(raw))
+       if err == nil {
+               return schemaID, nil
+       }
+
+       return 0, fmt.Errorf("manifest file's 'schema-id' metadata is invalid: %w", err)
+}
+
+// Schema returns the table schema stored as JSON in the avro file's 'schema'
+// metadata. It is decoded on the first successful call and cached; the
+// returned pointer refers to the reader's own copy, so callers should not
+// modify it.
+//
+// When the 'schema-id' metadata key is present, its value (see
+// [ManifestReader.SchemaID]) becomes the schema's ID. The Iceberg spec makes
+// that key optional for v1 manifests, so when it is absent the schema keeps
+// whatever ID, if any, was decoded from the schema JSON itself.
+func (c *ManifestReader) Schema() (*Schema, error) {
+       if c.schemaLoaded {
+               return &c.schema, nil
+       }
+
+       metadata := c.dec.Metadata()
+       _, hasSchemaID := metadata["schema-id"]
+       schemaID := 0
+       if hasSchemaID {
+               id, err := c.SchemaID()
+               if err != nil {
+                       return nil, err
+               }
+               schemaID = id
+       }
+       if err := json.Unmarshal(metadata["schema"], &c.schema); err != nil {
+               return nil, fmt.Errorf("manifest file's 'schema' metadata is invalid: %w", err)
+       }
+       if hasSchemaID {
+               c.schema.ID = schemaID
+       }
+       c.schemaLoaded = true
+
+       return &c.schema, nil
+}
+
+// PartitionSpecID parses the 'partition-spec-id' entry of the avro file's
+// metadata. It fails if the value is not an integer, or if it disagrees with
+// the partition spec ID recorded for this file in the manifest list.
+func (c *ManifestReader) PartitionSpecID() (int, error) {
+       raw := string(c.dec.Metadata()["partition-spec-id"])
+       specID, err := strconv.Atoi(raw)
+       switch {
+       case err != nil:
+               return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata is invalid: %w", err)
+       case specID != int(c.file.PartitionSpecID()):
+               return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata indicates %d, but entry from manifest list indicates %d",
+                       specID, c.file.PartitionSpecID())
+       }
+
+       return specID, nil
+}
+
+// PartitionSpec decodes the partition fields stored as JSON in the avro
+// file's 'partition-spec' metadata and pairs them with the ID reported by
+// [ManifestReader.PartitionSpecID]. The result is cached after the first
+// successful call, so later calls return the same pointer without decoding.
+func (c *ManifestReader) PartitionSpec() (*PartitionSpec, error) {
+       if c.partitionSpecLoaded {
+               return &c.partitionSpec, nil
+       }
+
+       specID, err := c.PartitionSpecID()
+       if err != nil {
+               return nil, err
+       }
+       rawSpec := c.dec.Metadata()["partition-spec"]
+       if err := json.Unmarshal(rawSpec, &c.partitionSpec.fields); err != nil {
+               return nil, fmt.Errorf("manifest file's 'partition-spec' metadata is invalid: %w", err)
+       }
+       c.partitionSpec.id = specID
+       c.partitionSpec.initialize()
+       c.partitionSpecLoaded = true
+
+       return &c.partitionSpec, nil
+}
+
+// ReadEntry reads the next manifest entry in the avro file's data. Each
+// returned entry has already inherited values from the manifest list entry
+// the reader was created with. Once every entry has been read, ReadEntry
+// returns [io.EOF]; any other error means the file could not be decoded.
+func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
+       if err := c.dec.Error(); err != nil {
+               return nil, err
+       }
+       if !c.dec.HasNext() {
+               // The decoder may stop reporting entries because it failed to read
+               // the next block; such failures surface through Error(), so check
+               // it before declaring a clean end of file.
+               if err := c.dec.Error(); err != nil {
+                       return nil, err
+               }
+
+               return nil, io.EOF
+       }
+       var tmp ManifestEntry
+       if c.isFallback {
+               tmp = &fallbackManifestEntry{
+                       manifestEntry: manifestEntry{Data: &dataFile{}},
+               }
+       } else {
+               tmp = &manifestEntry{Data: &dataFile{}}
+       }
+
+       if err := c.dec.Decode(tmp); err != nil {
+               // HasNext promised another entry, so running out of input here
+               // means the file is truncated, not that it was fully read.
+               if errors.Is(err, io.EOF) {
+                       return nil, io.ErrUnexpectedEOF
+               }
+
+               return nil, err
+       }
+       if c.isFallback {
+               // Convert the fallback layout into the standard entry type.
+               tmp = tmp.(*fallbackManifestEntry).toEntry()
+       }
+       tmp.inherit(c.file)
+       if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
+               fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
+               fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
+       }
+
+       return tmp, nil
+}
+
+// ReadManifest reads in an avro manifest file and returns a slice
+// of manifest entries or an error if one is encountered. If discardDeleted
+// is true, the returned slice omits entries whose status is "deleted".
+//
+// It is a convenience wrapper that creates a [ManifestReader] for m and
+// calls [ManifestReader.ReadEntry] until it reports [io.EOF]. On error, no
+// entries are returned.
+func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) {
+       manifestReader, err := NewManifestReader(m, f)
+       if err != nil {
+               return nil, err
+       }
+
+       // Return a non-nil (possibly empty) slice on success, even when the
+       // manifest has no entries.
+       results := make([]ManifestEntry, 0)
+       for {
+               entry, err := manifestReader.ReadEntry()
+               if errors.Is(err, io.EOF) {
+                       return results, nil
+               }
+               if err != nil {
+                       return nil, err
+               }
+               if discardDeleted && entry.Status() == EntryStatusDELETED {
+                       continue
+               }
+               results = append(results, entry)
+       }
+}
+
 // ReadManifestList reads in an avro manifest list file and returns a slice
 // of manifest files or an error if one is encountered.
 func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
diff --git a/manifest_test.go b/manifest_test.go
index aeda897..66bc07f 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -832,7 +832,7 @@ func (m *ManifestTestSuite) 
TestReadManifestIncompleteSchema() {
        )
        m.NoError(err)
 
-       entries, err := readManifestEntries(file, &buf, false)
+       entries, err := ReadManifest(file, &buf, false)
        m.NoError(err)
        m.Len(entries, 1)
 
@@ -907,26 +907,43 @@ func (m *ManifestTestSuite) 
TestReadManifestIncompleteSchema() {
        }
 
        // This should fail because the file's schema is incomplete.
-       _, err = readManifestEntries(file, &buf, false)
+       _, err = ReadManifest(file, &buf, false)
        m.ErrorContains(err, "unknown type: r2")
 }
 
 func (m *ManifestTestSuite) TestManifestEntriesV2() {
-       var mockfs internal.MockFS
        manifest := manifestFile{
                version: 2,
+               SpecID:  1,
                Path:    manifestFileRecordsV2[0].FilePath(),
        }
 
-       mockfs.Test(m.T())
-       mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{
+       partitionSpec := NewPartitionSpecID(1,
+               PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID", 
Transform: IdentityTransform{}},
+               PartitionField{FieldID: 1001, SourceID: 2, Name: 
"tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+       mockedFile := &internal.MockFile{
                Contents: bytes.NewReader(m.v2ManifestEntries.Bytes()),
-       }, nil)
-       defer mockfs.AssertExpectations(m.T())
-       entries, err := manifest.FetchEntries(&mockfs, false)
+       }
+       manifestReader, err := NewManifestReader(&manifest, mockedFile)
        m.Require().NoError(err)
-       m.Len(entries, 2)
-       m.Zero(manifest.PartitionSpecID())
+       m.Equal(2, manifestReader.Version())
+       m.Equal(ManifestContentData, manifestReader.ManifestContent())
+       loadedSchema, err := manifestReader.Schema()
+       m.Require().NoError(err)
+       m.True(loadedSchema.Equals(testSchema))
+       loadedPartitionSpec, err := manifestReader.PartitionSpec()
+       m.Require().NoError(err)
+       m.True(loadedPartitionSpec.Equals(partitionSpec))
+
+       entry1, err := manifestReader.ReadEntry()
+       m.Require().NoError(err)
+       _, err = manifestReader.ReadEntry()
+       m.Require().NoError(err)
+       _, err = manifestReader.ReadEntry()
+       m.Require().ErrorIs(err, io.EOF)
+
+       m.Equal(int32(1), manifest.PartitionSpecID())
        m.Zero(manifest.SnapshotID())
        m.Zero(manifest.AddedDataFiles())
        m.Zero(manifest.ExistingDataFiles())
@@ -935,8 +952,6 @@ func (m *ManifestTestSuite) TestManifestEntriesV2() {
        m.Zero(manifest.DeletedRows())
        m.Zero(manifest.AddedRows())
 
-       entry1 := entries[0]
-
        m.Equal(EntryStatusADDED, entry1.Status())
        m.Equal(entrySnapshotID, entry1.SnapshotID())
        m.Zero(entry1.SequenceNum())

Reply via email to