This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c55f73a6 fix(table): handle early return for commit (#687)
c55f73a6 is described below
commit c55f73a6cf1e96ddcf5f28e69648733247687ed0
Author: ferhat elmas <[email protected]>
AuthorDate: Wed Jan 21 01:40:31 2026 +0100
fix(table): handle early return for commit (#687)
related #644, #681
Signed-off-by: ferhat elmas <[email protected]>
---
table/snapshot_producers.go | 10 ++--
table/snapshot_producers_test.go | 101 +++++++++++++++++++++++++++++++++++++++
2 files changed, 106 insertions(+), 5 deletions(-)
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 04156194..8022f5a4 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -524,6 +524,11 @@ func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDe
}
func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
+ deleted, err := sp.deletedEntries()
+ if err != nil {
+ return nil, err
+ }
+
var g errgroup.Group
results := [...][]iceberg.ManifestFile{nil, nil, nil}
@@ -571,11 +576,6 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
})
}
- deleted, err := sp.deletedEntries()
- if err != nil {
- return nil, err
- }
-
if len(deleted) > 0 {
g.Go(func() error {
partitionGroups := map[int][]iceberg.ManifestEntry{}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 161c9503..1c6922f1 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -354,6 +354,13 @@ func (t *trackingIO) GetUnclosedWriters() []string {
return unclosed
}
+// GetWriterCount returns len(t.writers), read while holding t.writersMu.
+// NOTE(review): presumably one entry per Create call — confirm against trackingIO.Create.
+func (t *trackingIO) GetWriterCount() int {
+	t.writersMu.Lock()
+	defer t.writersMu.Unlock()
+
+	return len(t.writers)
+}
+
// TestManifestWriterClosesUnderlyingFile tests that when using newManifestWriter,
// the underlying file writer is properly closed. This test is related to issue #644 and #681
// where blob.Writer was never closed, causing table corruption.
@@ -455,3 +462,97 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
unclosed := trackIO.GetUnclosedWriters()
require.Empty(t, unclosed, "all file writers should be closed after
existingManifests, but these are still open: %v", unclosed)
}
+
+// errorOnDeletedEntries is a producerImpl that returns an error from deletedEntries()
+// to test that file writers are properly closed even when deletedEntries fails.
+// NOTE(review): base is stored but not read by any method in this patch.
+type errorOnDeletedEntries struct {
+	base                *snapshotProducer
+	err                 error
+	waitForWriter       <-chan struct{} // optional signal before returning error
+	cancelWaitForWriter <-chan struct{} // optional cancel channel
+}
+
+// processManifests returns the given manifests unchanged with a nil error.
+func (e *errorOnDeletedEntries) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+	return manifests, nil
+}
+
+// existingManifests reports no existing manifests and no error.
+func (e *errorOnDeletedEntries) existingManifests() ([]iceberg.ManifestFile, error) {
+	return nil, nil
+}
+
+// deletedEntries always returns (nil, e.err). When waitForWriter is set it
+// first blocks until waitForWriter delivers/closes or cancelWaitForWriter
+// fires, so a test can tell whether a writer was created before this call
+// returned. A nil cancelWaitForWriter never fires (receives on a nil channel
+// block forever).
+func (e *errorOnDeletedEntries) deletedEntries() ([]iceberg.ManifestEntry, error) {
+	if e.waitForWriter != nil {
+		select {
+		case <-e.waitForWriter:
+		case <-e.cancelWaitForWriter:
+			return nil, e.err
+		}
+	}
+
+	return nil, e.err
+}
+
+// blockingTrackingIO extends trackingIO to signal when a writer is created.
+// writerCreated is closed at most once (guarded by signalOnce), on the first
+// Create call.
+type blockingTrackingIO struct {
+	*trackingIO
+	writerCreated chan struct{}
+	signalOnce    sync.Once
+}
+
+// newBlockingTrackingIO wraps a fresh trackingIO and an unbuffered, not-yet-closed
+// writerCreated channel.
+func newBlockingTrackingIO() *blockingTrackingIO {
+	return &blockingTrackingIO{
+		trackingIO:    newTrackingIO(),
+		writerCreated: make(chan struct{}),
+	}
+}
+
+// Create delegates to trackingIO.Create, then closes writerCreated on the
+// first call regardless of whether creation succeeded.
+func (b *blockingTrackingIO) Create(name string) (iceio.FileWriter, error) {
+	writer, err := b.trackingIO.Create(name)
+	b.signalOnce.Do(func() {
+		close(b.writerCreated)
+	})
+
+	return writer, err
+}
+
+// TestManifestsClosesWriterWhenDeletedEntriesFails verifies that NO writers are
+// created when deletedEntries() fails, because the error should be returned
+// before any goroutines start.
+func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	// Cancelling on exit unblocks deletedEntries (via cancelWaitForWriter) if
+	// manifests() is still waiting when the test returns.
+	defer cancel()
+
+	blockingIO := newBlockingTrackingIO()
+	spec := iceberg.NewPartitionSpec()
+	txn := createTestTransaction(t, blockingIO, spec)
+
+	sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil)
+	errDeletedEntries := errors.New("simulated deletedEntries error")
+	sp.producerImpl = &errorOnDeletedEntries{
+		base:                sp,
+		err:                 errDeletedEntries,
+		waitForWriter:       blockingIO.writerCreated,
+		cancelWaitForWriter: ctx.Done(),
+	}
+
+	df := newTestDataFile(t, spec, "file://data-1.parquet", nil)
+	sp.appendDataFile(df)
+
+	// deletedEntries blocks until blockingIO.Create is called, so manifests()
+	// can only finish before the timeout if a writer was created before (or
+	// concurrently with) the deletedEntries call.
+	done := make(chan struct{})
+	var manifestsErr error
+	go func() {
+		_, manifestsErr = sp.manifests()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Reading manifestsErr is safe: the write happens before close(done).
+		require.ErrorIs(t, manifestsErr, errDeletedEntries)
+		writerCount := blockingIO.GetWriterCount()
+		require.NotZero(t, writerCount, "test setup error: expected writer to be created")
+		require.Fail(t, "goroutine started before deletedEntries() check, creating a writer that could be orphaned")
+
+	case <-time.After(100 * time.Millisecond):
+		// Passing path: no writer was created, so deletedEntries is still
+		// blocked. Because this is the success branch, the test always waits 100ms.
+		writerCount := blockingIO.GetWriterCount()
+		require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
+	}
+}