This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new c55f73a6 fix(table): handle early return for commit (#687)
c55f73a6 is described below

commit c55f73a6cf1e96ddcf5f28e69648733247687ed0
Author: ferhat elmas <[email protected]>
AuthorDate: Wed Jan 21 01:40:31 2026 +0100

    fix(table): handle early return for commit (#687)
    
    related #644, #681
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 table/snapshot_producers.go      |  10 ++--
 table/snapshot_producers_test.go | 101 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 106 insertions(+), 5 deletions(-)

diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 04156194..8022f5a4 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -524,6 +524,11 @@ func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDe
 }
 
 func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
+       deleted, err := sp.deletedEntries()
+       if err != nil {
+               return nil, err
+       }
+
        var g errgroup.Group
 
        results := [...][]iceberg.ManifestFile{nil, nil, nil}
@@ -571,11 +576,6 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
                })
        }
 
-       deleted, err := sp.deletedEntries()
-       if err != nil {
-               return nil, err
-       }
-
        if len(deleted) > 0 {
                g.Go(func() error {
                        partitionGroups := map[int][]iceberg.ManifestEntry{}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 161c9503..1c6922f1 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -354,6 +354,13 @@ func (t *trackingIO) GetUnclosedWriters() []string {
        return unclosed
 }
 
+func (t *trackingIO) GetWriterCount() int {
+       t.writersMu.Lock()
+       defer t.writersMu.Unlock()
+
+       return len(t.writers)
+}
+
 // TestManifestWriterClosesUnderlyingFile tests that when using newManifestWriter,
 // the underlying file writer is properly closed. This test is related to issue #644 and #681
 // where blob.Writer was never closed, causing table corruption.
@@ -455,3 +462,97 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
        unclosed := trackIO.GetUnclosedWriters()
        require.Empty(t, unclosed, "all file writers should be closed after 
existingManifests, but these are still open: %v", unclosed)
 }
+
+// errorOnDeletedEntries is a producerImpl that returns an error from deletedEntries()
+// to test that file writers are properly closed even when deletedEntries fails.
+type errorOnDeletedEntries struct {
+       base                *snapshotProducer
+       err                 error
+       waitForWriter       <-chan struct{} // optional signal before returning 
error
+       cancelWaitForWriter <-chan struct{} // optional cancel channel
+}
+
+func (e *errorOnDeletedEntries) processManifests(manifests 
[]iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+       return manifests, nil
+}
+
+func (e *errorOnDeletedEntries) existingManifests() ([]iceberg.ManifestFile, 
error) {
+       return nil, nil
+}
+
+func (e *errorOnDeletedEntries) deletedEntries() ([]iceberg.ManifestEntry, 
error) {
+       if e.waitForWriter != nil {
+               select {
+               case <-e.waitForWriter:
+               case <-e.cancelWaitForWriter:
+                       return nil, e.err
+               }
+       }
+
+       return nil, e.err
+}
+
+// blockingTrackingIO extends trackingIO to signal when a writer is created.
+type blockingTrackingIO struct {
+       *trackingIO
+       writerCreated chan struct{}
+       signalOnce    sync.Once
+}
+
+func newBlockingTrackingIO() *blockingTrackingIO {
+       return &blockingTrackingIO{
+               trackingIO:    newTrackingIO(),
+               writerCreated: make(chan struct{}),
+       }
+}
+
+func (b *blockingTrackingIO) Create(name string) (iceio.FileWriter, error) {
+       writer, err := b.trackingIO.Create(name)
+       b.signalOnce.Do(func() {
+               close(b.writerCreated)
+       })
+
+       return writer, err
+}
+
+// This test verifies that NO writers are created when deletedEntries() fails,
+// because the error should be returned before any goroutines start.
+func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       blockingIO := newBlockingTrackingIO()
+       spec := iceberg.NewPartitionSpec()
+       txn := createTestTransaction(t, blockingIO, spec)
+
+       sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil)
+       errDeletedEntries := errors.New("simulated deletedEntries error")
+       sp.producerImpl = &errorOnDeletedEntries{
+               base:                sp,
+               err:                 errDeletedEntries,
+               waitForWriter:       blockingIO.writerCreated,
+               cancelWaitForWriter: ctx.Done(),
+       }
+
+       df := newTestDataFile(t, spec, "file://data-1.parquet", nil)
+       sp.appendDataFile(df)
+
+       done := make(chan struct{})
+       var manifestsErr error
+       go func() {
+               _, manifestsErr = sp.manifests()
+               close(done)
+       }()
+
+       select {
+       case <-done:
+               require.ErrorIs(t, manifestsErr, errDeletedEntries)
+               writerCount := blockingIO.GetWriterCount()
+               require.NotZero(t, writerCount, "test setup error: expected 
writer to be created")
+               require.Fail(t, "goroutine started before deletedEntries() 
check, creating a writer that could be orphaned")
+
+       case <-time.After(100 * time.Millisecond):
+               writerCount := blockingIO.GetWriterCount()
+               require.Zero(t, writerCount, "expected no writers to be created 
when deletedEntries is called first")
+       }
+}

Reply via email to