This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 85387f82 feat(manifest)!: Replace FetchEntries with iterator approach (#985)
85387f82 is described below

commit 85387f825a79146496ffcfdb28536186c939c584
Author: Artem Alperin <[email protected]>
AuthorDate: Wed May 6 16:00:33 2026 +0200

    feat(manifest)!: Replace FetchEntries with iterator approach (#985)
    
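    `FetchEntries` materializes every manifest entry into a
    `[]ManifestEntry` before returning, so a caller pays for the full
    per-manifest entry slice even when it only needs to filter, count, or
    break out on the first match. On tables with thousands of entries per
    manifest this shows up as avoidable memory pressure.

    This change adds `ManifestFile.Entries`, which returns an
    `iter.Seq2[ManifestEntry, error]` that streams entries one at a time,
    migrates the in-tree callers to it, and marks `FetchEntries` as
    deprecated.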
---
 catalog/rest/rest_integration_test.go    |   7 +-
 catalog/sql/sql_integration_test.go      |   7 +-
 cmd/iceberg/output.go                    |  17 +++--
 manifest.go                              | 119 +++++++++++++++++++++++--------
 manifest_test.go                         |   7 +-
 table/compaction/eq_delete_collect.go    |  18 +++--
 table/conflict_validation.go             |   9 ++-
 table/orphan_cleanup.go                  |  10 ++-
 table/orphan_cleanup_integration_test.go |   6 +-
 table/rewrite_data_files_test.go         |  10 ++-
 table/row_delta_test.go                  |   6 +-
 table/scanner.go                         |  12 ++--
 table/snapshot_producers.go              |  29 ++++++--
 table/snapshots.go                       |  29 ++++----
 table/table_test.go                      |  13 ++--
 table/transaction.go                     |  10 ++-
 table/updates.go                         |  20 +++---
 17 files changed, 201 insertions(+), 128 deletions(-)

diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go
index c0a7d217..9394fa83 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -351,8 +351,11 @@ func (s *RestIntegrationSuite) TestWriteCommitTable() {
 
        s.Len(mf, 1)
        s.EqualValues(1, mf[0].AddedDataFiles())
-       entries, err := mf[0].FetchEntries(mustFS(s.T(), updated), false)
-       s.Require().NoError(err)
+       entries := make([]iceberg.ManifestEntry, 0, 1)
+       for entry, err := range mf[0].Entries(mustFS(s.T(), updated), false) {
+               s.Require().NoError(err)
+               entries = append(entries, entry)
+       }
 
        s.Len(entries, 1)
        s.Equal(pqfile, entries[0].DataFile().FilePath())
diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go
index 82f83bba..9f8a2793 100644
--- a/catalog/sql/sql_integration_test.go
+++ b/catalog/sql/sql_integration_test.go
@@ -313,8 +313,11 @@ func (s *SQLIntegrationSuite) TestWriteCommitTable() {
        updatedFS, err := updated.FS(s.ctx)
        s.Require().NoError(err)
 
-       entries, err := mf[0].FetchEntries(updatedFS, false)
-       s.Require().NoError(err)
+       entries := make([]iceberg.ManifestEntry, 0, 1)
+       for entry, err := range mf[0].Entries(updatedFS, false) {
+               s.Require().NoError(err)
+               entries = append(entries, entry)
+       }
 
        s.Len(entries, 1)
        s.Equal(pqfile, entries[0].DataFile().FilePath())
diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go
index 695ba28d..32a3a101 100644
--- a/cmd/iceberg/output.go
+++ b/cmd/iceberg/output.go
@@ -150,16 +150,21 @@ func (t textOutput) Files(tbl *table.Table, history bool) 
{
                        snapshotTree = append(snapshotTree, 
pterm.LeveledListItem{
                                Level: 1, Text: "Manifest: " + m.FilePath(),
                        })
-                       datafiles, err := m.FetchEntries(afs, false)
-                       if err != nil {
-                               t.Error(err)
-                               os.Exit(1)
-                       }
-                       for _, e := range datafiles {
+                       var iterErr error
+                       for e, err := range m.Entries(afs, false) {
+                               if err != nil {
+                                       iterErr = err
+
+                                       break
+                               }
                                snapshotTree = append(snapshotTree, 
pterm.LeveledListItem{
                                        Level: 2, Text: "Datafile: " + 
e.DataFile().FilePath(),
                                })
                        }
+                       if iterErr != nil {
+                               t.Error(iterErr)
+                               os.Exit(1)
+                       }
                }
        }
 
diff --git a/manifest.go b/manifest.go
index da781c8f..82de7c1c 100644
--- a/manifest.go
+++ b/manifest.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "iter"
        "math"
        "math/big"
        "reflect"
@@ -339,8 +340,40 @@ func (m *manifestFile) FirstRowID() *int64 { return 
m.FirstRowIDValue }
 
 func (m *manifestFile) HasAddedFiles() bool    { return m.AddedFilesCount != 0 
}
 func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount 
!= 0 }
-func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
-       return fetchManifestEntries(m, fs, discardDeleted)
+
+func (m *manifestFile) Entries(fs iceio.IO, discardDeleted bool) 
iter.Seq2[ManifestEntry, error] {
+       return func(yield func(ManifestEntry, error) bool) {
+               f, err := fs.Open(m.FilePath())
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }
+               aborted := false
+               defer func() {
+                       if cerr := f.Close(); cerr != nil && !aborted {
+                               yield(nil, cerr)
+                       }
+               }()
+
+               for entry, err := range iterManifest(m, f, discardDeleted) {
+                       if !yield(entry, err) {
+                               aborted = true
+
+                               return
+                       }
+               }
+       }
+}
+
+func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) (_ 
[]ManifestEntry, err error) {
+       f, openErr := fs.Open(m.FilePath())
+       if openErr != nil {
+               return nil, openErr
+       }
+       defer internal.CheckedClose(f, &err)
+
+       return ReadManifest(m, f, discardDeleted)
 }
 
 func getFieldIDMap(sc *avro.Schema) (map[string]int, map[int]string, 
map[int]int) {
@@ -395,16 +428,6 @@ type hasFieldToIDMap interface {
        setFieldIDToFixedSizeMap(map[int]int)
 }
 
-func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) (_ 
[]ManifestEntry, err error) {
-       f, err := fs.Open(m.FilePath())
-       if err != nil {
-               return nil, err
-       }
-       defer internal.CheckedClose(f, &err)
-
-       return ReadManifest(m, f, discardDeleted)
-}
-
 // ManifestFile is the interface which covers both V1 and V2 manifest files.
 type ManifestFile interface {
        // Version returns the version number of this manifest file.
@@ -463,10 +486,20 @@ type ManifestFile interface {
        HasAddedFiles() bool
        // HasExistingFiles returns true if ExistingDataFiles > 0 or if it was 
null.
        HasExistingFiles() bool
+       // Entries streams the manifest entries from the manifest file using
+       // the provided file system IO interface. Entries that have been
+       // marked as deleted are skipped if discardDeleted is true.
+       //
+       // Prefer Entries over FetchEntries when walking large manifests
+       // since it avoids loading every entry into memory at once.
+       Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, 
error]
        // FetchEntries reads the manifest list file to fetch the list of
        // manifest entries using the provided file system IO interface.
        // If discardDeleted is true, entries for files containing deleted rows
        // will be skipped.
+       //
+       // Deprecated: Use Entries instead, which streams manifest entries via 
an
+       // iterator and avoids loading every entry into memory at once.
        FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error)
        // // WriteEntries writes a list of manifest entries to a provided
        // // io.Writer. The version of the manifest file is used to determine 
the
@@ -751,33 +784,61 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, 
error) {
        return tmp, nil
 }
 
+// iterManifest returns an iterator that streams manifest entries from
+// the provided reader without buffering them. If discardDeleted is true,
+// entries whose status is "deleted" are skipped.
+func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) 
iter.Seq2[ManifestEntry, error] {
+       return func(yield func(ManifestEntry, error) bool) {
+               manifestReader, err := NewManifestReader(m, f)
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }
+               aborted := false
+               defer func() {
+                       if cerr := manifestReader.Close(); cerr != nil && 
!aborted {
+                               yield(nil, cerr)
+                       }
+               }()
+
+               for {
+                       entry, err := manifestReader.ReadEntry()
+                       if err != nil {
+                               if errors.Is(err, io.EOF) {
+                                       return
+                               }
+                               if !yield(nil, err) {
+                                       aborted = true
+                               }
+
+                               return
+                       }
+                       if discardDeleted && entry.Status() == 
EntryStatusDELETED {
+                               continue
+                       }
+                       if !yield(entry, nil) {
+                               aborted = true
+
+                               return
+                       }
+               }
+       }
+}
+
 // ReadManifest reads in an avro list file and returns a slice
 // of manifest entries or an error if one is encountered. If discardDeleted
 // is true, the returned slice omits entries whose status is "deleted".
 func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) 
([]ManifestEntry, error) {
-       manifestReader, err := NewManifestReader(m, f)
-       if err != nil {
-               return nil, err
-       }
-       defer func() {
-               _ = manifestReader.Close()
-       }()
-
        var results []ManifestEntry
-       for {
-               entry, err := manifestReader.ReadEntry()
+       for entry, err := range iterManifest(m, f, discardDeleted) {
                if err != nil {
-                       if errors.Is(err, io.EOF) {
-                               return results, nil
-                       }
-
                        return results, err
                }
-               if discardDeleted && entry.Status() == EntryStatusDELETED {
-                       continue
-               }
                results = append(results, entry)
        }
+
+       return results, nil
 }
 
 // ReadManifestList reads in an avro manifest list file and returns a slice
diff --git a/manifest_test.go b/manifest_test.go
index 64407b74..be5e31f0 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -554,8 +554,11 @@ func (m *ManifestTestSuite) TestManifestEntriesV1() {
                Contents: bytes.NewReader(m.v1ManifestEntries.Bytes()),
        }, nil)
        defer mockfs.AssertExpectations(m.T())
-       entries, err := manifest.FetchEntries(&mockfs, false)
-       m.Require().NoError(err)
+       entries := make([]ManifestEntry, 0, 2)
+       for entry, err := range manifest.Entries(&mockfs, false) {
+               m.Require().NoError(err)
+               entries = append(entries, entry)
+       }
        m.Len(entries, 2)
        m.Zero(manifest.PartitionSpecID())
        m.Zero(manifest.SnapshotID())
diff --git a/table/compaction/eq_delete_collect.go b/table/compaction/eq_delete_collect.go
index 145b7a4f..58f22379 100644
--- a/table/compaction/eq_delete_collect.go
+++ b/table/compaction/eq_delete_collect.go
@@ -71,11 +71,10 @@ func CollectDeadEqualityDeletes(
                if m.ManifestContent() != iceberg.ManifestContentDeletes {
                        continue
                }
-               entries, err := m.FetchEntries(fs, true)
-               if err != nil {
-                       return nil, err
-               }
-               for _, e := range entries {
+               for e, err := range m.Entries(fs, true) {
+                       if err != nil {
+                               return nil, err
+                       }
                        if e.DataFile().ContentType() != 
iceberg.EntryContentEqDeletes {
                                continue
                        }
@@ -95,11 +94,10 @@ func CollectDeadEqualityDeletes(
                if m.ManifestContent() != iceberg.ManifestContentData {
                        continue
                }
-               entries, err := m.FetchEntries(fs, true)
-               if err != nil {
-                       return nil, err
-               }
-               for _, e := range entries {
+               for e, err := range m.Entries(fs, true) {
+                       if err != nil {
+                               return nil, err
+                       }
                        df := e.DataFile()
                        if df.ContentType() != iceberg.EntryContentData {
                                continue
diff --git a/table/conflict_validation.go b/table/conflict_validation.go
index d626e996..1e12eace 100644
--- a/table/conflict_validation.go
+++ b/table/conflict_validation.go
@@ -385,11 +385,10 @@ func validateAddedDataFilesMatchingFilter(ctx 
*conflictContext, filter iceberg.B
                        }
 
                        pEval := partitionEvals.Get(int(mf.PartitionSpecID()))
-                       entries, err := mf.FetchEntries(ctx.fs, false)
-                       if err != nil {
-                               return fmt.Errorf("reading entries from 
manifest %s: %w", mf.FilePath(), err)
-                       }
-                       for _, e := range entries {
+                       for e, err := range mf.Entries(ctx.fs, false) {
+                               if err != nil {
+                                       return fmt.Errorf("reading entries from 
manifest %s: %w", mf.FilePath(), err)
+                               }
                                if e.Status() != iceberg.EntryStatusADDED || 
e.SnapshotID() != snap.SnapshotID {
                                        continue
                                }
diff --git a/table/orphan_cleanup.go b/table/orphan_cleanup.go
index 28e0adac..81fa84ed 100644
--- a/table/orphan_cleanup.go
+++ b/table/orphan_cleanup.go
@@ -268,12 +268,10 @@ func (t Table) getReferencedFiles(fs iceio.IO) 
(map[string]bool, error) {
                for _, manifest := range manifestFiles {
                        referenced[manifest.FilePath()] = true
 
-                       entries, err := manifest.FetchEntries(fs, false)
-                       if err != nil {
-                               return nil, fmt.Errorf("failed to read manifest 
entries: %w", err)
-                       }
-
-                       for _, entry := range entries {
+                       for entry, err := range manifest.Entries(fs, false) {
+                               if err != nil {
+                                       return nil, fmt.Errorf("failed to read 
manifest entries: %w", err)
+                               }
                                referenced[entry.DataFile().FilePath()] = true
                        }
                }
diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go
index cce3fd0b..a9734b95 100644
--- a/table/orphan_cleanup_integration_test.go
+++ b/table/orphan_cleanup_integration_test.go
@@ -285,10 +285,8 @@ func (s *OrphanCleanupIntegrationSuite) 
TestOrphanCleanupActualDeletion() {
 
        for manifest, err := range tbl.AllManifests(s.ctx) {
                s.Require().NoError(err)
-               entries, err := manifest.FetchEntries(fs, false)
-               s.Require().NoError(err)
-
-               for _, entry := range entries {
+               for entry, err := range manifest.Entries(fs, false) {
+                       s.Require().NoError(err)
                        dataFile := entry.DataFile().FilePath()
                        s.True(s.fileExists(fs, dataFile), "Data file should 
still exist: %s", dataFile)
                }
diff --git a/table/rewrite_data_files_test.go b/table/rewrite_data_files_test.go
index bdf7a6ea..ce12a8fd 100644
--- a/table/rewrite_data_files_test.go
+++ b/table/rewrite_data_files_test.go
@@ -569,9 +569,8 @@ func snapshotContainsDeleteFile(t *testing.T, tbl 
*table.Table, path string) boo
                if m.ManifestContent() != iceberg.ManifestContentDeletes {
                        continue
                }
-               entries, err := m.FetchEntries(fs, false)
-               require.NoError(t, err)
-               for _, e := range entries {
+               for e, err := range m.Entries(fs, false) {
+                       require.NoError(t, err)
                        if e.Status() == iceberg.EntryStatusDELETED {
                                continue
                        }
@@ -656,9 +655,8 @@ func readKeySeqNums(t *testing.T, tbl *table.Table) 
(eqDelete, smallMin, preserv
 
        smallMin = int64(1<<62 - 1)
        for _, m := range manifests {
-               entries, err := m.FetchEntries(fs, false)
-               require.NoError(t, err)
-               for _, e := range entries {
+               for e, err := range m.Entries(fs, false) {
+                       require.NoError(t, err)
                        if e.Status() == iceberg.EntryStatusDELETED {
                                continue
                        }
diff --git a/table/row_delta_test.go b/table/row_delta_test.go
index 05489561..107fbd63 100644
--- a/table/row_delta_test.go
+++ b/table/row_delta_test.go
@@ -381,10 +381,8 @@ func TestRowDeltaManifestContents(t *testing.T) {
 
        // Verify manifest entries have correct content types
        for _, m := range manifests {
-               entries, err := m.FetchEntries(fs, true)
-               require.NoError(t, err)
-
-               for _, e := range entries {
+               for e, err := range m.Entries(fs, true) {
+                       require.NoError(t, err)
                        if m.ManifestContent() == iceberg.ManifestContentData {
                                assert.Equal(t, iceberg.EntryContentData, 
e.DataFile().ContentType())
                        } else {
diff --git a/table/scanner.go b/table/scanner.go
index a3811682..7263c028 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -153,13 +153,13 @@ func GetPartitionRecord(dataFile iceberg.DataFile, 
partitionType *iceberg.Struct
 func openManifest(io io.IO, manifest iceberg.ManifestFile,
        partitionFilter, metricsEval func(iceberg.DataFile) (bool, error),
 ) ([]iceberg.ManifestEntry, error) {
-       entries, err := manifest.FetchEntries(io, true)
-       if err != nil {
-               return nil, err
-       }
+       // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+       out := make([]iceberg.ManifestEntry, 0, max(0, 
int(manifest.AddedDataFiles())+int(manifest.ExistingDataFiles())))
+       for entry, err := range manifest.Entries(io, true) {
+               if err != nil {
+                       return nil, err
+               }
 
-       out := make([]iceberg.ManifestEntry, 0, len(entries))
-       for _, entry := range entries {
                p, err := partitionFilter(entry.DataFile())
                if err != nil {
                        return nil, err
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index c98b7cb7..488e72d3 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "iter"
        "maps"
        "slices"
        "sync/atomic"
@@ -350,12 +351,11 @@ func (m *manifestMergeManager) createManifest(specID int, 
bin []iceberg.Manifest
        defer internal.CheckedClose(wr, &err)
 
        for _, manifest := range bin {
-               entries, err := m.snap.fetchManifestEntry(manifest, false)
-               if err != nil {
-                       return nil, err
-               }
+               for entry, err := range m.snap.iterManifestEntries(manifest, 
false) {
+                       if err != nil {
+                               return nil, err
+                       }
 
-               for _, entry := range entries {
                        switch {
                        case entry.Status() == iceberg.EntryStatusDELETED && 
entry.SnapshotID() == m.snap.snapshotID:
                                // only files deleted by this snapshot should 
be added to the new manifest
@@ -609,7 +609,24 @@ func (sp *snapshotProducer) newManifestOutput() 
(io.WriteCloser, string, error)
 }
 
 func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, 
discardDeleted bool) ([]iceberg.ManifestEntry, error) {
-       return m.FetchEntries(sp.io, discardDeleted)
+       capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
+       if !discardDeleted {
+               capacity += int(m.DeletedDataFiles())
+       }
+       // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+       entries := make([]iceberg.ManifestEntry, 0, max(0, capacity))
+       for entry, err := range m.Entries(sp.io, discardDeleted) {
+               if err != nil {
+                       return nil, err
+               }
+               entries = append(entries, entry)
+       }
+
+       return entries, nil
+}
+
+func (sp *snapshotProducer) iterManifestEntries(m iceberg.ManifestFile, 
discardDeleted bool) iter.Seq2[iceberg.ManifestEntry, error] {
+       return m.Entries(sp.io, discardDeleted)
 }
 
 func (sp *snapshotProducer) manifests(ctx context.Context) (_ 
[]iceberg.ManifestFile, err error) {
diff --git a/table/snapshots.go b/table/snapshots.go
index ef109b25..f23f30c0 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -345,20 +345,19 @@ func (s Snapshot) dataFiles(fio iceio.IO, fileFilter 
set[iceberg.ManifestEntryCo
                }
 
                for _, m := range manifests {
-                       dataFiles, err := m.FetchEntries(fio, false)
-                       if err != nil {
-                               yield(nil, err)
+                       for entry, err := range m.Entries(fio, false) {
+                               if err != nil {
+                                       yield(nil, err)
 
-                               return
-                       }
+                                       return
+                               }
 
-                       for _, f := range dataFiles {
                                if fileFilter != nil {
-                                       if _, ok := 
fileFilter[f.DataFile().ContentType()]; !ok {
+                                       if _, ok := 
fileFilter[entry.DataFile().ContentType()]; !ok {
                                                continue
                                        }
                                }
-                               if !yield(f.DataFile(), nil) {
+                               if !yield(entry.DataFile(), nil) {
                                        return
                                }
                        }
@@ -389,15 +388,13 @@ func (s Snapshot) entries(fio iceio.IO, manifestContent 
iceberg.ManifestContent)
                                continue
                        }
 
-                       entries, err := m.FetchEntries(fio, false)
-                       if err != nil {
-                               yield(nil, err)
+                       for entry, err := range m.Entries(fio, false) {
+                               if err != nil {
+                                       yield(nil, err)
 
-                               return
-                       }
-
-                       for _, e := range entries {
-                               if !yield(e, nil) {
+                                       return
+                               }
+                               if !yield(entry, nil) {
                                        return
                                }
                        }
diff --git a/table/table_test.go b/table/table_test.go
index 6913ea4b..f10c8019 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -662,10 +662,8 @@ func (t *TableWritingTestSuite) 
TestAddFilesPartitionedTable() {
        t.Require().NoError(err)
 
        for _, manifest := range m {
-               entries, err := manifest.FetchEntries(mustFS(t.T(), tbl), false)
-               t.Require().NoError(err)
-
-               for _, e := range entries {
+               for e, err := range manifest.Entries(mustFS(t.T(), tbl), false) 
{
+                       t.Require().NoError(err)
                        t.Equal(map[int]any{
                                1000: int32(123), 1001: int32(650),
                        }, e.DataFile().Partition())
@@ -2333,8 +2331,11 @@ func (t *TableWritingTestSuite) TestMergeManifests() {
        t.Len(manifestList, 1)
        t.validateManifestFileLength(mustFS(t.T(), tblA), manifestList[0])
 
-       entries, err := manifestList[0].FetchEntries(mustFS(t.T(), tblA), false)
-       t.Require().NoError(err)
+       entries := make([]iceberg.ManifestEntry, 0, 3)
+       for entry, err := range manifestList[0].Entries(mustFS(t.T(), tblA), 
false) {
+               t.Require().NoError(err)
+               entries = append(entries, entry)
+       }
        t.Len(entries, 3)
 
        // entries should match the snapshot ID they were added in
diff --git a/table/transaction.go b/table/transaction.go
index 5a11ce59..1ffa18de 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -1400,15 +1400,13 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
                                }
                        }
 
-                       entries, err := manifest.FetchEntries(fs, false)
-                       if err != nil {
-                               return fmt.Errorf("failed to fetch manifest 
entries: %w", err)
-                       }
-
                        localDelete := make([]iceberg.DataFile, 0)
                        localRewrite := make([]iceberg.DataFile, 0)
 
-                       for _, entry := range entries {
+                       for entry, err := range manifest.Entries(fs, false) {
+                               if err != nil {
+                                       return fmt.Errorf("failed to fetch 
manifest entries: %w", err)
+                               }
                                if entry.Status() == iceberg.EntryStatusDELETED 
{
                                        continue
                                }
diff --git a/table/updates.go b/table/updates.go
index dda9a532..cfe723a5 100644
--- a/table/updates.go
+++ b/table/updates.go
@@ -496,12 +496,10 @@ func (u *removeSnapshotsUpdate) PostCommit(ctx 
context.Context, preTable *Table,
                for _, man := range mans {
                        filesToDelete[man.FilePath()] = struct{}{}
 
-                       entries, err := man.FetchEntries(prefs, false)
-                       if err != nil {
-                               return err
-                       }
-
-                       for _, entry := range entries {
+                       for entry, err := range man.Entries(prefs, false) {
+                               if err != nil {
+                                       return err
+                               }
                                filesToDelete[entry.DataFile().FilePath()] = 
struct{}{}
                        }
                }
@@ -516,12 +514,10 @@ func (u *removeSnapshotsUpdate) PostCommit(ctx 
context.Context, preTable *Table,
                for _, man := range mans {
                        delete(filesToDelete, man.FilePath())
 
-                       entries, err := man.FetchEntries(prefs, false)
-                       if err != nil {
-                               return err
-                       }
-
-                       for _, entry := range entries {
+                       for entry, err := range man.Entries(prefs, false) {
+                               if err != nil {
+                                       return err
+                               }
                                if entry.Status() != iceberg.EntryStatusDELETED 
{
                                        delete(filesToDelete, 
entry.DataFile().FilePath())
                                }

Reply via email to