This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c2c15330 feat: table overwrite functionality (#674)
c2c15330 is described below
commit c2c15330325e7b8a7cdfe51cd441a54944b34828
Author: Arun Donti <[email protected]>
AuthorDate: Thu Feb 5 19:00:17 2026 -0500
feat: table overwrite functionality (#674)
Resolves #668
There are quite a few things I'm not too sure about with this
implementation (like the filtering logic or if I've properly tested it)
---------
Co-authored-by: Arun Donti <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
---
README.md | 2 +-
internal/recipe/validation.py | 36 ++++-
table/table.go | 53 +++++++
table/table_test.go | 48 +++++-
table/transaction.go | 339 ++++++++++++++++++++++++++++++++++++++++++
table/transaction_test.go | 134 +++++++++++++++++
6 files changed, 607 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 451aeb8a..d6cb2416 100644
--- a/README.md
+++ b/README.md
@@ -99,7 +99,7 @@ the table, the following tracks the current write support:
| Append Data Files | X |
| Rewrite Files | |
| Rewrite manifests | |
-| Overwrite Files | |
+| Overwrite Files | X |
| Write Pos Delete | |
| Write Eq Delete | |
| Row Delta | |
diff --git a/internal/recipe/validation.py b/internal/recipe/validation.py
index 1ac9421a..7c1b2ecb 100644
--- a/internal/recipe/validation.py
+++ b/internal/recipe/validation.py
@@ -26,16 +26,40 @@ def testSetProperties():
def testAddedFile():
-    spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(truncate=False)
+ spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
+ truncate=False
+ )
def testReadDifferentDataTypes():
-    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(truncate=False)
+    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(
+ truncate=False
+ )
spark.sql("SELECT * FROM
default.go_test_different_data_types").show(truncate=False)
def testReadSpecUpdate():
-    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(truncate=False)
+ spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
+ truncate=False
+ )
+
+
+def testOverwriteBasic():
+ spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_basic").show(
+ truncate=False
+ )
+    spark.sql("SELECT * FROM default.go_test_overwrite_basic ORDER BY baz").show(
+ truncate=False
+ )
+
+
+def testOverwriteWithFilter():
+ spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_filter").show(
+ truncate=False
+ )
+    spark.sql("SELECT * FROM default.go_test_overwrite_filter ORDER BY baz").show(
+ truncate=False
+ )
if __name__ == "__main__":
@@ -54,3 +78,9 @@ if __name__ == "__main__":
if args.test == "TestReadSpecUpdate":
testReadSpecUpdate()
+
+ if args.test == "TestOverwriteBasic":
+ testOverwriteBasic()
+
+ if args.test == "TestOverwriteWithFilter":
+ testOverwriteWithFilter()
diff --git a/table/table.go b/table/table.go
index 8abf6fb8..04f30777 100644
--- a/table/table.go
+++ b/table/table.go
@@ -129,6 +129,59 @@ func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps
return txn.Commit(ctx)
}
+// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then committing the transaction.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided table is written to the table regardless of the filter.
+//
+// The batchSize parameter refers to the batch size for reading the input data, not the batch size for writes.
+// The concurrency parameter controls the level of parallelism. If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) (*Table, error) {
+ txn := t.NewTransaction()
+	if err := txn.OverwriteTable(ctx, tbl, batchSize, filter, caseSensitive, concurrency, snapshotProps); err != nil {
+ return nil, err
+ }
+
+ return txn.Commit(ctx)
+}
+
+// Overwrite is a shortcut for NewTransaction().Overwrite() and then committing the transaction.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided RecordReader is written to the table regardless of the filter.
+//
+// The concurrency parameter controls the level of parallelism. If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) (*Table, error) {
+ txn := t.NewTransaction()
+	if err := txn.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, snapshotProps); err != nil {
+ return nil, err
+ }
+
+ return txn.Commit(ctx)
+}
+
 func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
fs, err := t.fsF(ctx)
if err != nil {
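
As a usage sketch only (not part of this commit): given an already-loaded *table.Table and an Arrow
record reader, the new wrapper can be called roughly as below. The helper name overwriteFooTrue, its
arguments, and the arrow-go v18 import path are assumptions made for illustration; the filter
construction mirrors the integration test added later in this commit.

package example

import (
	"context"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
)

// overwriteFooTrue (hypothetical helper) removes every existing row where foo = true
// and appends the records from rec in a single overwrite snapshot.
func overwriteFooTrue(ctx context.Context, tbl *table.Table, rec array.RecordReader) (*table.Table, error) {
	// Files whose rows all match the filter are deleted, partially matching files are
	// rewritten to keep only the non-matching rows, and other files are left unchanged.
	filter := iceberg.EqualTo(iceberg.Reference("foo"), true)

	// caseSensitive=true, concurrency=0 (defaults to runtime.GOMAXPROCS(0)), no extra snapshot properties.
	return tbl.Overwrite(ctx, rec, filter, true, 0, nil)
}
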
diff --git a/table/table_test.go b/table/table_test.go
index addc9346..a01a3f78 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -368,7 +368,7 @@ func (t *TableWritingTestSuite) createTable(identifier table.Identifier, formatV
func(ctx context.Context) (iceio.IO, error) {
return iceio.LocalFS{}, nil
},
- nil,
+ &mockedCatalog{meta},
)
}
@@ -818,6 +818,8 @@ func (t *TableWritingTestSuite) TestAddFilesReferencedCurrentSnapshotIgnoreDupli
t.Equal([]int32{0, 0, 0}, deleted)
}
+// mockedCatalog is necessary for overwrite operations to simulate catalog behavior
+// during transaction commits without requiring a real catalog implementation.
type mockedCatalog struct {
metadata table.Metadata
}
@@ -1617,6 +1619,50 @@ func (t *TableWritingTestSuite) TestMergeManifests() {
 	t.True(array.TableEqual(resultB, resultC), "expected:\n %s\ngot:\n %s", resultB, resultC)
}
+// TestOverwriteTable verifies that Table.OverwriteTable properly delegates to Transaction.OverwriteTable
+func (t *TableWritingTestSuite) TestOverwriteTable() {
+	ident := table.Identifier{"default", "overwrite_table_wrapper_v" + strconv.Itoa(t.formatVersion)}
+	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+	newTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+		`[{"foo": false, "bar": "wrapper_test", "baz": 123, "qux": "2024-01-01"}]`,
+	})
+	t.Require().NoError(err)
+	defer newTable.Release()
+	resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, 0, nil)
+	t.Require().NoError(err)
+	t.NotNil(resultTbl)
+
+	snapshot := resultTbl.CurrentSnapshot()
+	t.NotNil(snapshot)
+	t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
+}
+
+// TestOverwriteRecord verifies that Table.Overwrite properly delegates to Transaction.Overwrite
+func (t *TableWritingTestSuite) TestOverwriteRecord() {
+	ident := table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)}
+	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+	// Create test data as RecordReader
+	testTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+		`[{"foo": true, "bar": "record_test", "baz": 456, "qux": "2024-01-02"}]`,
+	})
+	t.Require().NoError(err)
+	defer testTable.Release()
+
+	rdr := array.NewTableReader(testTable, 1)
+	defer rdr.Release()
+
+	// Test that Table.Overwrite works (delegates to transaction)
+	resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, 0, nil)
+	t.Require().NoError(err)
+	t.NotNil(resultTbl)
+
+	// Verify the operation worked
+	snapshot := resultTbl.CurrentSnapshot()
+	t.NotNil(snapshot)
+	t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
+}
+
func TestTableWriting(t *testing.T) {
suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index e3a2fe90..1187eba6 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/google/uuid"
+ "golang.org/x/sync/errgroup"
)
type snapshotUpdate struct {
@@ -499,6 +500,344 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally with a filter.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided table is written to the table regardless of the filter.
+//
+// The batchSize parameter refers to the batch size for reading the input data, not the batch size for writes.
+// The concurrency parameter controls the level of parallelism for manifest processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+	return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a filter.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided RecordReader is written to the table regardless of the filter.
+//
+// The concurrency parameter controls the level of parallelism for manifest processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Default concurrency if not specified
+ if concurrency <= 0 {
+ concurrency = runtime.GOMAXPROCS(0)
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+	updater := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(&commitUUID)
+
+	filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, concurrency)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+		if err := t.rewriteFilesWithFilter(ctx, fs, updater, filesToRewrite, filter, concurrency); err != nil {
+ return err
+ }
+ }
+
+	itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the provided filter.
+// Returns files to delete completely, files to rewrite partially, and any error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite operations.
+// Returns files to delete completely, files to rewrite partially, and any error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+	inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, true, false)
+ if err != nil {
+		return nil, nil, fmt.Errorf("failed to create inclusive metrics evaluator: %w", err)
+ }
+
+	strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true, false)
+ if err != nil {
+		return nil, nil, fmt.Errorf("failed to create strict metrics evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+		manifestEval, err = newManifestEvaluator(spec, schema, filter, true)
+ if err != nil {
+			return nil, nil, fmt.Errorf("failed to create manifest evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ type classifiedFiles struct {
+ toDelete []iceberg.DataFile
+ toRewrite []iceberg.DataFile
+ }
+
+ var (
+ mu sync.Mutex
+ allFilesToDel []iceberg.DataFile
+ allFilesToRewr []iceberg.DataFile
+ )
+
+ g, _ := errgroup.WithContext(ctx)
+ g.SetLimit(min(concurrency, len(manifests)))
+
+ for _, manifest := range manifests {
+ manifest := manifest // capture loop variable
+ g.Go(func() error {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+					return fmt.Errorf("failed to evaluate manifest %s: %w", manifest.FilePath(), err)
+ }
+ if !match {
+ return nil
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+				return fmt.Errorf("failed to fetch manifest entries: %w", err)
+ }
+
+ localDelete := make([]iceberg.DataFile, 0)
+ localRewrite := make([]iceberg.DataFile, 0)
+
+ for _, entry := range entries {
+				if entry.Status() == iceberg.EntryStatusDELETED {
+ continue
+ }
+
+ df := entry.DataFile()
+				if df.ContentType() != iceberg.EntryContentData {
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+					return fmt.Errorf("failed to evaluate data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+					return fmt.Errorf("failed to evaluate data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ localDelete = append(localDelete, df)
+ } else {
+ localRewrite = append(localRewrite, df)
+ }
+ }
+
+ if len(localDelete) > 0 || len(localRewrite) > 0 {
+ mu.Lock()
+				allFilesToDel = append(allFilesToDel, localDelete...)
+				allFilesToRewr = append(allFilesToRewr, localRewrite...)
+ mu.Unlock()
+ }
+
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, nil, err
+ }
+
+ return allFilesToDel, allFilesToRewr, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter iceberg.BooleanExpression, concurrency int) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ for _, originalFile := range files {
+		// Use a separate UUID for rewrite operations to avoid filename collisions with new data files
+		rewriteUUID := uuid.New()
+		rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, originalFile, complementFilter, rewriteUUID, concurrency)
+		if err != nil {
+			return fmt.Errorf("failed to rewrite file %s: %w", originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
+
+ return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+ scanTask := &FileScanTask{
+ File: originalFile,
+ Start: 0,
+ Length: originalFile.FileSizeBytes(),
+ }
+
+	boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: true,
+ rowLimit: -1, // No limit
+ concurrency: concurrency,
+ }
+
+	arrowSchema, recordIter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
+ if err != nil {
+		return nil, fmt.Errorf("failed to get records from original file: %w", err)
+ }
+
+ // Wrap the iterator to release records after consumption
+ releaseIter := func(yield func(arrow.RecordBatch, error) bool) {
+ for rec, err := range recordIter {
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+ if !yield(rec, nil) {
+ rec.Release()
+
+ return
+ }
+ rec.Release()
+ }
+ }
+
+ var result []iceberg.DataFile
+	itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+ sc: arrowSchema,
+ itr: releaseIter,
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &commitUUID,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, df)
+ }
+
+ return result, nil
+}
+
func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
updatedMeta, err := t.meta.Build()
if err != nil {
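
To make the file classification above concrete, here is a small illustrative sketch (not part of this
commit). The filter, the invented per-file column statistics in the comments, and the function name
exampleClassification are hypothetical; only EqualTo, Reference, and NewNot come from the iceberg-go API
already used in this patch.

package example

import "github.com/apache/iceberg-go"

// exampleClassification shows how a row filter on "baz" maps onto the three
// per-file outcomes used by the filtered overwrite path.
func exampleClassification() (filter, complement iceberg.BooleanExpression) {
	// Overwrite rows where baz = 300.
	filter = iceberg.EqualTo(iceberg.Reference("baz"), int32(300))

	// Hypothetical per-file stats for baz (lower bound, upper bound):
	//   (100, 200) -> inclusive=false              -> file kept unchanged
	//   (250, 350) -> inclusive=true, strict=false -> file rewritten, keeping rows matching NOT(filter)
	//   (300, 300) -> inclusive=true, strict=true  -> file deleted outright
	complement = iceberg.NewNot(filter) // predicate applied when rewriting partially matching files

	return filter, complement
}
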
diff --git a/table/transaction_test.go b/table/transaction_test.go
index a0269cb3..d2655fa1 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -363,6 +363,140 @@ func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
)
}
+func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
+ icebergSchema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.Bool},
+		iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Int32},
+	)
+
+	tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_overwrite_basic"), icebergSchema)
+ s.Require().NoError(err)
+
+ // Create initial data
+	arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+ s.Require().NoError(err)
+
+	initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"foo": true, "bar": "initial", "baz": 100},
+ {"foo": false, "bar": "old_data", "baz": 200}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 2, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	overwriteTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"foo": false, "bar": "overwritten", "baz": 300},
+ {"foo": true, "bar": "new_data", "baz": 400}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer overwriteTable.Release()
+
+ tx = tbl.NewTransaction()
+ err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil, true, 0, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ expectedOutput := `
++--------+
+|count(1)|
++--------+
+|2 |
++--------+
+
++-----+-----------+---+
+|foo |bar |baz|
++-----+-----------+---+
+|false|overwritten|300|
+|true |new_data |400|
++-----+-----------+---+
+`
+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteBasic")
+ s.Require().NoError(err)
+ s.Require().True(
+		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
+ "result does not contain expected output: %s", expectedOutput,
+ )
+}
+
+func (s *SparkIntegrationTestSuite) TestOverwriteWithFilter() {
+ icebergSchema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.Bool},
+		iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Int32},
+	)
+
+	tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_overwrite_filter"), icebergSchema)
+ s.Require().NoError(err)
+
+	arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+ s.Require().NoError(err)
+
+	initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"foo": true, "bar": "should_be_replaced", "baz": 100},
+ {"foo": false, "bar": "should_remain", "baz": 200},
+ {"foo": true, "bar": "also_replaced", "baz": 300}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	overwriteTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"foo": true, "bar": "new_replacement", "baz": 999}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer overwriteTable.Release()
+
+ filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
+ tx = tbl.NewTransaction()
+ err = tx.OverwriteTable(s.ctx, overwriteTable, 1, filter, true, 0, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ expectedOutput := `
++--------+
+|count(1)|
++--------+
+|2 |
++--------+
+
++-----+---------------+---+
+|foo |bar |baz|
++-----+---------------+---+
+|false|should_remain |200|
+|true |new_replacement|999|
++-----+---------------+---+
+`
+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteWithFilter")
+ s.Require().NoError(err)
+ s.Require().True(
+		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
+ "result does not contain expected output: %s", expectedOutput,
+ )
+}
+
func TestSparkIntegration(t *testing.T) {
suite.Run(t, new(SparkIntegrationTestSuite))
}