This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c2c31b6f refactor(table): wrap manifest writing into closures (#685)
c2c31b6f is described below
commit c2c31b6f293931b1836567634ad205dc786fcfc9
Author: ferhat elmas <[email protected]>
AuthorDate: Mon Jan 19 23:39:51 2026 +0100
refactor(table): wrap manifest writing into closures (#685)
* more straightforward resource management
* for loops don't accumulate defers
* runs on panics
related to #644, #682
Signed-off-by: ferhat elmas <[email protected]>
---
table/snapshot_producers.go | 62 +++++++++++++++++++++++++--------------------
1 file changed, 34 insertions(+), 28 deletions(-)
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index d9cfff60..04156194 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -157,38 +157,35 @@ func (of *overwriteFiles) existingManifests()
([]iceberg.ManifestFile, error) {
continue
}
- spec, err :=
of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
- if err != nil {
- return existingFiles, err
- }
-
- wr, path, counter, fileCloser, err :=
of.base.newManifestWriter(*spec)
- if err != nil {
- return existingFiles, err
- }
-
- for _, entry := range notDeleted {
- if err := wr.Existing(entry); err != nil {
- internal.CheckedClose(wr, &err)
- internal.CheckedClose(fileCloser, &err)
+ // wrap in a function to ensure that the writer is closed even
if a panic occurs
+ rewriteManifest := func(m iceberg.ManifestFile, notDeleted
[]iceberg.ManifestEntry) (_ iceberg.ManifestFile, retErr error) {
+ spec, err :=
of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
+ if err != nil {
+ return nil, err
+ }
- return existingFiles, err
+ wr, path, counter, fileCloser, err :=
of.base.newManifestWriter(*spec)
+ if err != nil {
+ return nil, err
}
- }
+ defer internal.CheckedClose(fileCloser, &retErr)
+ defer internal.CheckedClose(wr, &retErr)
- // close the writer to force a flush and ensure counter.Count
is accurate
- if err := wr.Close(); err != nil {
- internal.CheckedClose(fileCloser, &err)
+ for _, entry := range notDeleted {
+ if err := wr.Existing(entry); err != nil {
+ return nil, err
+ }
+ }
- return existingFiles, err
- }
+ // close the writer to force a flush and ensure
counter.Count is accurate
+ if err := wr.Close(); err != nil {
+ return nil, err
+ }
- // close the underlying file writer
- if err := fileCloser.Close(); err != nil {
- return existingFiles, err
+ return wr.ToManifestFile(path, counter.Count)
}
- mf, err := wr.ToManifestFile(path, counter.Count)
+ mf, err := rewriteManifest(m, notDeleted)
if err != nil {
return existingFiles, err
}
@@ -589,15 +586,24 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
partitionGroups[specid] = append(group, entry)
}
- for specid, entries := range partitionGroups {
+ writeGroup := func(specid int, entries
[]iceberg.ManifestEntry) (_ iceberg.ManifestFile, retErr error) {
out, path, err := sp.newManifestOutput()
if err != nil {
- return err
+ return nil, err
}
- defer internal.CheckedClose(out, &err)
+ defer internal.CheckedClose(out, &retErr)
mf, err := iceberg.WriteManifest(path, out,
sp.txn.meta.formatVersion,
sp.spec(specid),
sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
+ if err != nil {
+ return nil, err
+ }
+
+ return mf, nil
+ }
+
+ for specid, entries := range partitionGroups {
+ mf, err := writeGroup(specid, entries)
if err != nil {
return err
}