This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a33fb54d refactor: Update Overwrite API to Use Options (#717)
a33fb54d is described below

commit a33fb54d81a1e6db6b7117963c1287cccb0a8d4f
Author: Alex Normand <[email protected]>
AuthorDate: Tue Feb 10 10:00:02 2026 -0800

    refactor: Update Overwrite API to Use Options (#717)
    
    ![little
    
change](https://media1.tenor.com/m/x5h2y8ZpkUYAAAAC/it-weill-change-a-little-bit-it-will-change.gif)
    
    This makes a few tweaks to the API for the [recently
    added](https://github.com/apache/iceberg-go/pull/674) `Overwrite` API to
    favor options for parameters that have a sensible default and
    migrates them to `OverwriteOptions`.
    
    Technically, it also fixes an issue that was introduced where the
    caseSensitive parameter was just ignored and hardcoded to `true` in the
    functions that support the `Overwrite` functionality but this wasn't the
    primary motivation on my end. I just happened to notice it as I was
    refactoring.
---
 table/table.go            |  18 ++++----
 table/table_test.go       |   4 +-
 table/transaction.go      | 113 ++++++++++++++++++++++++++++++++--------------
 table/transaction_test.go |   6 +--
 4 files changed, 93 insertions(+), 48 deletions(-)

diff --git a/table/table.go b/table/table.go
index 04f30777..e5e446ee 100644
--- a/table/table.go
+++ b/table/table.go
@@ -131,7 +131,7 @@ func (t Table) Append(ctx context.Context, rdr 
array.RecordReader, snapshotProps
 
 // OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then 
committing the transaction.
 //
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data 
to delete or rewrite:
 //   - If filter is nil or AlwaysTrue, all existing data files are deleted and 
replaced with new data.
 //   - If a filter is provided, it acts as a row-level predicate on existing 
data:
 //   - Files where all rows match the filter (strict match) are completely 
deleted
@@ -146,10 +146,11 @@ func (t Table) Append(ctx context.Context, rdr 
array.RecordReader, snapshotProps
 // New data from the provided table is written to the table regardless of the 
filter.
 //
 // The batchSize parameter refers to the batch size for reading the input 
data, not the batch size for writes.
-// The concurrency parameter controls the level of parallelism. If concurrency 
<= 0, defaults to runtime.GOMAXPROCS(0).
-func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize 
int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, 
snapshotProps iceberg.Properties) (*Table, error) {
+// The concurrency level for manifest processing and file rewriting can be 
configured using the
+// WithOverwriteConcurrency option. Defaults to runtime.GOMAXPROCS(0).
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize 
int64, snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table, 
error) {
        txn := t.NewTransaction()
-       if err := txn.OverwriteTable(ctx, tbl, batchSize, filter, 
caseSensitive, concurrency, snapshotProps); err != nil {
+       if err := txn.OverwriteTable(ctx, tbl, batchSize, snapshotProps, 
opts...); err != nil {
                return nil, err
        }
 
@@ -158,7 +159,7 @@ func (t Table) OverwriteTable(ctx context.Context, tbl 
arrow.Table, batchSize in
 
 // Overwrite is a shortcut for NewTransaction().Overwrite() and then 
committing the transaction.
 //
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data 
to delete or rewrite:
 //   - If filter is nil or AlwaysTrue, all existing data files are deleted and 
replaced with new data.
 //   - If a filter is provided, it acts as a row-level predicate on existing 
data:
 //   - Files where all rows match the filter (strict match) are completely 
deleted
@@ -172,10 +173,11 @@ func (t Table) OverwriteTable(ctx context.Context, tbl 
arrow.Table, batchSize in
 //
 // New data from the provided RecordReader is written to the table regardless 
of the filter.
 //
-// The concurrency parameter controls the level of parallelism. If concurrency 
<= 0, defaults to runtime.GOMAXPROCS(0).
-func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter 
iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps 
iceberg.Properties) (*Table, error) {
+// The concurrency level for manifest processing and file rewriting can be 
configured using the
+// WithOverwriteConcurrency option. Defaults to runtime.GOMAXPROCS(0).
+func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, 
snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table, error) {
        txn := t.NewTransaction()
-       if err := txn.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, 
snapshotProps); err != nil {
+       if err := txn.Overwrite(ctx, rdr, snapshotProps, opts...); err != nil {
                return nil, err
        }
 
diff --git a/table/table_test.go b/table/table_test.go
index a01a3f78..e98cf0ea 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -1628,7 +1628,7 @@ func (t *TableWritingTestSuite) TestOverwriteTable() {
        })
        t.Require().NoError(err)
        defer newTable.Release()
-       resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, 0, 
nil)
+       resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil)
        t.Require().NoError(err)
        t.NotNil(resultTbl)
 
@@ -1653,7 +1653,7 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
        defer rdr.Release()
 
        // Test that Table.Overwrite works (delegates to transaction)
-       resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, 0, nil)
+       resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, 
table.WithOverwriteConcurrency(1))
        t.Require().NoError(err)
        t.NotNil(resultTbl)
 
diff --git a/table/transaction.go b/table/transaction.go
index 1187eba6..6bf1fc00 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -500,9 +500,9 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
        return t.apply(updates, reqs)
 }
 
-// OverwriteTable overwrites the table data using an Arrow Table, optionally 
with a filter.
+// OverwriteTable overwrites the table data using an Arrow Table.
 //
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data 
to delete or rewrite:
 //   - If filter is nil or AlwaysTrue, all existing data files are deleted and 
replaced with new data.
 //   - If a filter is provided, it acts as a row-level predicate on existing 
data:
 //   - Files where all rows match the filter (strict match) are completely 
deleted
@@ -517,18 +517,58 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
 // New data from the provided table is written to the table regardless of the 
filter.
 //
 // The batchSize parameter refers to the batch size for reading the input 
data, not the batch size for writes.
-// The concurrency parameter controls the level of parallelism for manifest 
processing and file rewriting.
+// The concurrency level for manifest processing and file rewriting can be 
configured using the
+// WithOverwriteConcurrency option.
 // If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
-func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table, 
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, 
concurrency int, snapshotProps iceberg.Properties) error {
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table, 
batchSize int64, snapshotProps iceberg.Properties, opts ...OverwriteOption) 
error {
        rdr := array.NewTableReader(tbl, batchSize)
        defer rdr.Release()
 
-       return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, 
snapshotProps)
+       return t.Overwrite(ctx, rdr, snapshotProps, opts...)
 }
 
-// Overwrite overwrites the table data using a RecordReader, optionally with a 
filter.
+type overwriteOperation struct {
+       concurrency   int
+       filter        iceberg.BooleanExpression
+       caseSensitive bool
+}
+
+// OverwriteOption applies options to overwrite operations
+type OverwriteOption func(op *overwriteOperation)
+
+// WithOverwriteConcurrency overrides the default concurrency for overwrite 
operations.
+// Default: runtime.GOMAXPROCS(0)
+func WithOverwriteConcurrency(concurrency int) OverwriteOption {
+       return func(op *overwriteOperation) {
+               if concurrency <= 0 {
+                       op.concurrency = runtime.GOMAXPROCS(0)
+
+                       return
+               }
+               op.concurrency = concurrency
+       }
+}
+
+// WithOverwriteFilter overrides the default deletion filter on overwrite 
operations.
+// Default: iceberg.AlwaysTrue
+func WithOverwriteFilter(filter iceberg.BooleanExpression) OverwriteOption {
+       return func(op *overwriteOperation) {
+               op.filter = filter
+       }
+}
+
+// WithOverwriteCaseInsensitive overrides the default case sensitivity that 
applies to the binding of the filter.
+// Default: case sensitive
+// Note that the sensitivity only applies to the field name and not the 
evaluation of the literals on string fields.
+func WithOverwriteCaseInsensitive() OverwriteOption {
+       return func(op *overwriteOperation) {
+               op.caseSensitive = false
+       }
+}
+
+// Overwrite overwrites the table data using a RecordReader.
 //
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data 
to delete or rewrite:
 //   - If filter is nil or AlwaysTrue, all existing data files are deleted and 
replaced with new data.
 //   - If a filter is provided, it acts as a row-level predicate on existing 
data:
 //   - Files where all rows match the filter (strict match) are completely 
deleted
@@ -542,19 +582,24 @@ func (t *Transaction) OverwriteTable(ctx context.Context, 
tbl arrow.Table, batch
 //
 // New data from the provided RecordReader is written to the table regardless 
of the filter.
 //
-// The concurrency parameter controls the level of parallelism for manifest 
processing and file rewriting.
+// The concurrency level for manifest processing and file rewriting can be 
configured using the
+// WithOverwriteConcurrency option.
 // If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
-func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, 
snapshotProps iceberg.Properties) error {
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, 
snapshotProps iceberg.Properties, opts ...OverwriteOption) error {
+       overwrite := overwriteOperation{
+               concurrency:   runtime.GOMAXPROCS(0),
+               filter:        iceberg.AlwaysTrue{},
+               caseSensitive: true,
+       }
+       for _, apply := range opts {
+               apply(&overwrite)
+       }
+
        fs, err := t.tbl.fsF(ctx)
        if err != nil {
                return err
        }
 
-       // Default concurrency if not specified
-       if concurrency <= 0 {
-               concurrency = runtime.GOMAXPROCS(0)
-       }
-
        if t.meta.NameMapping() == nil {
                nameMapping := t.meta.CurrentSchema().NameMapping()
                mappingJson, err := json.Marshal(nameMapping)
@@ -570,7 +615,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr 
array.RecordReader, fil
        commitUUID := uuid.New()
        updater := t.updateSnapshot(fs, 
snapshotProps).mergeOverwrite(&commitUUID)
 
-       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, filter, concurrency)
+       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
        if err != nil {
                return err
        }
@@ -580,7 +625,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr 
array.RecordReader, fil
        }
 
        if len(filesToRewrite) > 0 {
-               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, filter, concurrency); err != nil {
+               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, overwrite.filter, overwrite.caseSensitive, 
overwrite.concurrency); err != nil {
                        return err
                }
        }
