This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
    new b4ca5150 feat(manifest): Add tests for Entries and migrate snapshotProducer (#1031)
b4ca5150 is described below

commit b4ca5150b6b43c60f35566ee4cc2677354c7b583
Author: Artem Alperin <[email protected]>
AuthorDate: Wed May 6 21:32:47 2026 +0200

    feat(manifest): Add tests for Entries and migrate snapshotProducer (#1031)
    
    Related to https://github.com/apache/iceberg-go/pull/985
---
 manifest.go                 |  18 ++++++
 manifest_test.go            | 148 ++++++++++++++++++++++++++++++++++++++++++++
 table/snapshot_producers.go |  51 +++++----------
 3 files changed, 183 insertions(+), 34 deletions(-)

diff --git a/manifest.go b/manifest.go
index c79222d1..602586f6 100644
--- a/manifest.go
+++ b/manifest.go
@@ -492,6 +492,24 @@ type ManifestFile interface {
        //
        // Prefer Entries over FetchEntries when walking large manifests
        // since it avoids loading every entry into memory at once.
+       //
+       // Iteration contract:
+       //
+       //   - On the first error encountered while opening the manifest, decoding
+       //     a record, or applying inheritance, the iterator yields (nil, err)
+       //     and then stops. Callers must treat any non-nil error as terminal and
+       //     break or return without consuming further values.
+       //   - When iteration ends without an error from the read path, the
+       //     iterator may yield a final (nil, closeErr) pair if closing the
+       //     underlying file or manifest reader returns an error. This terminal
+       //     close error is reported only when the consumer ranged through every
+       //     value; an early break suppresses it (see below).
+       //   - Breaking out of the range loop (or any other early termination of
+       //     the yield function) is safe: the iterator releases the underlying
+       //     file handle and reader before returning, and no close error from
+       //     that path is yielded — the caller has already signalled it is no
+       //     longer interested in further values, so an extra synthetic
+       //     (nil, closeErr) tail would be discarded anyway.
        Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, 
error]
        // FetchEntries reads the manifest list file to fetch the list of
        // manifest entries using the provided file system IO interface.
diff --git a/manifest_test.go b/manifest_test.go
index be5e31f0..36966d45 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -23,11 +23,13 @@ import (
        "encoding/json"
        "errors"
        "io"
+       "io/fs"
        "strconv"
        "testing"
        "time"
 
        "github.com/apache/iceberg-go/internal"
+       iceio "github.com/apache/iceberg-go/io"
        "github.com/stretchr/testify/suite"
        "github.com/twmb/avro"
        "github.com/twmb/avro/ocf"
@@ -2061,3 +2063,149 @@ func (m *ManifestTestSuite) TestWriteManifestClosesWriterOnEntryError() {
        m.Require().ErrorContains(err, "only entries with status ADDED")
        m.Require().ErrorIs(err, errLimitedWrite)
 }
+
+type trackCloseFile struct {
+       contents   *bytes.Reader
+       closeCount int
+       closeErr   error
+       readLimit  int
+       readErr    error
+       nRead      int
+}
+
+var _ iceio.File = (*trackCloseFile)(nil)
+
+func newTrackCloseFile(data []byte) *trackCloseFile {
+       return &trackCloseFile{contents: bytes.NewReader(data)}
+}
+
+func (f *trackCloseFile) Stat() (fs.FileInfo, error) { return nil, nil }
+
+func (f *trackCloseFile) Read(p []byte) (int, error) {
+       if f.readErr != nil && f.nRead >= f.readLimit {
+               return 0, f.readErr
+       }
+       if f.readErr != nil && f.nRead+len(p) > f.readLimit {
+               p = p[:f.readLimit-f.nRead]
+       }
+       n, err := f.contents.Read(p)
+       f.nRead += n
+
+       return n, err
+}
+
+func (f *trackCloseFile) Close() error {
+       f.closeCount++
+
+       return f.closeErr
+}
+
+func (f *trackCloseFile) Seek(offset int64, whence int) (int64, error) {
+       return f.contents.Seek(offset, whence)
+}
+
+func (f *trackCloseFile) ReadAt(p []byte, off int64) (int, error) {
+       return f.contents.ReadAt(p, off)
+}
+
+var (
+       errMidStreamRead  = errors.New("simulated mid-stream read error")
+       errCloseFinalPair = errors.New("simulated close error")
+)
+
+func (m *ManifestTestSuite) TestEntriesEarlyBreakClosesFile() {
+       var mockfs internal.MockFS
+       manifest := manifestFile{
+               version: 2,
+               SpecID:  1,
+               Path:    manifestFileRecordsV2[0].FilePath(),
+       }
+
+       file := newTrackCloseFile(m.v2ManifestEntries.Bytes())
+       mockfs.Test(m.T())
+       mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+       defer mockfs.AssertExpectations(m.T())
+
+       yielded := 0
+       for entry, err := range manifest.Entries(&mockfs, false) {
+               m.Require().NoError(err, "no error expected on a healthy 
manifest before break")
+               m.Require().NotNil(entry)
+               yielded++
+               if yielded == 1 {
+                       break
+               }
+       }
+       m.Equal(1, yielded, "iteration must stop after the first entry on early 
break")
+       m.Equal(1, file.closeCount, "file must be closed exactly once on early 
break")
+}
+
+func (m *ManifestTestSuite) TestEntriesMidStreamErrorYieldsAndStops() {
+       var mockfs internal.MockFS
+       manifest := manifestFile{
+               version: 2,
+               SpecID:  1,
+               Path:    manifestFileRecordsV2[0].FilePath(),
+       }
+
+       data := m.v2ManifestEntries.Bytes()
+       file := newTrackCloseFile(data)
+       file.readLimit = len(data) / 2
+       file.readErr = errMidStreamRead
+       mockfs.Test(m.T())
+       mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+       defer mockfs.AssertExpectations(m.T())
+
+       var (
+               entries  []ManifestEntry
+               gotError error
+               yields   int
+       )
+       for entry, err := range manifest.Entries(&mockfs, false) {
+               yields++
+               if err != nil {
+                       gotError = err
+
+                       break
+               }
+               entries = append(entries, entry)
+       }
+       m.Require().NotNil(gotError, "iterator must yield a non-nil error")
+       m.Require().ErrorIs(gotError, errMidStreamRead,
+               "yielded error must equal or wrap the simulated mid-stream read 
error")
+       m.Equal(yields, len(entries)+1,
+               "the error pair must follow zero or more entry pairs and stop 
iteration")
+       m.Equal(1, file.closeCount, "file must be closed exactly once after a 
mid-stream error")
+}
+
+func (m *ManifestTestSuite) TestEntriesCloseErrorAsFinalPair() {
+       var mockfs internal.MockFS
+       manifest := manifestFile{
+               version: 2,
+               SpecID:  1,
+               Path:    manifestFileRecordsV2[0].FilePath(),
+       }
+
+       file := newTrackCloseFile(m.v2ManifestEntries.Bytes())
+       file.closeErr = errCloseFinalPair
+       mockfs.Test(m.T())
+       mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+       defer mockfs.AssertExpectations(m.T())
+
+       var (
+               entries []ManifestEntry
+               errs    []error
+       )
+       for entry, err := range manifest.Entries(&mockfs, false) {
+               if err != nil {
+                       errs = append(errs, err)
+
+                       continue
+               }
+               entries = append(entries, entry)
+       }
+       m.Len(entries, 2, "iteration must consume every entry before the 
terminal close pair")
+       m.Require().Len(errs, 1, "iterator must yield exactly one terminal 
close error")
+       m.ErrorIs(errs[0], errCloseFinalPair,
+               "terminal error must equal or wrap the simulated close error")
+       m.Equal(1, file.closeCount, "file must be closed exactly once even when 
Close returns an error")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 488e72d3..85c15f35 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -158,14 +158,14 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
        }
 
        for _, m := range manifestList {
-               entries, err := of.base.fetchManifestEntry(m, true)
-               if err != nil {
-                       return existingFiles, err
-               }
-
-               foundDeleted := make([]iceberg.ManifestEntry, 0)
-               notDeleted := make([]iceberg.ManifestEntry, 0, len(entries))
-               for _, entry := range entries {
+               // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+               capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
+               notDeleted := make([]iceberg.ManifestEntry, 0, max(0, capacity))
+               foundDeletedCount := 0
+               for entry, err := range of.base.iterManifestEntries(m, true) {
+                       if err != nil {
+                               return existingFiles, err
+                       }
                        path := entry.DataFile().FilePath()
                        content := entry.DataFile().ContentType()
                        _, isDeletedData := of.base.deletedFiles[path]
@@ -174,13 +174,13 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
                        isData := content == iceberg.EntryContentData
                        matched := (isDeletedData && isData) || 
(isDeletedDelete && !isData)
                        if matched {
-                               foundDeleted = append(foundDeleted, entry)
+                               foundDeletedCount++
                        } else {
                                notDeleted = append(notDeleted, entry)
                        }
                }
 
-               if len(foundDeleted) == 0 {
+               if foundDeletedCount == 0 {
                        existingFiles = append(existingFiles, m)
 
                        continue
@@ -286,13 +286,13 @@ func (of *overwriteFiles) deletedEntries(ctx context.Context) ([]iceberg.Manifes
        }
 
        getEntries := func(m iceberg.ManifestFile) ([]iceberg.ManifestEntry, 
error) {
-               entries, err := of.base.fetchManifestEntry(m, true)
-               if err != nil {
-                       return nil, err
-               }
-
-               result := make([]iceberg.ManifestEntry, 0, len(entries))
-               for _, entry := range entries {
+               // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+               capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
+               result := make([]iceberg.ManifestEntry, 0, max(0, capacity))
+               for entry, err := range of.base.iterManifestEntries(m, true) {
+                       if err != nil {
+                               return nil, err
+                       }
                        path := entry.DataFile().FilePath()
                        content := entry.DataFile().ContentType()
 
@@ -608,23 +608,6 @@ func (sp *snapshotProducer) newManifestOutput() (io.WriteCloser, string, error)
        return f, filepath, nil
 }
 
-func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, 
discardDeleted bool) ([]iceberg.ManifestEntry, error) {
-       capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
-       if !discardDeleted {
-               capacity += int(m.DeletedDataFiles())
-       }
-       // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
-       entries := make([]iceberg.ManifestEntry, 0, max(0, capacity))
-       for entry, err := range m.Entries(sp.io, discardDeleted) {
-               if err != nil {
-                       return nil, err
-               }
-               entries = append(entries, entry)
-       }
-
-       return entries, nil
-}
-
 func (sp *snapshotProducer) iterManifestEntries(m iceberg.ManifestFile, 
discardDeleted bool) iter.Seq2[iceberg.ManifestEntry, error] {
        return m.Entries(sp.io, discardDeleted)
 }

Reply via email to