This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1cf6c7a9 fix(table): AddFiles: close file after usage and parallelize
(#799)
1cf6c7a9 is described below
commit 1cf6c7a9031f2fda4627ec2a7e76eaed9c403052
Author: starpact <[email protected]>
AuthorDate: Wed Mar 25 07:23:32 2026 +0800
fix(table): AddFiles: close file after usage and parallelize (#799)
Previously, files were closed only after the whole iteration finished:
https://github.com/apache/iceberg-go/blob/55bdfbf836359142f7c010b0b80f0b7e06af2dca/table/arrow_utils.go#L1372
causing resource leak within a single `filesToDataFiles` invocation as
the file holds an open S3 read stream:
https://github.com/apache/iceberg-go/blob/55bdfbf836359142f7c010b0b80f0b7e06af2dca/io/gocloud/blob.go#L39
A sample trace can demonstrate it clearly:
<img width="1367" height="559" alt="image"
src="https://github.com/user-attachments/assets/c98e2b86-1ef6-4a08-b8fc-008fafc3f4aa"
/>
Besides, I also added parallelism to `AddFiles`.
Future improvements:
Parallelism partially "fixes" the performance issue of `AddFiles` but a
more fundamental improvement is related to IO, currently for extracting
Parquet metadata, the IO layer needs to:
1. do one GetObject and hold the read stream. The size is used by
`Seek`, but the reader is never used (though it still consumes quite some
memory due to underlying buffering)
2. do one range GetObject for the last 8 bytes
3. do one range GetObject for the footer
A common trick is optimistically prefetching the footer (e.g. 512KB) to
coalesce the above 3 requests. However, this requires leaking the IO
abstraction, e.g. by determining the file format from the path and
returning different implementations accordingly, which may require more
discussions.
---
table/arrow_utils.go | 109 +++++++++++++++++++++++++++------------------------
table/table_test.go | 2 +-
table/transaction.go | 59 +++++++++++++++++++---------
3 files changed, 99 insertions(+), 71 deletions(-)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 16bbc008..3215512d 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -42,6 +42,7 @@ import (
tblutils "github.com/apache/iceberg-go/table/internal"
"github.com/google/uuid"
"github.com/pterm/pterm"
+ "golang.org/x/sync/errgroup"
)
// constants to look for as Keys in Arrow field metadata
@@ -1343,72 +1344,78 @@ func computeStatsPlan(sc *iceberg.Schema, props
iceberg.Properties) (map[int]tbl
return result, nil
}
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
- return func(yield func(iceberg.DataFile, error) bool) {
- defer func() {
- if r := recover(); r != nil {
- switch e := r.(type) {
- case string:
- yield(nil, fmt.Errorf("error
encountered during file conversion: %s", e))
- case error:
- yield(nil, fmt.Errorf("error
encountered during file conversion: %w", e))
- }
- }
- }()
-
- partitionSpec, err := meta.CurrentSpec()
- if err != nil || partitionSpec == nil {
- yield(nil, fmt.Errorf("%w: cannot add files without a
current spec", err))
-
- return
- }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile,
err error) {
+ partitionSpec, err := meta.CurrentSpec()
+ if err != nil || partitionSpec == nil {
+ return nil, fmt.Errorf("%w: cannot add files without a current
spec", err)
+ }
- currentSchema, currentSpec := meta.CurrentSchema(),
*partitionSpec
+ currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
- for filePath := range paths {
- format := tblutils.FormatFromFileName(filePath)
- rdr := must(format.Open(ctx, fileIO, filePath))
- // TODO: take a look at this defer Close() and consider
refactoring
- defer rdr.Close()
+ dataFiles := make([]iceberg.DataFile, len(filePaths))
+ eg, ctx := errgroup.WithContext(ctx)
+ eg.SetLimit(concurrency)
+ for i, filePath := range filePaths {
+ eg.Go(func() (err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ switch e := r.(type) {
+ case error:
+ err = fmt.Errorf("error
encountered during file conversion: %w", e)
+ default:
+ err = fmt.Errorf("error
encountered during file conversion: %v", e)
+ }
+ }
+ }()
- arrSchema := must(rdr.Schema())
+ dataFiles[i] = fileToDataFile(ctx, fileIO, filePath,
currentSchema, currentSpec, meta.props)
- if err := checkArrowSchemaCompat(currentSchema,
arrSchema, false); err != nil {
- yield(nil, err)
+ return nil
+ })
+ }
- return
- }
+ if err := eg.Wait(); err != nil {
+ return nil, err
+ }
- pathToIDSchema := currentSchema
- if fileHasIDs := must(VisitArrowSchema(arrSchema,
hasIDs{})); fileHasIDs {
- pathToIDSchema =
must(ArrowSchemaToIceberg(arrSchema, false, nil))
- }
+ return dataFiles, nil
+}
- statistics :=
format.DataFileStatsFromMeta(rdr.Metadata(),
must(computeStatsPlan(currentSchema, meta.props)),
- must(format.PathToIDMapping(pathToIDSchema)))
+func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string,
currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props
iceberg.Properties) iceberg.DataFile {
+ format := tblutils.FormatFromFileName(filePath)
+ rdr := must(format.Open(ctx, fileIO, filePath))
+ defer rdr.Close()
- partitionValues := make(map[int]any)
- if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
- for _, field := range currentSpec.Fields() {
- if !field.Transform.PreservesOrder() {
- yield(nil, fmt.Errorf("cannot
infer partition value from parquet metadata for a non-linear partition field:
%s with transform %s", field.Name, field.Transform))
+ arrSchema := must(rdr.Schema())
+ if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err
!= nil {
+ panic(err)
+ }
- return
- }
+ pathToIDSchema := currentSchema
+ if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{}));
fileHasIDs {
+ pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false,
nil))
+ }
+ statistics := format.DataFileStatsFromMeta(
+ rdr.Metadata(),
+ must(computeStatsPlan(currentSchema, props)),
+ must(format.PathToIDMapping(pathToIDSchema)),
+ )
- partitionVal :=
statistics.PartitionValue(field, currentSchema)
- if partitionVal != nil {
- partitionValues[field.FieldID]
= partitionVal
- }
- }
+ partitionValues := make(map[int]any)
+ if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
+ for _, field := range currentSpec.Fields() {
+ if !field.Transform.PreservesOrder() {
+ panic(fmt.Errorf("cannot infer partition value
from parquet metadata for a non-linear partition field: %s with transform %s",
field.Name, field.Transform))
}
- df := statistics.ToDataFile(currentSchema, currentSpec,
filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(),
partitionValues)
- if !yield(df, nil) {
- return
+ partitionVal := statistics.PartitionValue(field,
currentSchema)
+ if partitionVal != nil {
+ partitionValues[field.FieldID] = partitionVal
}
}
}
+
+ return statistics.ToDataFile(currentSchema, currentSpec, filePath,
iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(),
partitionValues)
}
func recordNBytes(rec arrow.RecordBatch) (total int64) {
diff --git a/table/table_test.go b/table/table_test.go
index 59927d1f..9f573343 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -596,7 +596,7 @@ func (t *TableWritingTestSuite)
TestAddFilesFailsSchemaMismatch() {
tx := tbl.NewTransaction()
err = tx.AddFiles(t.ctx, files, nil, false)
t.Error(err)
- t.EqualError(err, `invalid schema: mismatch in fields:
+ t.EqualError(err, `error encountered during file conversion: invalid
schema: mismatch in fields:
| Table Field | Requested Field
✅ | 1: foo: optional boolean | 1: foo: optional boolean
✅ | 2: bar: optional string | 2: bar: optional string
diff --git a/table/transaction.go b/table/transaction.go
index a28457a3..655ee748 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -25,7 +25,6 @@ import (
"fmt"
"iter"
"runtime"
- "slices"
"sync"
"time"
@@ -421,12 +420,12 @@ func (t *Transaction) ReplaceDataFiles(ctx
context.Context, filesToDelete, files
updater.deleteDataFile(df)
}
- dataFiles := filesToDataFiles(ctx, fs, t.meta,
slices.Values(filesToAdd))
- for df, err := range dataFiles {
- if err != nil {
- return err
- }
- updater.appendDataFile(df)
+ dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filesToAdd, 1)
+ if err != nil {
+ return err
+ }
+ for _, dataFile := range dataFiles {
+ updater.appendDataFile(dataFile)
}
updates, reqs, err := updater.commit()
@@ -728,14 +727,36 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx
context.Context, filesTo
return t.apply(updates, reqs)
}
-func (t *Transaction) AddFiles(ctx context.Context, files []string,
snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
- set := make(map[string]string)
- for _, f := range files {
- set[f] = f
+type AddFilesOption func(addFilesOp *addFilesOperation)
+
+type addFilesOperation struct {
+ concurrency int
+}
+
+// WithAddFilesConcurrency overwrites the default concurrency for add files
operation.
+// Default: runtime.GOMAXPROCS(0)
+func WithAddFilesConcurrency(concurrency int) AddFilesOption {
+ return func(op *addFilesOperation) {
+ if concurrency > 0 {
+ op.concurrency = concurrency
+ }
}
+}
- if len(set) != len(files) {
- return errors.New("file paths must be unique for AddFiles")
+func (t *Transaction) AddFiles(ctx context.Context, filePaths []string,
snapshotProps iceberg.Properties, ignoreDuplicates bool, opts
...AddFilesOption) error {
+ addFilesOp := addFilesOperation{
+ concurrency: runtime.GOMAXPROCS(0),
+ }
+ for _, apply := range opts {
+ apply(&addFilesOp)
+ }
+
+ set := make(map[string]struct{}, len(filePaths))
+ for _, filePath := range filePaths {
+ if _, ok := set[filePath]; ok {
+ return errors.New("file paths must be unique for
AddFiles")
+ }
+ set[filePath] = struct{}{}
}
if !ignoreDuplicates {
@@ -779,12 +800,12 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()
- dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
- for df, err := range dataFiles {
- if err != nil {
- return err
- }
- updater.appendDataFile(df)
+ dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filePaths,
addFilesOp.concurrency)
+ if err != nil {
+ return err
+ }
+ for _, dataFile := range dataFiles {
+ updater.appendDataFile(dataFile)
}
updates, reqs, err := updater.commit()