This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new a33fb54d refactor: Update Overwrite API to Use Options (#717)
a33fb54d is described below
commit a33fb54d81a1e6db6b7117963c1287cccb0a8d4f
Author: Alex Normand <[email protected]>
AuthorDate: Tue Feb 10 10:00:02 2026 -0800
refactor: Update Overwrite API to Use Options (#717)

This makes a few tweaks to the API for the [recently
added](https://github.com/apache/iceberg-go/pull/674) `Overwrite` API to
favor options for parameters that have a sensible default and
migrates them to `OverwriteOptions`.
Technically, it also fixes an issue that was introduced where the
caseSensitive parameter was just ignored and hardcoded to `true` in the
functions that support the `Overwrite` functionality but this wasn't the
primary motivation on my end. I just happened to notice it as I was
refactoring.
---
table/table.go | 18 ++++----
table/table_test.go | 4 +-
table/transaction.go | 113 ++++++++++++++++++++++++++++++++--------------
table/transaction_test.go | 6 +--
4 files changed, 93 insertions(+), 48 deletions(-)
diff --git a/table/table.go b/table/table.go
index 04f30777..e5e446ee 100644
--- a/table/table.go
+++ b/table/table.go
@@ -131,7 +131,7 @@ func (t Table) Append(ctx context.Context, rdr
array.RecordReader, snapshotProps
// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then
committing the transaction.
//
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data
to delete or rewrite:
// - If filter is nil or AlwaysTrue, all existing data files are deleted and
replaced with new data.
// - If a filter is provided, it acts as a row-level predicate on existing
data:
// - Files where all rows match the filter (strict match) are completely
deleted
@@ -146,10 +146,11 @@ func (t Table) Append(ctx context.Context, rdr
array.RecordReader, snapshotProps
// New data from the provided table is written to the table regardless of the
filter.
//
// The batchSize parameter refers to the batch size for reading the input
data, not the batch size for writes.
-// The concurrency parameter controls the level of parallelism. If concurrency
<= 0, defaults to runtime.GOMAXPROCS(0).
-func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize
int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int,
snapshotProps iceberg.Properties) (*Table, error) {
+// The concurrency level for manifest processing and file rewriting
+// can be set using the WithOverwriteConcurrency option. Defaults to
runtime.GOMAXPROCS(0).
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize
int64, snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table,
error) {
txn := t.NewTransaction()
- if err := txn.OverwriteTable(ctx, tbl, batchSize, filter,
caseSensitive, concurrency, snapshotProps); err != nil {
+ if err := txn.OverwriteTable(ctx, tbl, batchSize, snapshotProps,
opts...); err != nil {
return nil, err
}
@@ -158,7 +159,7 @@ func (t Table) OverwriteTable(ctx context.Context, tbl
arrow.Table, batchSize in
// Overwrite is a shortcut for NewTransaction().Overwrite() and then
committing the transaction.
//
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data
to delete or rewrite:
// - If filter is nil or AlwaysTrue, all existing data files are deleted and
replaced with new data.
// - If a filter is provided, it acts as a row-level predicate on existing
data:
// - Files where all rows match the filter (strict match) are completely
deleted
@@ -172,10 +173,11 @@ func (t Table) OverwriteTable(ctx context.Context, tbl
arrow.Table, batchSize in
//
// New data from the provided RecordReader is written to the table regardless
of the filter.
//
-// The concurrency parameter controls the level of parallelism. If concurrency
<= 0, defaults to runtime.GOMAXPROCS(0).
-func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps
iceberg.Properties) (*Table, error) {
+// The concurrency level for manifest processing and file rewriting
+// can be set using the WithOverwriteConcurrency option. Defaults to
runtime.GOMAXPROCS(0).
+func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader,
snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table, error) {
txn := t.NewTransaction()
- if err := txn.Overwrite(ctx, rdr, filter, caseSensitive, concurrency,
snapshotProps); err != nil {
+ if err := txn.Overwrite(ctx, rdr, snapshotProps, opts...); err != nil {
return nil, err
}
diff --git a/table/table_test.go b/table/table_test.go
index a01a3f78..e98cf0ea 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -1628,7 +1628,7 @@ func (t *TableWritingTestSuite) TestOverwriteTable() {
})
t.Require().NoError(err)
defer newTable.Release()
- resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, 0,
nil)
+ resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil)
t.Require().NoError(err)
t.NotNil(resultTbl)
@@ -1653,7 +1653,7 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
defer rdr.Release()
// Test that Table.Overwrite works (delegates to transaction)
- resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, 0, nil)
+ resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil,
table.WithOverwriteConcurrency(1))
t.Require().NoError(err)
t.NotNil(resultTbl)
diff --git a/table/transaction.go b/table/transaction.go
index 1187eba6..6bf1fc00 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -500,9 +500,9 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
-// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// OverwriteTable overwrites the table data using an Arrow Table.
//
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data
to delete or rewrite:
// - If filter is nil or AlwaysTrue, all existing data files are deleted and
replaced with new data.
// - If a filter is provided, it acts as a row-level predicate on existing
data:
// - Files where all rows match the filter (strict match) are completely
deleted
@@ -517,18 +517,58 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
// New data from the provided table is written to the table regardless of the
filter.
//
// The batchSize parameter refers to the batch size for reading the input
data, not the batch size for writes.
-// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// The concurrency level for manifest processing and file rewriting
+// can be set using the WithOverwriteConcurrency option.
// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
-func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
concurrency int, snapshotProps iceberg.Properties) error {
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, snapshotProps iceberg.Properties, opts ...OverwriteOption)
error {
rdr := array.NewTableReader(tbl, batchSize)
defer rdr.Release()
- return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency,
snapshotProps)
+ return t.Overwrite(ctx, rdr, snapshotProps, opts...)
}
-// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+type overwriteOperation struct {
+ concurrency int
+ filter iceberg.BooleanExpression
+ caseSensitive bool
+}
+
+// OverwriteOption applies options to overwrite operations
+type OverwriteOption func(op *overwriteOperation)
+
+// WithOverwriteConcurrency overrides the default concurrency for overwrite
operations.
+// Default: runtime.GOMAXPROCS(0)
+func WithOverwriteConcurrency(concurrency int) OverwriteOption {
+ return func(op *overwriteOperation) {
+ if concurrency <= 0 {
+ op.concurrency = runtime.GOMAXPROCS(0)
+
+ return
+ }
+ op.concurrency = concurrency
+ }
+}
+
+// WithOverwriteFilter overrides the default deletion filter on overwrite
operations.
+// Default: iceberg.AlwaysTrue
+func WithOverwriteFilter(filter iceberg.BooleanExpression) OverwriteOption {
+ return func(op *overwriteOperation) {
+ op.filter = filter
+ }
+}
+
+// WithOverwriteCaseInsensitive overrides the default case sensitivity that
applies to the binding of the filter.
+// Default: case sensitive
+// Note that the sensitivity only applies to the field name and not the
evaluation of the literals on string fields.
+func WithOverwriteCaseInsensitive() OverwriteOption {
+ return func(op *overwriteOperation) {
+ op.caseSensitive = false
+ }
+}
+
+// Overwrite overwrites the table data using a RecordReader.
//
-// The filter parameter determines which existing data to delete or rewrite:
+// An optional filter (see WithOverwriteFilter) determines which existing data
to delete or rewrite:
// - If filter is nil or AlwaysTrue, all existing data files are deleted and
replaced with new data.
// - If a filter is provided, it acts as a row-level predicate on existing
data:
// - Files where all rows match the filter (strict match) are completely
deleted
@@ -542,19 +582,24 @@ func (t *Transaction) OverwriteTable(ctx context.Context,
tbl arrow.Table, batch
//
// New data from the provided RecordReader is written to the table regardless
of the filter.
//
-// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
-// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// The concurrency level for manifest processing and file rewriting
+// can be set using the WithOverwriteConcurrency option.
// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
-func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int,
snapshotProps iceberg.Properties) error {
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
snapshotProps iceberg.Properties, opts ...OverwriteOption) error {
+ overwrite := overwriteOperation{
+ concurrency: runtime.GOMAXPROCS(0),
+ filter: iceberg.AlwaysTrue{},
+ caseSensitive: true,
+ }
+ for _, apply := range opts {
+ apply(&overwrite)
+ }
+
fs, err := t.tbl.fsF(ctx)
if err != nil {
return err
}
- // Default concurrency if not specified
- if concurrency <= 0 {
- concurrency = runtime.GOMAXPROCS(0)
- }
-
if t.meta.NameMapping() == nil {
nameMapping := t.meta.CurrentSchema().NameMapping()
mappingJson, err := json.Marshal(nameMapping)
@@ -570,7 +615,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr
array.RecordReader, fil
commitUUID := uuid.New()
updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
- filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter, concurrency)
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
if err != nil {
return err
}
@@ -580,7 +625,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr
array.RecordReader, fil
}
if len(filesToRewrite) > 0 {
- if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter, concurrency); err != nil {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, overwrite.filter, overwrite.caseSensitive,
overwrite.concurrency); err != nil {
return err
}
}
@@ -609,7 +654,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr
array.RecordReader, fil
// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
// Returns files to delete completely, files to rewrite partially, and any
error.
-func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int)
(filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
s := t.meta.currentSnapshot()
if s == nil {
return nil, nil, nil
@@ -628,20 +673,20 @@ func (t *Transaction) classifyFilesForOverwrite(ctx
context.Context, fs io.IO, f
return filesToDelete, filesToRewrite, nil
}
- return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter,
caseSensitive, concurrency)
}
// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
// Returns files to delete completely, files to rewrite partially, and any
error.
-func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency
int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
schema := t.meta.CurrentSchema()
- inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
caseSensitive, false)
if err != nil {
return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
}
- strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter,
caseSensitive, false)
if err != nil {
return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
}
@@ -653,21 +698,19 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
}
spec := meta.PartitionSpec()
if !spec.IsUnpartitioned() {
- manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
caseSensitive)
if err != nil {
return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
}
}
s := t.meta.currentSnapshot()
- manifests, err := s.Manifests(fs)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
- }
-
- type classifiedFiles struct {
- toDelete []iceberg.DataFile
- toRewrite []iceberg.DataFile
+ var manifests []iceberg.ManifestFile
+ if s != nil {
+ manifests, err = s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests:
%w", err)
+ }
}
var (
@@ -750,13 +793,13 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
}
// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
-func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, concurrency int) error {
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
complementFilter := iceberg.NewNot(filter)
for _, originalFile := range files {
// Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
rewriteUUID := uuid.New()
- rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID, concurrency)
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, caseSensitive, rewriteUUID, concurrency)
if err != nil {
return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
}
@@ -771,14 +814,14 @@ func (t *Transaction) rewriteFilesWithFilter(ctx
context.Context, fs io.IO, upda
}
// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID
uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive
bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
scanTask := &FileScanTask{
File: originalFile,
Start: 0,
Length: originalFile.FileSizeBytes(),
}
- boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
true)
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
caseSensitive)
if err != nil {
return nil, fmt.Errorf("failed to bind filter: %w", err)
}
@@ -793,7 +836,7 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
fs: fs,
projectedSchema: t.meta.CurrentSchema(),
boundRowFilter: boundFilter,
- caseSensitive: true,
+ caseSensitive: caseSensitive,
rowLimit: -1, // No limit
concurrency: concurrency,
}
diff --git a/table/transaction_test.go b/table/transaction_test.go
index d2655fa1..2a8f3bde 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -135,7 +135,7 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
bldr.Field(2).(*array.Int32Builder).Append(13)
bldr.Field(3).(*array.StringBuilder).Append("m")
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
fw, err := mustFS(s.T(), tbl).(iceio.WriteFileIO).Create(filename)
@@ -402,7 +402,7 @@ func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
defer overwriteTable.Release()
tx = tbl.NewTransaction()
- err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil, true, 0, nil)
+ err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil)
s.Require().NoError(err)
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)
@@ -469,7 +469,7 @@ func (s *SparkIntegrationTestSuite)
TestOverwriteWithFilter() {
filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
tx = tbl.NewTransaction()
- err = tx.OverwriteTable(s.ctx, overwriteTable, 1, filter, true, 0, nil)
+ err = tx.OverwriteTable(s.ctx, overwriteTable, 1, nil,
table.WithOverwriteFilter(filter))
s.Require().NoError(err)
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)