This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new c2c31b6f refactor(table): wrap manifest writing into closures (#685)
c2c31b6f is described below

commit c2c31b6f293931b1836567634ad205dc786fcfc9
Author: ferhat elmas <[email protected]>
AuthorDate: Mon Jan 19 23:39:51 2026 +0100

    refactor(table): wrap manifest writing into closures (#685)
    
    * more straightforward resource management
    * for loops don't accumulate defers
    * runs on panics
    
    related to #644, #682
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 table/snapshot_producers.go | 62 +++++++++++++++++++++++++--------------------
 1 file changed, 34 insertions(+), 28 deletions(-)

diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index d9cfff60..04156194 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -157,38 +157,35 @@ func (of *overwriteFiles) existingManifests() 
([]iceberg.ManifestFile, error) {
                        continue
                }
 
-               spec, err := 
of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
-               if err != nil {
-                       return existingFiles, err
-               }
-
-               wr, path, counter, fileCloser, err := 
of.base.newManifestWriter(*spec)
-               if err != nil {
-                       return existingFiles, err
-               }
-
-               for _, entry := range notDeleted {
-                       if err := wr.Existing(entry); err != nil {
-                               internal.CheckedClose(wr, &err)
-                               internal.CheckedClose(fileCloser, &err)
+               // wrap in a function to ensure that the writer is closed even 
if a panic occurs
+               rewriteManifest := func(m iceberg.ManifestFile, notDeleted 
[]iceberg.ManifestEntry) (_ iceberg.ManifestFile, retErr error) {
+                       spec, err := 
of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
+                       if err != nil {
+                               return nil, err
+                       }
 
-                               return existingFiles, err
+                       wr, path, counter, fileCloser, err := 
of.base.newManifestWriter(*spec)
+                       if err != nil {
+                               return nil, err
                        }
-               }
+                       defer internal.CheckedClose(fileCloser, &retErr)
+                       defer internal.CheckedClose(wr, &retErr)
 
-               // close the writer to force a flush and ensure counter.Count 
is accurate
-               if err := wr.Close(); err != nil {
-                       internal.CheckedClose(fileCloser, &err)
+                       for _, entry := range notDeleted {
+                               if err := wr.Existing(entry); err != nil {
+                                       return nil, err
+                               }
+                       }
 
-                       return existingFiles, err
-               }
+                       // close the writer to force a flush and ensure 
counter.Count is accurate
+                       if err := wr.Close(); err != nil {
+                               return nil, err
+                       }
 
-               // close the underlying file writer
-               if err := fileCloser.Close(); err != nil {
-                       return existingFiles, err
+                       return wr.ToManifestFile(path, counter.Count)
                }
 
-               mf, err := wr.ToManifestFile(path, counter.Count)
+               mf, err := rewriteManifest(m, notDeleted)
                if err != nil {
                        return existingFiles, err
                }
@@ -589,15 +586,24 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                                partitionGroups[specid] = append(group, entry)
                        }
 
-                       for specid, entries := range partitionGroups {
+                       writeGroup := func(specid int, entries 
[]iceberg.ManifestEntry) (_ iceberg.ManifestFile, retErr error) {
                                out, path, err := sp.newManifestOutput()
                                if err != nil {
-                                       return err
+                                       return nil, err
                                }
-                               defer internal.CheckedClose(out, &err)
+                               defer internal.CheckedClose(out, &retErr)
 
                                mf, err := iceberg.WriteManifest(path, out, 
sp.txn.meta.formatVersion,
                                        sp.spec(specid), 
sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
+                               if err != nil {
+                                       return nil, err
+                               }
+
+                               return mf, nil
+                       }
+
+                       for specid, entries := range partitionGroups {
+                               mf, err := writeGroup(specid, entries)
                                if err != nil {
                                        return err
                                }

Reply via email to