This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1d3320a1 fix(table): close writers on error for every exit path (#667)
1d3320a1 is described below
commit 1d3320a113a0f08999a470cf3e928888045829bd
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Jan 8 17:20:31 2026 +0100
fix(table): close writers on error for every exit path (#667)
related to #644
Signed-off-by: ferhat elmas <[email protected]>
---
manifest.go | 17 +--
manifest_test.go | 62 +++++++++
table/snapshot_producers.go | 24 ++--
table/snapshot_producers_test.go | 286 +++++++++++++++++++++++++++++++++++++++
4 files changed, 367 insertions(+), 22 deletions(-)
diff --git a/manifest.go b/manifest.go
index a0d4c611..c02c79da 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1442,11 +1442,8 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
}
// WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64,
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files
[]ManifestFile) error {
- var (
- writer *ManifestListWriter
- err error
- )
+func WriteManifestList(version int, out io.Writer, snapshotID int64,
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files
[]ManifestFile) (err error) {
+ var writer *ManifestListWriter
switch version {
case 1:
@@ -1468,12 +1465,9 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps
if err != nil {
return err
}
+ defer internal.CheckedClose(writer, &err)
- if err = writer.AddManifests(files); err != nil {
- return err
- }
-
- return writer.Close()
+ return writer.AddManifests(files)
}
func WriteManifest(
@@ -1484,13 +1478,14 @@ func WriteManifest(
schema *Schema,
snapshotID int64,
entries []ManifestEntry,
-) (ManifestFile, error) {
+) (mf ManifestFile, err error) {
cnt := &internal.CountingWriter{W: out}
w, err := NewManifestWriter(version, cnt, spec, schema, snapshotID)
if err != nil {
return nil, err
}
+ defer internal.CheckedClose(w, &err)
for _, entry := range entries {
if err := w.addEntry(entry.(*manifestEntry)); err != nil {
diff --git a/manifest_test.go b/manifest_test.go
index 2ad2edaa..1192474e 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -19,6 +19,7 @@ package iceberg
import (
"bytes"
+ "errors"
"io"
"testing"
"time"
@@ -1447,3 +1448,64 @@ func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
m.Require().NoError(err)
m.Equal(entry4, result)
}
+
+var errLimitedWrite = errors.New("write limit exceeded")
+
+type limitedWriter struct {
+ limit int
+ written int
+ err error
+}
+
+func (w *limitedWriter) Write(p []byte) (int, error) {
+ if w.written+len(p) > w.limit {
+ return 0, w.err
+ }
+ w.written += len(p)
+
+ return len(p), nil
+}
+
+func (m *ManifestTestSuite) TestWriteManifestListClosesWriterOnError() {
+ seqNum := int64(7)
+ var header bytes.Buffer
+ writer, err := NewManifestListWriterV2(&header, snapshotID, seqNum, nil)
+ m.Require().NoError(err)
+ m.Require().NoError(writer.Close())
+
+ out := &limitedWriter{limit: header.Len(), err: errLimitedWrite}
+ err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0,
[]ManifestFile{
+ manifestFileRecordsV2[0],
+ manifestFileRecordsV1[0],
+ })
+ m.Require().Error(err)
+ m.Require().ErrorContains(err, "ManifestListWriter only supports
version 2 manifest files")
+ m.Require().ErrorIs(err, errLimitedWrite)
+}
+
+func (m *ManifestTestSuite) TestWriteManifestClosesWriterOnEntryError() {
+ partitionSpec := NewPartitionSpecID(1,
+ PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID",
Transform: IdentityTransform{}},
+ PartitionField{FieldID: 1001, SourceID: 2, Name:
"tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+ var header bytes.Buffer
+ writer, err := NewManifestWriter(2, &header, partitionSpec, testSchema,
entrySnapshotID)
+ m.Require().NoError(err)
+ headerLen := header.Len()
+
+ m.Require().NoError(writer.Add(manifestEntryV2Records[0]))
+ m.Require().NoError(writer.Close())
+
+ badEntry := *manifestEntryV2Records[1]
+ badEntry.EntryStatus = EntryStatusEXISTING
+ badEntry.SeqNum = nil
+
+ out := &limitedWriter{limit: headerLen, err: errLimitedWrite}
+ _, err = WriteManifest("test.avro", out, 2, partitionSpec, testSchema,
entrySnapshotID, []ManifestEntry{
+ manifestEntryV2Records[0],
+ &badEntry,
+ })
+ m.Require().Error(err)
+ m.Require().ErrorContains(err, "only entries with status ADDED")
+ m.Require().ErrorIs(err, errLimitedWrite)
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 7352ab20..fbb082fe 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -169,6 +169,8 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
for _, entry := range notDeleted {
if err := wr.Existing(entry); err != nil {
+ internal.CheckedClose(wr, &err)
+
return existingFiles, err
}
}
@@ -260,11 +262,12 @@ func (m *manifestMergeManager) groupBySpec(manifests []iceberg.ManifestFile) map
return groups
}
-func (m *manifestMergeManager) createManifest(specID int, bin
[]iceberg.ManifestFile) (iceberg.ManifestFile, error) {
+func (m *manifestMergeManager) createManifest(specID int, bin
[]iceberg.ManifestFile) (mf iceberg.ManifestFile, err error) {
wr, path, counter, err := m.snap.newManifestWriter(m.snap.spec(specID))
if err != nil {
return nil, err
}
+ defer internal.CheckedClose(wr, &err)
for _, manifest := range bin {
entries, err := m.snap.fetchManifestEntry(manifest, false)
@@ -276,19 +279,17 @@ func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.Manifest
switch {
case entry.Status() == iceberg.EntryStatusDELETED &&
entry.SnapshotID() == m.snap.snapshotID:
// only files deleted by this snapshot should
be added to the new manifest
- if err = wr.Delete(entry); err != nil {
- return nil, err
- }
+ err = wr.Delete(entry)
case entry.Status() == iceberg.EntryStatusADDED &&
entry.SnapshotID() == m.snap.snapshotID:
// added entries from this snapshot are still
added, otherwise they should be existing
- if err = wr.Add(entry); err != nil {
- return nil, err
- }
+ err = wr.Add(entry)
case entry.Status() != iceberg.EntryStatusDELETED:
// add all non-deleted files from the old
manifest as existing files
- if err = wr.Existing(entry); err != nil {
- return nil, err
- }
+ err = wr.Existing(entry)
+ }
+
+ if err != nil {
+ return nil, err
}
}
}
@@ -522,7 +523,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
results := [...][]iceberg.ManifestFile{nil, nil, nil}
if len(sp.addedFiles) > 0 {
- g.Go(func() error {
+ g.Go(func() (err error) {
out, path, err := sp.newManifestOutput()
if err != nil {
return err
@@ -540,6 +541,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
if err != nil {
return err
}
+ defer internal.CheckedClose(wr, &err)
for _, df := range sp.addedFiles {
err :=
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
new file mode 100644
index 00000000..0866f2f4
--- /dev/null
+++ b/table/snapshot_producers_test.go
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io"
+ "io/fs"
+ "testing"
+ "time"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/internal"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/stretchr/testify/require"
+)
+
+var errLimitedWrite = errors.New("write limit exceeded")
+
+type limitedWriteCloser struct {
+ limit int
+ written int
+ err error
+}
+
+func (w *limitedWriteCloser) Write(p []byte) (int, error) {
+ if w.written+len(p) > w.limit {
+ return 0, w.err
+ }
+ w.written += len(p)
+
+ return len(p), nil
+}
+
+func (w *limitedWriteCloser) Close() error {
+ return nil
+}
+
+func (w *limitedWriteCloser) ReadFrom(r io.Reader) (int64, error) {
+ return io.Copy(w, r)
+}
+
+type memIO struct {
+ limit int
+ err error
+ files map[string][]byte
+}
+
+func newMemIO(limit int, err error) *memIO {
+ return &memIO{
+ limit: limit,
+ err: err,
+ files: make(map[string][]byte),
+ }
+}
+
+func (m *memIO) Open(name string) (iceio.File, error) {
+ data, ok := m.files[name]
+ if !ok {
+ return nil, fs.ErrNotExist
+ }
+
+ return &internal.MockFile{Contents: bytes.NewReader(data)}, nil
+}
+
+func (m *memIO) Create(name string) (iceio.FileWriter, error) {
+ return &limitedWriteCloser{limit: m.limit, err: m.err}, nil
+}
+
+func (m *memIO) WriteFile(name string, content []byte) error {
+ m.files[name] = append([]byte(nil), content...)
+
+ return nil
+}
+
+func (m *memIO) Remove(name string) error {
+ delete(m.files, name)
+
+ return nil
+}
+
+func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec,
schema *iceberg.Schema) int {
+ t.Helper()
+
+ var buf bytes.Buffer
+ writer, err := iceberg.NewManifestWriter(version, &buf, spec, schema, 1)
+ require.NoError(t, err, "new manifest writer")
+ _ = writer.Close()
+
+ return buf.Len()
+}
+
+func manifestSize(t *testing.T, version int, spec iceberg.PartitionSpec,
schema *iceberg.Schema, snapshotID int64, entries []iceberg.ManifestEntry) int {
+ t.Helper()
+
+ var buf bytes.Buffer
+ _, err := iceberg.WriteManifest("size.avro", &buf, version, spec,
schema, snapshotID, entries)
+ require.NoError(t, err, "write manifest for size")
+
+ return buf.Len()
+}
+
+func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string,
partition map[int]any) iceberg.DataFile {
+ t.Helper()
+
+ builder, err := iceberg.NewDataFileBuilder(
+ spec,
+ iceberg.EntryContentData,
+ path,
+ iceberg.ParquetFile,
+ partition,
+ nil,
+ nil,
+ 1,
+ 1,
+ )
+ require.NoError(t, err, "new data file builder")
+
+ return builder.Build()
+}
+
+func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
+ schema := iceberg.NewSchema(0, iceberg.NestedField{
+ ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true,
+ })
+ spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000, Name: "id", Transform:
iceberg.IdentityTransform{},
+ })
+
+ meta, err := NewMetadata(schema, &spec, UnsortedSortOrder,
"table-location", nil)
+ require.NoError(t, err, "new metadata")
+ spec = meta.PartitionSpec()
+ schema = meta.CurrentSchema()
+ fieldID := 0
+ for field := range spec.Fields() {
+ fieldID = field.FieldID
+
+ break
+ }
+ require.NotZero(t, fieldID, "partition field id")
+
+ mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite)
+ tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json",
func(context.Context) (iceio.IO, error) {
+ return mem, nil
+ }, nil)
+ txn := tbl.NewTransaction()
+
+ sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil)
+ validPartition := map[int]any{fieldID: int32(1)}
+ sp.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet",
validPartition))
+ sp.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet",
nil))
+
+ _, err = sp.manifests()
+ require.ErrorIs(t, err, errLimitedWrite)
+}
+
+func TestManifestMergeManagerClosesWriterOnError(t *testing.T) {
+ schema := iceberg.NewSchema(0, iceberg.NestedField{
+ ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true,
+ })
+ spec := iceberg.NewPartitionSpec()
+
+ mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite)
+ meta, err := NewMetadata(schema, &spec, UnsortedSortOrder,
"table-location", nil)
+ require.NoError(t, err, "new metadata")
+
+ tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json",
func(context.Context) (iceio.IO, error) {
+ return mem, nil
+ }, nil)
+ txn := tbl.NewTransaction()
+
+ sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil)
+ df := newTestDataFile(t, spec, "file://data-1.parquet", nil)
+ entries := []iceberg.ManifestEntry{
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED,
&sp.snapshotID, nil, nil, df),
+ }
+
+ manifestPath := "table-location/metadata/manifest-1.avro"
+ var manifestBuf bytes.Buffer
+ manifestFile, err := iceberg.WriteManifest(manifestPath, &manifestBuf,
2, spec, schema, sp.snapshotID, entries)
+ require.NoError(t, err, "write manifest")
+ require.NoError(t, mem.WriteFile(manifestPath, manifestBuf.Bytes()))
+
+ missingManifest := iceberg.NewManifestFile(2,
"table-location/metadata/missing.avro", 1, int32(spec.ID()), sp.snapshotID).
+ Build()
+
+ mgr := manifestMergeManager{snap: sp}
+ _, err = mgr.createManifest(spec.ID(), []iceberg.ManifestFile{
+ manifestFile,
+ missingManifest,
+ })
+ require.ErrorIs(t, err, errLimitedWrite)
+}
+
+func TestOverwriteFilesExistingManifestsClosesWriterOnError(t *testing.T) {
+ schema := iceberg.NewSchema(0, iceberg.NestedField{
+ ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true,
+ })
+ spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000, Name: "id", Transform:
iceberg.IdentityTransform{},
+ })
+
+ meta, err := NewMetadata(schema, &spec, UnsortedSortOrder,
"table-location", nil)
+ require.NoError(t, err, "new metadata")
+
+ spec = meta.PartitionSpec()
+ schema = meta.CurrentSchema()
+ fieldID := 0
+ for field := range spec.Fields() {
+ fieldID = field.FieldID
+
+ break
+ }
+ require.NotZero(t, fieldID, "partition field id")
+
+ snapshotID := int64(100)
+ seqNum := int64(-1)
+ validSeq := int64(42)
+ manifestPath := "table-location/metadata/manifest-1.avro"
+ manifestListPath := "table-location/metadata/snap-1.avro"
+
+ validPartition := map[int]any{fieldID: int32(1)}
+ validFile := newTestDataFile(t, spec, "file://valid.parquet",
validPartition)
+ sizeEntries := []iceberg.ManifestEntry{
+ iceberg.NewManifestEntry(iceberg.EntryStatusEXISTING,
&snapshotID, &validSeq, nil, validFile),
+ }
+ headerLen := manifestHeaderSize(t, 2, spec, schema)
+ manifestLen := manifestSize(t, 2, spec, schema, snapshotID, sizeEntries)
+ require.Greater(t, manifestLen, headerLen, "manifest size")
+
+ mem := newMemIO(manifestLen-1, errLimitedWrite)
+ tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json",
func(context.Context) (iceio.IO, error) {
+ return mem, nil
+ }, nil)
+ txn := tbl.NewTransaction()
+
+ deletedFile := newTestDataFile(t, spec, "file://deleted.parquet",
validPartition)
+ invalidFile := newTestDataFile(t, spec, "file://invalid.parquet",
validPartition)
+ entries := []iceberg.ManifestEntry{
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&validSeq, nil, deletedFile),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&validSeq, nil, validFile),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
nil, nil, invalidFile),
+ }
+
+ var manifestBuf bytes.Buffer
+ manifestFile, err := iceberg.WriteManifest(manifestPath, &manifestBuf,
2, spec, schema, snapshotID, entries)
+ require.NoError(t, err, "write manifest")
+ require.NoError(t, mem.WriteFile(manifestPath, manifestBuf.Bytes()))
+
+ var listBuf bytes.Buffer
+ err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum,
0, []iceberg.ManifestFile{manifestFile})
+ require.NoError(t, err, "write manifest list")
+ require.NoError(t, mem.WriteFile(manifestListPath, listBuf.Bytes()))
+
+ snap := Snapshot{
+ SnapshotID: snapshotID,
+ SequenceNumber: seqNum,
+ TimestampMs: time.Now().UnixMilli(),
+ ManifestList: manifestListPath,
+ }
+ txn.meta.snapshotList = []Snapshot{snap}
+ txn.meta.currentSnapshotID = &snapshotID
+
+ sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil)
+ sp.deleteDataFile(deletedFile)
+
+ _, err = sp.existingManifests()
+ require.ErrorIs(t, err, errLimitedWrite)
+}