This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new c2c15330 feat: table overwrite functionality (#674)
c2c15330 is described below

commit c2c15330325e7b8a7cdfe51cd441a54944b34828
Author: Arun Donti <[email protected]>
AuthorDate: Thu Feb 5 19:00:17 2026 -0500

    feat: table overwrite functionality (#674)
    
    Resolves #668
    
    There are quite a few things I'm not too sure about with this
    implementation (like the filtering logic or if I've properly tested it)
    
    ---------
    
    Co-authored-by: Arun Donti <[email protected]>
    Co-authored-by: Matt Topol <[email protected]>
---
 README.md                     |   2 +-
 internal/recipe/validation.py |  36 ++++-
 table/table.go                |  53 +++++++
 table/table_test.go           |  48 +++++-
 table/transaction.go          | 339 ++++++++++++++++++++++++++++++++++++++++++
 table/transaction_test.go     | 134 +++++++++++++++++
 6 files changed, 607 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 451aeb8a..d6cb2416 100644
--- a/README.md
+++ b/README.md
@@ -99,7 +99,7 @@ the table, the following tracks the current write support:
 | Append Data Files |   X     |
 | Rewrite Files     |         |
 | Rewrite manifests |         |
-| Overwrite Files   |         |
+| Overwrite Files   |   X     |
 | Write Pos Delete  |         |
 | Write Eq Delete   |         |
 | Row Delta         |         |
diff --git a/internal/recipe/validation.py b/internal/recipe/validation.py
index 1ac9421a..7c1b2ecb 100644
--- a/internal/recipe/validation.py
+++ b/internal/recipe/validation.py
@@ -26,16 +26,40 @@ def testSetProperties():
 
 
 def testAddedFile():
-    spark.sql("SELECT COUNT(*) FROM 
default.test_partitioned_by_days").show(truncate=False)
+    spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
+        truncate=False
+    )
 
 
 def testReadDifferentDataTypes():
-    spark.sql("DESCRIBE TABLE EXTENDED 
default.go_test_different_data_types").show(truncate=False)
+    spark.sql("DESCRIBE TABLE EXTENDED 
default.go_test_different_data_types").show(
+        truncate=False
+    )
     spark.sql("SELECT * FROM 
default.go_test_different_data_types").show(truncate=False)
 
 
 def testReadSpecUpdate():
-    spark.sql("DESCRIBE TABLE EXTENDED 
default.go_test_update_spec").show(truncate=False)
+    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
+        truncate=False
+    )
+
+
+def testOverwriteBasic():
+    spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_basic").show(
+        truncate=False
+    )
+    spark.sql("SELECT * FROM default.go_test_overwrite_basic ORDER BY 
baz").show(
+        truncate=False
+    )
+
+
+def testOverwriteWithFilter():
+    spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_filter").show(
+        truncate=False
+    )
+    spark.sql("SELECT * FROM default.go_test_overwrite_filter ORDER BY 
baz").show(
+        truncate=False
+    )
 
 
 if __name__ == "__main__":
@@ -54,3 +78,9 @@ if __name__ == "__main__":
 
     if args.test == "TestReadSpecUpdate":
         testReadSpecUpdate()
+
+    if args.test == "TestOverwriteBasic":
+        testOverwriteBasic()
+
+    if args.test == "TestOverwriteWithFilter":
+        testOverwriteWithFilter()
diff --git a/table/table.go b/table/table.go
index 8abf6fb8..04f30777 100644
--- a/table/table.go
+++ b/table/table.go
@@ -129,6 +129,59 @@ func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps
        return txn.Commit(ctx)
 }
 
+// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then committing the transaction.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided table is written to the table regardless of the filter.
+//
+// The batchSize parameter refers to the batch size for reading the input data, not the batch size for writes.
+// The concurrency parameter controls the level of parallelism. If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) (*Table, error) {
+       txn := t.NewTransaction()
+       if err := txn.OverwriteTable(ctx, tbl, batchSize, filter, caseSensitive, concurrency, snapshotProps); err != nil {
+               return nil, err
+       }
+
+       return txn.Commit(ctx)
+}
+
+// Overwrite is a shortcut for NewTransaction().Overwrite() and then committing the transaction.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided RecordReader is written to the table regardless of the filter.
+//
+// The concurrency parameter controls the level of parallelism. If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) (*Table, error) {
+       txn := t.NewTransaction()
+       if err := txn.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, snapshotProps); err != nil {
+               return nil, err
+       }
+
+       return txn.Commit(ctx)
+}
+
 func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
        fs, err := t.fsF(ctx)
        if err != nil {
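For reference, the two new Table-level entry points above can be invoked as in the sketch below. This is an illustrative example rather than part of the diff: the table handle, the Arrow table, and the record reader are assumed to already exist, and the arrow import paths assume the arrow-go v18 module that iceberg-go builds against.

package example

import (
        "context"

        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/table"
)

// ReplaceAll swaps every existing data file for the rows in newData.
func ReplaceAll(ctx context.Context, tbl *table.Table, newData arrow.Table) (*table.Table, error) {
        // A nil filter behaves like AlwaysTrue: all current data files are deleted
        // and newData is appended. 1024 is the read batch size for newData.
        return tbl.OverwriteTable(ctx, newData, 1024, nil, true, 0, nil)
}

// ReplaceMatching deletes or rewrites only the existing rows where foo = true,
// then appends the rows supplied by rdr.
func ReplaceMatching(ctx context.Context, tbl *table.Table, rdr array.RecordReader) (*table.Table, error) {
        filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
        // caseSensitive=true, concurrency=0 (falls back to runtime.GOMAXPROCS(0)),
        // and no extra snapshot properties.
        return tbl.Overwrite(ctx, rdr, filter, true, 0, nil)
}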
diff --git a/table/table_test.go b/table/table_test.go
index addc9346..a01a3f78 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -368,7 +368,7 @@ func (t *TableWritingTestSuite) createTable(identifier table.Identifier, formatV
                func(ctx context.Context) (iceio.IO, error) {
                        return iceio.LocalFS{}, nil
                },
-               nil,
+               &mockedCatalog{meta},
        )
 }
 
@@ -818,6 +818,8 @@ func (t *TableWritingTestSuite) TestAddFilesReferencedCurrentSnapshotIgnoreDupli
        t.Equal([]int32{0, 0, 0}, deleted)
 }
 
+// mockedCatalog is necessary for overwrite operations to simulate catalog behavior
+// during transaction commits without requiring a real catalog implementation.
 type mockedCatalog struct {
        metadata table.Metadata
 }
@@ -1617,6 +1619,50 @@ func (t *TableWritingTestSuite) TestMergeManifests() {
        t.True(array.TableEqual(resultB, resultC), "expected:\n %s\ngot:\n %s", resultB, resultC)
 }
 
+// TestOverwriteTable verifies that Table.OverwriteTable properly delegates to Transaction.OverwriteTable
+func (t *TableWritingTestSuite) TestOverwriteTable() {
+       ident := table.Identifier{"default", "overwrite_table_wrapper_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+       newTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+               `[{"foo": false, "bar": "wrapper_test", "baz": 123, "qux": "2024-01-01"}]`,
+       })
+       t.Require().NoError(err)
+       defer newTable.Release()
+       resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, 0, nil)
+       t.Require().NoError(err)
+       t.NotNil(resultTbl)
+
+       snapshot := resultTbl.CurrentSnapshot()
+       t.NotNil(snapshot)
+       t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
+}
+
+// TestOverwriteRecord verifies that Table.Overwrite properly delegates to Transaction.Overwrite
+func (t *TableWritingTestSuite) TestOverwriteRecord() {
+       ident := table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       // Create test data as RecordReader
+       testTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+               `[{"foo": true, "bar": "record_test", "baz": 456, "qux": "2024-01-02"}]`,
+       })
+       t.Require().NoError(err)
+       defer testTable.Release()
+
+       rdr := array.NewTableReader(testTable, 1)
+       defer rdr.Release()
+
+       // Test that Table.Overwrite works (delegates to transaction)
+       resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, 0, nil)
+       t.Require().NoError(err)
+       t.NotNil(resultTbl)
+
+       // Verify the operation worked
+       snapshot := resultTbl.CurrentSnapshot()
+       t.NotNil(snapshot)
+       t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
+}
+
 func TestTableWriting(t *testing.T) {
        suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
        suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index e3a2fe90..1187eba6 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/io"
        "github.com/google/uuid"
+       "golang.org/x/sync/errgroup"
 )
 
 type snapshotUpdate struct {
@@ -499,6 +500,344 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
        return t.apply(updates, reqs)
 }
 
+// OverwriteTable overwrites the table data using an Arrow Table, optionally with a filter.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided table is written to the table regardless of the filter.
+//
+// The batchSize parameter refers to the batch size for reading the input data, not the batch size for writes.
+// The concurrency parameter controls the level of parallelism for manifest processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) error {
+       rdr := array.NewTableReader(tbl, batchSize)
+       defer rdr.Release()
+
+       return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a filter.
+//
+// The filter parameter determines which existing data to delete or rewrite:
+//   - If filter is nil or AlwaysTrue, all existing data files are deleted and replaced with new data.
+//   - If a filter is provided, it acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// New data from the provided RecordReader is written to the table regardless of the filter.
+//
+// The concurrency parameter controls the level of parallelism for manifest processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) error {
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       // Default concurrency if not specified
+       if concurrency <= 0 {
+               concurrency = runtime.GOMAXPROCS(0)
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       commitUUID := uuid.New()
+       updater := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(&commitUUID)
+
+       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, concurrency)
+       if err != nil {
+               return err
+       }
+
+       for _, df := range filesToDelete {
+               updater.deleteDataFile(df)
+       }
+
+       if len(filesToRewrite) > 0 {
+               if err := t.rewriteFilesWithFilter(ctx, fs, updater, filesToRewrite, filter, concurrency); err != nil {
+                       return err
+               }
+       }
+
+       itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+               sc:        rdr.Schema(),
+               itr:       array.IterFromReader(rdr),
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &updater.commitUuid,
+       })
+
+       for df, err := range itr {
+               if err != nil {
+                       return err
+               }
+               updater.appendDataFile(df)
+       }
+
+       updates, reqs, err := updater.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the provided filter.
+// Returns files to delete completely, files to rewrite partially, and any error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+       s := t.meta.currentSnapshot()
+       if s == nil {
+               return nil, nil, nil
+       }
+
+       if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+               for df, err := range s.dataFiles(fs, nil) {
+                       if err != nil {
+                               return nil, nil, err
+                       }
+                       if df.ContentType() == iceberg.EntryContentData {
+                               filesToDelete = append(filesToDelete, df)
+                       }
+               }
+
+               return filesToDelete, filesToRewrite, nil
+       }
+
+       return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite operations.
+// Returns files to delete completely, files to rewrite partially, and any error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+       schema := t.meta.CurrentSchema()
+
+       inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, true, false)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create inclusive metrics evaluator: %w", err)
+       }
+
+       strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true, false)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create strict metrics evaluator: %w", err)
+       }
+
+       var manifestEval func(iceberg.ManifestFile) (bool, error)
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+       }
+       spec := meta.PartitionSpec()
+       if !spec.IsUnpartitioned() {
+               manifestEval, err = newManifestEvaluator(spec, schema, filter, true)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed to create manifest evaluator: %w", err)
+               }
+       }
+
+       s := t.meta.currentSnapshot()
+       manifests, err := s.Manifests(fs)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+       }
+
+       type classifiedFiles struct {
+               toDelete  []iceberg.DataFile
+               toRewrite []iceberg.DataFile
+       }
+
+       var (
+               mu             sync.Mutex
+               allFilesToDel  []iceberg.DataFile
+               allFilesToRewr []iceberg.DataFile
+       )
+
+       g, _ := errgroup.WithContext(ctx)
+       g.SetLimit(min(concurrency, len(manifests)))
+
+       for _, manifest := range manifests {
+               manifest := manifest // capture loop variable
+               g.Go(func() error {
+                       if manifestEval != nil {
+                               match, err := manifestEval(manifest)
+                               if err != nil {
+                                       return fmt.Errorf("failed to evaluate manifest %s: %w", manifest.FilePath(), err)
+                               }
+                               if !match {
+                                       return nil
+                               }
+                       }
+
+                       entries, err := manifest.FetchEntries(fs, false)
+                       if err != nil {
+                               return fmt.Errorf("failed to fetch manifest entries: %w", err)
+                       }
+
+                       localDelete := make([]iceberg.DataFile, 0)
+                       localRewrite := make([]iceberg.DataFile, 0)
+
+                       for _, entry := range entries {
+                               if entry.Status() == iceberg.EntryStatusDELETED {
+                                       continue
+                               }
+
+                               df := entry.DataFile()
+                               if df.ContentType() != iceberg.EntryContentData {
+                                       continue
+                               }
+
+                               inclusive, err := inclusiveEvaluator(df)
+                               if err != nil {
+                                       return fmt.Errorf("failed to evaluate data file %s with inclusive evaluator: %w", df.FilePath(), err)
+                               }
+
+                               if !inclusive {
+                                       continue
+                               }
+
+                               strict, err := strictEvaluator(df)
+                               if err != nil {
+                                       return fmt.Errorf("failed to evaluate data file %s with strict evaluator: %w", df.FilePath(), err)
+                               }
+
+                               if strict {
+                                       localDelete = append(localDelete, df)
+                               } else {
+                                       localRewrite = append(localRewrite, df)
+                               }
+                       }
+
+                       if len(localDelete) > 0 || len(localRewrite) > 0 {
+                               mu.Lock()
+                               allFilesToDel = append(allFilesToDel, localDelete...)
+                               allFilesToRewr = append(allFilesToRewr, localRewrite...)
+                               mu.Unlock()
+                       }
+
+                       return nil
+               })
+       }
+
+       if err := g.Wait(); err != nil {
+               return nil, nil, err
+       }
+
+       return allFilesToDel, allFilesToRewr, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter iceberg.BooleanExpression, concurrency int) error {
+       complementFilter := iceberg.NewNot(filter)
+
+       for _, originalFile := range files {
+               // Use a separate UUID for rewrite operations to avoid filename collisions with new data files
+               rewriteUUID := uuid.New()
+               rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, originalFile, complementFilter, rewriteUUID, concurrency)
+               if err != nil {
+                       return fmt.Errorf("failed to rewrite file %s: %w", originalFile.FilePath(), err)
+               }
+
+               updater.deleteDataFile(originalFile)
+               for _, rewrittenFile := range rewrittenFiles {
+                       updater.appendDataFile(rewrittenFile)
+               }
+       }
+
+       return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+       scanTask := &FileScanTask{
+               File:   originalFile,
+               Start:  0,
+               Length: originalFile.FileSizeBytes(),
+       }
+
+       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, true)
+       if err != nil {
+               return nil, fmt.Errorf("failed to bind filter: %w", err)
+       }
+
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, fmt.Errorf("failed to build metadata: %w", err)
+       }
+
+       scanner := &arrowScan{
+               metadata:        meta,
+               fs:              fs,
+               projectedSchema: t.meta.CurrentSchema(),
+               boundRowFilter:  boundFilter,
+               caseSensitive:   true,
+               rowLimit:        -1, // No limit
+               concurrency:     concurrency,
+       }
+
+       arrowSchema, recordIter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
+       if err != nil {
+               return nil, fmt.Errorf("failed to get records from original file: %w", err)
+       }
+
+       // Wrap the iterator to release records after consumption
+       releaseIter := func(yield func(arrow.RecordBatch, error) bool) {
+               for rec, err := range recordIter {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+                       if !yield(rec, nil) {
+                               rec.Release()
+
+                               return
+                       }
+                       rec.Release()
+               }
+       }
+
+       var result []iceberg.DataFile
+       itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+               sc:        arrowSchema,
+               itr:       releaseIter,
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &commitUUID,
+       })
+
+       for df, err := range itr {
+               if err != nil {
+                       return nil, err
+               }
+               result = append(result, df)
+       }
+
+       return result, nil
+}
+
 func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
        updatedMeta, err := t.meta.Build()
        if err != nil {
diff --git a/table/transaction_test.go b/table/transaction_test.go
index a0269cb3..d2655fa1 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -363,6 +363,140 @@ func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
        )
 }
 
+func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.Bool},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Int32},
+       )
+
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_overwrite_basic"), icebergSchema)
+       s.Require().NoError(err)
+
+       // Create initial data
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+               `[
+                       {"foo": true, "bar": "initial", "baz": 100},
+                       {"foo": false, "bar": "old_data", "baz": 200}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 2, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       overwriteTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+               `[
+                       {"foo": false, "bar": "overwritten", "baz": 300},
+                       {"foo": true, "bar": "new_data", "baz": 400}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer overwriteTable.Release()
+
+       tx = tbl.NewTransaction()
+       err = tx.OverwriteTable(s.ctx, overwriteTable, 2, nil, true, 0, nil)
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       expectedOutput := `
++--------+
+|count(1)|
++--------+
+|2       |
++--------+
+
++-----+-----------+---+
+|foo  |bar        |baz|
++-----+-----------+---+
+|false|overwritten|300|
+|true |new_data   |400|
++-----+-----------+---+
+`
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteBasic")
+       s.Require().NoError(err)
+       s.Require().True(
+               strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
+               "result does not contain expected output: %s", expectedOutput,
+       )
+}
+
+func (s *SparkIntegrationTestSuite) TestOverwriteWithFilter() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.Bool},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Int32},
+       )
+
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_overwrite_filter"), icebergSchema)
+       s.Require().NoError(err)
+
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+               `[
+                       {"foo": true, "bar": "should_be_replaced", "baz": 100},
+                       {"foo": false, "bar": "should_remain", "baz": 200},
+                       {"foo": true, "bar": "also_replaced", "baz": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       overwriteTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+               `[
+                       {"foo": true, "bar": "new_replacement", "baz": 999}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer overwriteTable.Release()
+
+       filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
+       tx = tbl.NewTransaction()
+       err = tx.OverwriteTable(s.ctx, overwriteTable, 1, filter, true, 0, nil)
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       expectedOutput := `
++--------+
+|count(1)|
++--------+
+|2       |
++--------+
+
++-----+---------------+---+
+|foo  |bar            |baz|
++-----+---------------+---+
+|false|should_remain  |200|
+|true |new_replacement|999|
++-----+---------------+---+
+`
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteWithFilter")
+       s.Require().NoError(err)
+       s.Require().True(
+               strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
+               "result does not contain expected output: %s", expectedOutput,
+       )
+}
+
 func TestSparkIntegration(t *testing.T) {
        suite.Run(t, new(SparkIntegrationTestSuite))
 }
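
For completeness, the transaction-level flow exercised by TestOverwriteWithFilter above reduces to the following sketch. Only the OverwriteTable call, the filter expression, and Commit come verbatim from the test; the wrapper function, its names, and the arrow-go v18 import path are illustrative assumptions.

package example

import (
        "context"

        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/table"
)

// overwriteWhereFooTrue commits a filtered overwrite in a single transaction:
// existing rows with foo = true are deleted or rewritten away, newData is
// appended, and all other rows are preserved.
func overwriteWhereFooTrue(ctx context.Context, tbl *table.Table, newData arrow.Table) (*table.Table, error) {
        tx := tbl.NewTransaction()
        filter := iceberg.EqualTo(iceberg.Reference("foo"), true)
        if err := tx.OverwriteTable(ctx, newData, 1, filter, true, 0, nil); err != nil {
                return nil, err
        }

        return tx.Commit(ctx)
}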
