This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cf6c7a9 fix(table): AddFiles: close file after usage and parallelize 
(#799)
1cf6c7a9 is described below

commit 1cf6c7a9031f2fda4627ec2a7e76eaed9c403052
Author: starpact <[email protected]>
AuthorDate: Wed Mar 25 07:23:32 2026 +0800

    fix(table): AddFiles: close file after usage and parallelize (#799)
    
    Previously, files were closed only after the whole iteration finished:
    
    
https://github.com/apache/iceberg-go/blob/55bdfbf836359142f7c010b0b80f0b7e06af2dca/table/arrow_utils.go#L1372
causing a resource leak within a single `filesToDataFiles` invocation, as
    the file holds an open S3 read stream:
    
    
https://github.com/apache/iceberg-go/blob/55bdfbf836359142f7c010b0b80f0b7e06af2dca/io/gocloud/blob.go#L39
    
    A sample trace can demonstrate it clearly:
    <img width="1367" height="559" alt="image"
    
src="https://github.com/user-attachments/assets/c98e2b86-1ef6-4a08-b8fc-008fafc3f4aa"
    />
    
    Besides, I also added parallelism to `AddFiles`.
    
    Future improvements:
    Parallelism partially "fixes" the performance issue of `AddFiles` but a
    more fundamental improvement is related to IO, currently for extracting
    Parquet metadata, the IO layer needs to:
    1. do one GetObject and hold the read stream. The size is used by
`Seek`, but the reader is never used (but still consumes quite some
    memory due to underlying buffering)
    2. do one range GetObject for the last 8 bytes
    3. do one range GetObject for the footer
    
    A common trick is optimistically prefetching the footer (e.g. 512KB) to
    coalesce the above 3 requests. However, this requires leaking the IO
    abstraction, e.g. by determining the file format from the path and
    returning different implementations accordingly, which may require more
    discussions.
---
 table/arrow_utils.go | 109 +++++++++++++++++++++++++++------------------------
 table/table_test.go  |   2 +-
 table/transaction.go |  59 +++++++++++++++++++---------
 3 files changed, 99 insertions(+), 71 deletions(-)

diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 16bbc008..3215512d 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -42,6 +42,7 @@ import (
        tblutils "github.com/apache/iceberg-go/table/internal"
        "github.com/google/uuid"
        "github.com/pterm/pterm"
+       "golang.org/x/sync/errgroup"
 )
 
 // constants to look for as Keys in Arrow field metadata
@@ -1343,72 +1344,78 @@ func computeStatsPlan(sc *iceberg.Schema, props 
iceberg.Properties) (map[int]tbl
        return result, nil
 }
 
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
-       return func(yield func(iceberg.DataFile, error) bool) {
-               defer func() {
-                       if r := recover(); r != nil {
-                               switch e := r.(type) {
-                               case string:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %s", e))
-                               case error:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %w", e))
-                               }
-                       }
-               }()
-
-               partitionSpec, err := meta.CurrentSpec()
-               if err != nil || partitionSpec == nil {
-                       yield(nil, fmt.Errorf("%w: cannot add files without a 
current spec", err))
-
-                       return
-               }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, 
err error) {
+       partitionSpec, err := meta.CurrentSpec()
+       if err != nil || partitionSpec == nil {
+               return nil, fmt.Errorf("%w: cannot add files without a current 
spec", err)
+       }
 
-               currentSchema, currentSpec := meta.CurrentSchema(), 
*partitionSpec
+       currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
 
-               for filePath := range paths {
-                       format := tblutils.FormatFromFileName(filePath)
-                       rdr := must(format.Open(ctx, fileIO, filePath))
-                       // TODO: take a look at this defer Close() and consider 
refactoring
-                       defer rdr.Close()
+       dataFiles := make([]iceberg.DataFile, len(filePaths))
+       eg, ctx := errgroup.WithContext(ctx)
+       eg.SetLimit(concurrency)
+       for i, filePath := range filePaths {
+               eg.Go(func() (err error) {
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       switch e := r.(type) {
+                                       case error:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %w", e)
+                                       default:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %v", e)
+                                       }
+                               }
+                       }()
 
-                       arrSchema := must(rdr.Schema())
+                       dataFiles[i] = fileToDataFile(ctx, fileIO, filePath, 
currentSchema, currentSpec, meta.props)
 
-                       if err := checkArrowSchemaCompat(currentSchema, 
arrSchema, false); err != nil {
-                               yield(nil, err)
+                       return nil
+               })
+       }
 
-                               return
-                       }
+       if err := eg.Wait(); err != nil {
+               return nil, err
+       }
 
-                       pathToIDSchema := currentSchema
-                       if fileHasIDs := must(VisitArrowSchema(arrSchema, 
hasIDs{})); fileHasIDs {
-                               pathToIDSchema = 
must(ArrowSchemaToIceberg(arrSchema, false, nil))
-                       }
+       return dataFiles, nil
+}
 
-                       statistics := 
format.DataFileStatsFromMeta(rdr.Metadata(), 
must(computeStatsPlan(currentSchema, meta.props)),
-                               must(format.PathToIDMapping(pathToIDSchema)))
+func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string, 
currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props 
iceberg.Properties) iceberg.DataFile {
+       format := tblutils.FormatFromFileName(filePath)
+       rdr := must(format.Open(ctx, fileIO, filePath))
+       defer rdr.Close()
 
-                       partitionValues := make(map[int]any)
-                       if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
-                               for _, field := range currentSpec.Fields() {
-                                       if !field.Transform.PreservesOrder() {
-                                               yield(nil, fmt.Errorf("cannot 
infer partition value from parquet metadata for a non-linear partition field: 
%s with transform %s", field.Name, field.Transform))
+       arrSchema := must(rdr.Schema())
+       if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err 
!= nil {
+               panic(err)
+       }
 
-                                               return
-                                       }
+       pathToIDSchema := currentSchema
+       if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); 
fileHasIDs {
+               pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false, 
nil))
+       }
+       statistics := format.DataFileStatsFromMeta(
+               rdr.Metadata(),
+               must(computeStatsPlan(currentSchema, props)),
+               must(format.PathToIDMapping(pathToIDSchema)),
+       )
 
-                                       partitionVal := 
statistics.PartitionValue(field, currentSchema)
-                                       if partitionVal != nil {
-                                               partitionValues[field.FieldID] 
= partitionVal
-                                       }
-                               }
+       partitionValues := make(map[int]any)
+       if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
+               for _, field := range currentSpec.Fields() {
+                       if !field.Transform.PreservesOrder() {
+                               panic(fmt.Errorf("cannot infer partition value 
from parquet metadata for a non-linear partition field: %s with transform %s", 
field.Name, field.Transform))
                        }
 
-                       df := statistics.ToDataFile(currentSchema, currentSpec, 
filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), 
partitionValues)
-                       if !yield(df, nil) {
-                               return
+                       partitionVal := statistics.PartitionValue(field, 
currentSchema)
+                       if partitionVal != nil {
+                               partitionValues[field.FieldID] = partitionVal
                        }
                }
        }
+
+       return statistics.ToDataFile(currentSchema, currentSpec, filePath, 
iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), 
partitionValues)
 }
 
 func recordNBytes(rec arrow.RecordBatch) (total int64) {
diff --git a/table/table_test.go b/table/table_test.go
index 59927d1f..9f573343 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -596,7 +596,7 @@ func (t *TableWritingTestSuite) 
TestAddFilesFailsSchemaMismatch() {
        tx := tbl.NewTransaction()
        err = tx.AddFiles(t.ctx, files, nil, false)
        t.Error(err)
-       t.EqualError(err, `invalid schema: mismatch in fields:
+       t.EqualError(err, `error encountered during file conversion: invalid 
schema: mismatch in fields:
    | Table Field              | Requested Field         
 ✅ | 1: foo: optional boolean | 1: foo: optional boolean
 ✅ | 2: bar: optional string  | 2: bar: optional string 
diff --git a/table/transaction.go b/table/transaction.go
index a28457a3..655ee748 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -25,7 +25,6 @@ import (
        "fmt"
        "iter"
        "runtime"
-       "slices"
        "sync"
        "time"
 
@@ -421,12 +420,12 @@ func (t *Transaction) ReplaceDataFiles(ctx 
context.Context, filesToDelete, files
                updater.deleteDataFile(df)
        }
 
-       dataFiles := filesToDataFiles(ctx, fs, t.meta, 
slices.Values(filesToAdd))
-       for df, err := range dataFiles {
-               if err != nil {
-                       return err
-               }
-               updater.appendDataFile(df)
+       dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filesToAdd, 1)
+       if err != nil {
+               return err
+       }
+       for _, dataFile := range dataFiles {
+               updater.appendDataFile(dataFile)
        }
 
        updates, reqs, err := updater.commit()
@@ -728,14 +727,36 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx 
context.Context, filesTo
        return t.apply(updates, reqs)
 }
 
-func (t *Transaction) AddFiles(ctx context.Context, files []string, 
snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
-       set := make(map[string]string)
-       for _, f := range files {
-               set[f] = f
+type AddFilesOption func(addFilesOp *addFilesOperation)
+
+type addFilesOperation struct {
+       concurrency int
+}
+
+// WithAddFilesConcurrency overwrites the default concurrency for add files 
operation.
+// Default: runtime.GOMAXPROCS(0)
+func WithAddFilesConcurrency(concurrency int) AddFilesOption {
+       return func(op *addFilesOperation) {
+               if concurrency > 0 {
+                       op.concurrency = concurrency
+               }
        }
+}
 
-       if len(set) != len(files) {
-               return errors.New("file paths must be unique for AddFiles")
+func (t *Transaction) AddFiles(ctx context.Context, filePaths []string, 
snapshotProps iceberg.Properties, ignoreDuplicates bool, opts 
...AddFilesOption) error {
+       addFilesOp := addFilesOperation{
+               concurrency: runtime.GOMAXPROCS(0),
+       }
+       for _, apply := range opts {
+               apply(&addFilesOp)
+       }
+
+       set := make(map[string]struct{}, len(filePaths))
+       for _, filePath := range filePaths {
+               if _, ok := set[filePath]; ok {
+                       return errors.New("file paths must be unique for 
AddFiles")
+               }
+               set[filePath] = struct{}{}
        }
 
        if !ignoreDuplicates {
@@ -779,12 +800,12 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
 
        updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()
 
-       dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
-       for df, err := range dataFiles {
-               if err != nil {
-                       return err
-               }
-               updater.appendDataFile(df)
+       dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filePaths, 
addFilesOp.concurrency)
+       if err != nil {
+               return err
+       }
+       for _, dataFile := range dataFiles {
+               updater.appendDataFile(dataFile)
        }
 
        updates, reqs, err := updater.commit()

Reply via email to