@@ -609,7 +654,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr 
array.RecordReader, fil
 
 // classifyFilesForOverwrite classifies existing data files based on the 
provided filter.
 // Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, concurrency int) (filesToDelete, 
filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
        s := t.meta.currentSnapshot()
        if s == nil {
                return nil, nil, nil
@@ -628,20 +673,20 @@ func (t *Transaction) classifyFilesForOverwrite(ctx 
context.Context, fs io.IO, f
                return filesToDelete, filesToRewrite, nil
        }
 
-       return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+       return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, 
caseSensitive, concurrency)
 }
 
 // classifyFilesForFilteredOverwrite classifies files for filtered overwrite 
operations.
 // Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete, 
filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency 
int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
        schema := t.meta.CurrentSchema()
 
-       inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, 
true, false)
+       inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, 
caseSensitive, false)
        if err != nil {
                return nil, nil, fmt.Errorf("failed to create inclusive metrics 
evaluator: %w", err)
        }
 
-       strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true, 
false)
+       strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, 
caseSensitive, false)
        if err != nil {
                return nil, nil, fmt.Errorf("failed to create strict metrics 
evaluator: %w", err)
        }
@@ -653,21 +698,19 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
        }
        spec := meta.PartitionSpec()
        if !spec.IsUnpartitioned() {
-               manifestEval, err = newManifestEvaluator(spec, schema, filter, 
true)
+               manifestEval, err = newManifestEvaluator(spec, schema, filter, 
caseSensitive)
                if err != nil {
                        return nil, nil, fmt.Errorf("failed to create manifest 
evaluator: %w", err)
                }
        }
 
        s := t.meta.currentSnapshot()
-       manifests, err := s.Manifests(fs)
-       if err != nil {
-               return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
-       }
-
-       type classifiedFiles struct {
-               toDelete  []iceberg.DataFile
-               toRewrite []iceberg.DataFile
+       var manifests []iceberg.ManifestFile
+       if s != nil {
+               manifests, err = s.Manifests(fs)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed to get manifests: 
%w", err)
+               }
        }
 
        var (
@@ -750,13 +793,13 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
 }
 
 // rewriteFilesWithFilter rewrites data files by preserving only rows that do 
NOT match the filter
-func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, 
updater *snapshotProducer, files []iceberg.DataFile, filter 
iceberg.BooleanExpression, concurrency int) error {
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, 
updater *snapshotProducer, files []iceberg.DataFile, filter 
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
        complementFilter := iceberg.NewNot(filter)
 
        for _, originalFile := range files {
                // Use a separate UUID for rewrite operations to avoid filename 
collisions with new data files
                rewriteUUID := uuid.New()
-               rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, 
originalFile, complementFilter, rewriteUUID, concurrency)
+               rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, 
originalFile, complementFilter, caseSensitive, rewriteUUID, concurrency)
                if err != nil {
                        return fmt.Errorf("failed to rewrite file %s: %w", 
originalFile.FilePath(), err)
                }
