This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new b4ca5150 feat(manifest): Add tests for Entries and migrate snapshotProducer (#1031)
b4ca5150 is described below
commit b4ca5150b6b43c60f35566ee4cc2677354c7b583
Author: Artem Alperin <[email protected]>
AuthorDate: Wed May 6 21:32:47 2026 +0200
feat(manifest): Add tests for Entries and migrate snapshotProducer (#1031)
Related to https://github.com/apache/iceberg-go/pull/985
---
manifest.go | 18 ++++++
manifest_test.go | 148 ++++++++++++++++++++++++++++++++++++++++++++
table/snapshot_producers.go | 51 +++++----------
3 files changed, 183 insertions(+), 34 deletions(-)
diff --git a/manifest.go b/manifest.go
index c79222d1..602586f6 100644
--- a/manifest.go
+++ b/manifest.go
@@ -492,6 +492,24 @@ type ManifestFile interface {
//
// Prefer Entries over FetchEntries when walking large manifests
// since it avoids loading every entry into memory at once.
+ //
+ // Iteration contract:
+ //
+ // - On the first error encountered while opening the manifest, decoding
+ // a record, or applying inheritance, the iterator yields (nil, err)
+ // and then stops. Callers must treat any non-nil error as terminal and
+ // break or return without consuming further values.
+ // - When iteration ends without an error from the read path, the
+ // iterator may yield a final (nil, closeErr) pair if closing the
+ // underlying file or manifest reader returns an error. This terminal
+ // close error is reported only when the consumer ranged through every
+ // value; an early break suppresses it (see below).
+ // - Breaking out of the range loop (or any other early termination of
+ // the yield function) is safe: the iterator releases the underlying
+ // file handle and reader before returning, and no close error from
+ // that path is yielded — the caller has already signalled it is no
+ // longer interested in further values, so an extra synthetic
+ // (nil, closeErr) tail would be discarded anyway.
Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, error]
// FetchEntries reads the manifest list file to fetch the list of
// manifest entries using the provided file system IO interface.
diff --git a/manifest_test.go b/manifest_test.go
index be5e31f0..36966d45 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -23,11 +23,13 @@ import (
"encoding/json"
"errors"
"io"
+ "io/fs"
"strconv"
"testing"
"time"
"github.com/apache/iceberg-go/internal"
+ iceio "github.com/apache/iceberg-go/io"
"github.com/stretchr/testify/suite"
"github.com/twmb/avro"
"github.com/twmb/avro/ocf"
@@ -2061,3 +2063,149 @@ func (m *ManifestTestSuite) TestWriteManifestClosesWriterOnEntryError() {
m.Require().ErrorContains(err, "only entries with status ADDED")
m.Require().ErrorIs(err, errLimitedWrite)
}
+
+type trackCloseFile struct {
+ contents *bytes.Reader
+ closeCount int
+ closeErr error
+ readLimit int
+ readErr error
+ nRead int
+}
+
+var _ iceio.File = (*trackCloseFile)(nil)
+
+func newTrackCloseFile(data []byte) *trackCloseFile {
+ return &trackCloseFile{contents: bytes.NewReader(data)}
+}
+
+func (f *trackCloseFile) Stat() (fs.FileInfo, error) { return nil, nil }
+
+func (f *trackCloseFile) Read(p []byte) (int, error) {
+ if f.readErr != nil && f.nRead >= f.readLimit {
+ return 0, f.readErr
+ }
+ if f.readErr != nil && f.nRead+len(p) > f.readLimit {
+ p = p[:f.readLimit-f.nRead]
+ }
+ n, err := f.contents.Read(p)
+ f.nRead += n
+
+ return n, err
+}
+
+func (f *trackCloseFile) Close() error {
+ f.closeCount++
+
+ return f.closeErr
+}
+
+func (f *trackCloseFile) Seek(offset int64, whence int) (int64, error) {
+ return f.contents.Seek(offset, whence)
+}
+
+func (f *trackCloseFile) ReadAt(p []byte, off int64) (int, error) {
+ return f.contents.ReadAt(p, off)
+}
+
+var (
+ errMidStreamRead = errors.New("simulated mid-stream read error")
+ errCloseFinalPair = errors.New("simulated close error")
+)
+
+func (m *ManifestTestSuite) TestEntriesEarlyBreakClosesFile() {
+ var mockfs internal.MockFS
+ manifest := manifestFile{
+ version: 2,
+ SpecID: 1,
+ Path: manifestFileRecordsV2[0].FilePath(),
+ }
+
+ file := newTrackCloseFile(m.v2ManifestEntries.Bytes())
+ mockfs.Test(m.T())
+ mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+ defer mockfs.AssertExpectations(m.T())
+
+ yielded := 0
+ for entry, err := range manifest.Entries(&mockfs, false) {
+ m.Require().NoError(err, "no error expected on a healthy manifest before break")
+ m.Require().NotNil(entry)
+ yielded++
+ if yielded == 1 {
+ break
+ }
+ }
+ m.Equal(1, yielded, "iteration must stop after the first entry on early break")
+ m.Equal(1, file.closeCount, "file must be closed exactly once on early break")
+}
+
+func (m *ManifestTestSuite) TestEntriesMidStreamErrorYieldsAndStops() {
+ var mockfs internal.MockFS
+ manifest := manifestFile{
+ version: 2,
+ SpecID: 1,
+ Path: manifestFileRecordsV2[0].FilePath(),
+ }
+
+ data := m.v2ManifestEntries.Bytes()
+ file := newTrackCloseFile(data)
+ file.readLimit = len(data) / 2
+ file.readErr = errMidStreamRead
+ mockfs.Test(m.T())
+ mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+ defer mockfs.AssertExpectations(m.T())
+
+ var (
+ entries []ManifestEntry
+ gotError error
+ yields int
+ )
+ for entry, err := range manifest.Entries(&mockfs, false) {
+ yields++
+ if err != nil {
+ gotError = err
+
+ break
+ }
+ entries = append(entries, entry)
+ }
+ m.Require().NotNil(gotError, "iterator must yield a non-nil error")
+ m.Require().ErrorIs(gotError, errMidStreamRead,
+ "yielded error must equal or wrap the simulated mid-stream read error")
+ m.Equal(yields, len(entries)+1,
+ "the error pair must follow zero or more entry pairs and stop iteration")
+ m.Equal(1, file.closeCount, "file must be closed exactly once after a mid-stream error")
+}
+
+func (m *ManifestTestSuite) TestEntriesCloseErrorAsFinalPair() {
+ var mockfs internal.MockFS
+ manifest := manifestFile{
+ version: 2,
+ SpecID: 1,
+ Path: manifestFileRecordsV2[0].FilePath(),
+ }
+
+ file := newTrackCloseFile(m.v2ManifestEntries.Bytes())
+ file.closeErr = errCloseFinalPair
+ mockfs.Test(m.T())
+ mockfs.On("Open", manifest.FilePath()).Return(file, nil)
+ defer mockfs.AssertExpectations(m.T())
+
+ var (
+ entries []ManifestEntry
+ errs []error
+ )
+ for entry, err := range manifest.Entries(&mockfs, false) {
+ if err != nil {
+ errs = append(errs, err)
+
+ continue
+ }
+ entries = append(entries, entry)
+ }
+ m.Len(entries, 2, "iteration must consume every entry before the terminal close pair")
+ m.Require().Len(errs, 1, "iterator must yield exactly one terminal close error")
+ m.ErrorIs(errs[0], errCloseFinalPair,
+ "terminal error must equal or wrap the simulated close error")
+ m.Equal(1, file.closeCount, "file must be closed exactly once even when Close returns an error")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 488e72d3..85c15f35 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -158,14 +158,14 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
}
for _, m := range manifestList {
- entries, err := of.base.fetchManifestEntry(m, true)
- if err != nil {
- return existingFiles, err
- }
-
- foundDeleted := make([]iceberg.ManifestEntry, 0)
- notDeleted := make([]iceberg.ManifestEntry, 0, len(entries))
- for _, entry := range entries {
+ // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+ capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
+ notDeleted := make([]iceberg.ManifestEntry, 0, max(0, capacity))
+ foundDeletedCount := 0
+ for entry, err := range of.base.iterManifestEntries(m, true) {
+ if err != nil {
+ return existingFiles, err
+ }
path := entry.DataFile().FilePath()
content := entry.DataFile().ContentType()
_, isDeletedData := of.base.deletedFiles[path]
@@ -174,13 +174,13 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
isData := content == iceberg.EntryContentData
matched := (isDeletedData && isData) ||
(isDeletedDelete && !isData)
if matched {
- foundDeleted = append(foundDeleted, entry)
+ foundDeletedCount++
} else {
notDeleted = append(notDeleted, entry)
}
}
- if len(foundDeleted) == 0 {
+ if foundDeletedCount == 0 {
existingFiles = append(existingFiles, m)
continue
@@ -286,13 +286,13 @@ func (of *overwriteFiles) deletedEntries(ctx context.Context) ([]iceberg.Manifes
}
getEntries := func(m iceberg.ManifestFile) ([]iceberg.ManifestEntry, error) {
- entries, err := of.base.fetchManifestEntry(m, true)
- if err != nil {
- return nil, err
- }
-
- result := make([]iceberg.ManifestEntry, 0, len(entries))
- for _, entry := range entries {
+ // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
+ capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
+ result := make([]iceberg.ManifestEntry, 0, max(0, capacity))
+ for entry, err := range of.base.iterManifestEntries(m, true) {
+ if err != nil {
+ return nil, err
+ }
path := entry.DataFile().FilePath()
content := entry.DataFile().ContentType()
@@ -608,23 +608,6 @@ func (sp *snapshotProducer) newManifestOutput() (io.WriteCloser, string, error)
return f, filepath, nil
}
-func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDeleted bool) ([]iceberg.ManifestEntry, error) {
- capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles())
- if !discardDeleted {
- capacity += int(m.DeletedDataFiles())
- }
- // Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
- entries := make([]iceberg.ManifestEntry, 0, max(0, capacity))
- for entry, err := range m.Entries(sp.io, discardDeleted) {
- if err != nil {
- return nil, err
- }
- entries = append(entries, entry)
- }
-
- return entries, nil
-}
-
func (sp *snapshotProducer) iterManifestEntries(m iceberg.ManifestFile, discardDeleted bool) iter.Seq2[iceberg.ManifestEntry, error] {
return m.Entries(sp.io, discardDeleted)
}