@@ -771,14 +814,14 @@ func (t *Transaction) rewriteFilesWithFilter(ctx 
context.Context, fs io.IO, upda
 }
 
 // rewriteSingleFile reads a single data file, applies the filter, and writes 
new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, 
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID 
uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, 
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive 
bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
        scanTask := &FileScanTask{
                File:   originalFile,
                Start:  0,
                Length: originalFile.FileSizeBytes(),
        }
 
-       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, 
true)
+       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, 
caseSensitive)
        if err != nil {
                return nil, fmt.Errorf("failed to bind filter: %w", err)
        }
@@ -793,7 +836,7 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
                fs:              fs,
                projectedSchema: t.meta.CurrentSchema(),
                boundRowFilter:  boundFilter,
-               caseSensitive:   true,
+               caseSensitive:   caseSensitive,
                rowLimit:        -1, // No limit
                concurrency:     concurrency,
        }
diff --git a/table/transaction_test.go b/table/transaction_test.go
index d2655fa1..2a8f3bde 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -135,7 +135,7 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
        bldr.Field(2).(*array.Int32Builder).Append(13)
        bldr.Field(3).(*array.StringBuilder).Append("m")
 
-       rec := bldr.NewRecord()
+       rec := bldr.NewRecordBatch()
        defer rec.Release()
 
        fw, err := mustFS(s.T(), tbl).(iceio.WriteFileIO).Create(filename)
@@ -402,7 +402,7 @@ func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
        defer overwriteTable.Release()
 
        tx = tbl.NewTransaction()
-       err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil, true, 0, nil)
+       err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil)
        s.Require().NoError(err)
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)
@@ -469,7 +469,7 @@ func (s *SparkIntegrationTestSuite) 
TestOverwriteWithFilter() {
 
        filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
        tx = tbl.NewTransaction()
-       err = tx.OverwriteTable(s.ctx, overwriteTable, 1, filter, true, 0, nil)
+       err = tx.OverwriteTable(s.ctx, overwriteTable, 1, nil, 
table.WithOverwriteFilter(filter))
        s.Require().NoError(err)
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)

Reply via email to