This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new cfab819d feat(table): add support for copy-on-write delete (#718)
cfab819d is described below

commit cfab819d919f3497a8f1f7e0062715cb407234fc
Author: Alex Normand <[email protected]>
AuthorDate: Sat Feb 14 14:19:38 2026 -0800

    feat(table): add support for copy-on-write delete (#718)
    
    ![alan](https://media1.tenor.com/m/ywHbbYEidf0AAAAd/hey-groundhog.gif)
    
    This adds support for performing delete operations on a table with the
    addition of `Delete(ctx context.Context, filter
    iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts
    ...DeleteOption)` on both transaction and table (as with the other
    methods, the table version just wraps creating a new transaction, doing
    the thing and then committing it).
    
    I did some refactoring to reuse the deletion code from the overwrite
    implementation and also updated the integration tests to make it easier
    to add validation sql statements. I hope people agree that adding a test
    function in `validation.py` that wraps a spark SQL statement to perform
    the validation is less than ideal. To keep the validation SQL closer to
    the test logic, I refactored `validation.py` to have a `sql` argument
    which allows the integration test to run the validation script and pass
    it the SQL needed to get the output used for the assertions. The
    tradeoff is that some of the validations were running two sql statements
    which makes for two spark sql round trips now instead of a single one.
    However, for most of them, the validation was redundant (doing a count
    and _then_ getting the full rows which implicitly also shows the number
    of rows) so only one test is ending up running two sql statements.
    
    Thanks to @dontirun for contributing the overwrite support in
    https://github.com/apache/iceberg-go/pull/674 which set the foundation
    to make this very easy since the `copy-on-write` deletion is essentially
    a subset of the overwrite with filter.
---
 README.md                     |  21 ++---
 internal/recipe/validation.py |  60 ++-----------
 table/properties.go           |   9 ++
 table/table.go                |  23 +++++
 table/table_test.go           |  69 +++++++++++++++
 table/transaction.go          | 129 +++++++++++++++++++++++-----
 table/transaction_test.go     | 190 +++++++++++++++++++++++++-----------------
 7 files changed, 337 insertions(+), 164 deletions(-)

diff --git a/README.md b/README.md
index d6cb2416..0d3f2dea 100644
--- a/README.md
+++ b/README.md
@@ -93,16 +93,17 @@ $ cd iceberg-go/cmd/iceberg && go build .
 As long as the FileSystem is supported and the Catalog supports altering
 the table, the following tracks the current write support:
 
-| Operation         |Supported|
-|:-----------------:|:-------:|
-| Append Stream     |   X     |
-| Append Data Files |   X     |
-| Rewrite Files     |         |
-| Rewrite manifests |         |
-| Overwrite Files   |   X     |
-| Write Pos Delete  |         |
-| Write Eq Delete   |         |
-| Row Delta         |         |
+|        Operation        | Supported |
+|:-----------------------:|:---------:|
+|      Append Stream      |     X     |
+|    Append Data Files    |     X     |
+|      Rewrite Files      |           |
+|    Rewrite manifests    |           |
+|     Overwrite Files     |     X     |
+|  Copy-On-Write Delete   |     X     |
+|    Write Pos Delete     |           |
+|     Write Eq Delete     |           |
+|        Row Delta        |           |
 
 
 ### CLI Usage
diff --git a/internal/recipe/validation.py b/internal/recipe/validation.py
index 7c1b2ecb..c43366f2 100644
--- a/internal/recipe/validation.py
+++ b/internal/recipe/validation.py
@@ -21,66 +21,16 @@ from pyspark.sql import SparkSession
 spark = SparkSession.builder.getOrCreate()
 
 
-def testSetProperties():
-    spark.sql("SHOW TBLPROPERTIES 
default.go_test_set_properties").show(truncate=False)
-
-
-def testAddedFile():
-    spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
-        truncate=False
-    )
-
-
-def testReadDifferentDataTypes():
-    spark.sql("DESCRIBE TABLE EXTENDED 
default.go_test_different_data_types").show(
-        truncate=False
-    )
-    spark.sql("SELECT * FROM 
default.go_test_different_data_types").show(truncate=False)
-
-
-def testReadSpecUpdate():
-    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
-        truncate=False
-    )
-
-
-def testOverwriteBasic():
-    spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_basic").show(
-        truncate=False
-    )
-    spark.sql("SELECT * FROM default.go_test_overwrite_basic ORDER BY 
baz").show(
-        truncate=False
-    )
-
-
-def testOverwriteWithFilter():
-    spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_filter").show(
-        truncate=False
-    )
-    spark.sql("SELECT * FROM default.go_test_overwrite_filter ORDER BY 
baz").show(
+def runSQL(sql):
+    spark.sql(sql).show(
         truncate=False
     )
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--test", type=str, required=True, help="Name of the 
test to run")
+    parser.add_argument("--sql", type=str, required=True, help="Validation SQL 
statement to execute")
     args = parser.parse_args()
 
-    if args.test == "TestSetProperties":
-        testSetProperties()
-
-    if args.test == "TestAddedFile":
-        testAddedFile()
-
-    if args.test == "TestReadDifferentDataTypes":
-        testReadDifferentDataTypes()
-
-    if args.test == "TestReadSpecUpdate":
-        testReadSpecUpdate()
-
-    if args.test == "TestOverwriteBasic":
-        testOverwriteBasic()
-
-    if args.test == "TestOverwriteWithFilter":
-        testOverwriteWithFilter()
+    if args.sql:
+        runSQL(args.sql)
\ No newline at end of file
diff --git a/table/properties.go b/table/properties.go
index cab74e83..560b0a1f 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -67,6 +67,9 @@ const (
        WritePartitionSummaryLimitKey     = "write.summary.partition-limit"
        WritePartitionSummaryLimitDefault = 0
 
+       WriteDeleteModeKey     = "write.delete.mode"
+       WriteDeleteModeDefault = WriteModeCopyOnWrite
+
        MetadataDeleteAfterCommitEnabledKey     = 
"write.metadata.delete-after-commit.enabled"
        MetadataDeleteAfterCommitEnabledDefault = false
 
@@ -119,3 +122,9 @@ const (
        MetadataCompressionCodecNone = "none"
        MetadataCompressionCodecGzip = "gzip"
 )
+
+// Write modes
+const (
+       WriteModeCopyOnWrite = "copy-on-write"
+       WriteModeMergeOnRead = "merge-on-read"
+)
diff --git a/table/table.go b/table/table.go
index e5e446ee..084b37ff 100644
--- a/table/table.go
+++ b/table/table.go
@@ -184,6 +184,29 @@ func (t Table) Overwrite(ctx context.Context, rdr 
array.RecordReader, snapshotPr
        return txn.Commit(ctx)
 }
 
+// Delete is a shortcut for NewTransaction().Delete() and then committing the 
transaction.
+//
+// The provided filter acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely 
deleted
+//   - Files where some rows match and others don't (partial match) are 
rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file 
statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain 
matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with 
filtered data
+//
+// The concurrency parameter controls the level of parallelism for manifest 
processing and file rewriting and
+// can be overridden using the WithOverwriteConcurrency option. Defaults to 
runtime.GOMAXPROCS(0).
+func (t Table) Delete(ctx context.Context, filter iceberg.BooleanExpression, 
snapshotProps iceberg.Properties, opts ...DeleteOption) (*Table, error) {
+       txn := t.NewTransaction()
+       if err := txn.Delete(ctx, filter, snapshotProps, opts...); err != nil {
+               return nil, err
+       }
+
+       return txn.Commit(ctx)
+}
+
 func (t Table) AllManifests(ctx context.Context) 
iter.Seq2[iceberg.ManifestFile, error] {
        fs, err := t.fsF(ctx)
        if err != nil {
diff --git a/table/table_test.go b/table/table_test.go
index e98cf0ea..ae1bbdde 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -22,6 +22,7 @@ import (
        "compress/gzip"
        "context"
        "encoding/json"
+       "errors"
        "fmt"
        "io"
        "io/fs"
@@ -1663,6 +1664,74 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
        t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table 
overwrite becomes append
 }
 
+// TestDelete verifies that Table.Delete properly delegates to 
Transaction.Delete
+func (t *TableWritingTestSuite) TestDelete() {
+       testCases := []struct {
+               name        string
+               table       *table.Table
+               expectedErr error
+       }{
+               {
+                       name: "success with copy-on-write",
+                       table: t.createTable(
+                               table.Identifier{"default", 
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+                               t.formatVersion,
+                               *iceberg.UnpartitionedSpec,
+                               t.tableSchema,
+                       ),
+                       expectedErr: nil,
+               },
+               {
+                       name: "abort on merge-on-read",
+                       table: t.createTableWithProps(
+                               table.Identifier{"default", 
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+                               map[string]string{
+                                       table.PropertyFormatVersion: 
strconv.Itoa(t.formatVersion),
+                                       table.WriteDeleteModeKey:    
table.WriteModeMergeOnRead,
+                               },
+                               t.tableSchema,
+                       ),
+                       expectedErr: errors.New("only 'copy-on-write' is 
currently supported"),
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func() {
+                       // Set up the test table with some data
+                       newTable, err := 
array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+                               `[{"foo": false, "bar": "wrapper_test", "baz": 
123, "qux": "2024-01-01"}]`,
+                       })
+                       t.Require().NoError(err)
+                       defer newTable.Release()
+
+                       tbl, err := tc.table.OverwriteTable(t.ctx, newTable, 1, 
nil)
+                       t.Require().NoError(err)
+
+                       // Validate the pre-requisite that data is present on 
the table before we go ahead and delete it
+                       arrowTable, err := tbl.Scan().ToArrowTable(t.ctx)
+                       t.Require().NoError(err)
+                       t.Equal(int64(1), arrowTable.NumRows())
+
+                       tbl, err = tbl.Delete(t.ctx, 
iceberg.EqualTo(iceberg.Reference("bar"), "wrapper_test"), nil)
+                       // If an error was expected, check that it's the 
correct one and abort validating the operation
+                       if tc.expectedErr != nil {
+                               t.Require().ErrorContains(err, 
tc.expectedErr.Error())
+
+                               return
+                       }
+
+                       snapshot := tbl.CurrentSnapshot()
+                       t.NotNil(snapshot)
+                       t.Equal(table.OpDelete, snapshot.Summary.Operation)
+
+                       arrowTable, err = tbl.Scan().ToArrowTable(t.ctx)
+                       t.Require().NoError(err)
+
+                       t.Equal(int64(0), arrowTable.NumRows())
+               })
+       }
+}
+
 func TestTableWriting(t *testing.T) {
        suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
        suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index 6bf1fc00..06b01631 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -40,6 +40,7 @@ type snapshotUpdate struct {
        txn           *Transaction
        io            io.WriteFileIO
        snapshotProps iceberg.Properties
+       operation     Operation
 }
 
 func (s snapshotUpdate) fastAppend() *snapshotProducer {
@@ -47,8 +48,8 @@ func (s snapshotUpdate) fastAppend() *snapshotProducer {
 }
 
 func (s snapshotUpdate) mergeOverwrite(commitUUID *uuid.UUID) 
*snapshotProducer {
-       op := OpOverwrite
-       if s.txn.meta.currentSnapshot() == nil {
+       op := s.operation
+       if s.operation == OpOverwrite && s.txn.meta.currentSnapshot() == nil {
                op = OpAppend
        }
 
@@ -120,7 +121,7 @@ func (t *Transaction) apply(updates []Update, reqs 
[]Requirement) error {
 
 func (t *Transaction) appendSnapshotProducer(afs io.IO, props 
iceberg.Properties) *snapshotProducer {
        manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, 
ManifestMergeEnabledDefault)
-       updateSnapshot := t.updateSnapshot(afs, props)
+       updateSnapshot := t.updateSnapshot(afs, props, OpAppend)
        if manifestMerge {
                return updateSnapshot.mergeAppend()
        }
@@ -128,11 +129,12 @@ func (t *Transaction) appendSnapshotProducer(afs io.IO, 
props iceberg.Properties
        return updateSnapshot.fastAppend()
 }
 
-func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties) 
snapshotUpdate {
+func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, 
operation Operation) snapshotUpdate {
        return snapshotUpdate{
                txn:           t,
                io:            fs.(io.WriteFileIO),
                snapshotProps: props,
+               operation:     operation,
        }
 }
 
@@ -411,7 +413,7 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, 
filesToDelete, files
        }
 
        commitUUID := uuid.New()
-       updater := t.updateSnapshot(fs, 
snapshotProps).mergeOverwrite(&commitUUID)
+       updater := t.updateSnapshot(fs, snapshotProps, 
OpOverwrite).mergeOverwrite(&commitUUID)
 
        for _, df := range markedForDeletion {
                updater.deleteDataFile(df)
@@ -482,7 +484,7 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
                return err
        }
 
-       updater := t.updateSnapshot(fs, snapshotProps).fastAppend()
+       updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()
 
        dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
        for df, err := range dataFiles {
@@ -595,29 +597,61 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr 
array.RecordReader, sna
                apply(&overwrite)
        }
 
+       updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, 
snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+       if err != nil {
+               return err
+       }
+
        fs, err := t.tbl.fsF(ctx)
        if err != nil {
                return err
        }
+       itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, 
recordWritingArgs{
+               sc:        rdr.Schema(),
+               itr:       array.IterFromReader(rdr),
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &updater.commitUuid,
+       })
+
+       for df, err := range itr {
+               if err != nil {
+                       return err
+               }
+               updater.appendDataFile(df)
+       }
+
+       updates, reqs, err := updater.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, 
operation Operation, snapshotProps iceberg.Properties, filter 
iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(*snapshotProducer, error) {
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return nil, err
+       }
 
        if t.meta.NameMapping() == nil {
                nameMapping := t.meta.CurrentSchema().NameMapping()
                mappingJson, err := json.Marshal(nameMapping)
                if err != nil {
-                       return err
+                       return nil, err
                }
                err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: 
string(mappingJson)})
                if err != nil {
-                       return err
+                       return nil, err
                }
        }
 
        commitUUID := uuid.New()
-       updater := t.updateSnapshot(fs, 
snapshotProps).mergeOverwrite(&commitUUID)
+       updater := t.updateSnapshot(fs, snapshotProps, 
operation).mergeOverwrite(&commitUUID)
 
-       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, filter, caseSensitive, concurrency)
        if err != nil {
-               return err
+               return nil, err
        }
 
        for _, df := range filesToDelete {
@@ -625,25 +659,74 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr 
array.RecordReader, sna
        }
 
        if len(filesToRewrite) > 0 {
-               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, overwrite.filter, overwrite.caseSensitive, 
overwrite.concurrency); err != nil {
-                       return err
+               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, filter, caseSensitive, concurrency); err != nil {
+                       return nil, err
                }
        }
 
-       itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, 
recordWritingArgs{
-               sc:        rdr.Schema(),
-               itr:       array.IterFromReader(rdr),
-               fs:        fs.(io.WriteFileIO),
-               writeUUID: &updater.commitUuid,
-       })
+       return updater, nil
+}
 
-       for df, err := range itr {
-               if err != nil {
-                       return err
+type DeleteOption func(deleteOp *deleteOperation)
+
+type deleteOperation struct {
+       caseSensitive bool
+       concurrency   int
+}
+
+// WithDeleteConcurrency overwrites the default concurrency for delete 
operations.
+// Default: runtime.GOMAXPROCS(0)
+func WithDeleteConcurrency(concurrency int) DeleteOption {
+       return func(op *deleteOperation) {
+               if concurrency <= 0 {
+                       op.concurrency = runtime.GOMAXPROCS(0)
+
+                       return
                }
-               updater.appendDataFile(df)
+               op.concurrency = concurrency
        }
+}
+
+// WithDeleteCaseInsensitive changes the binding of the filter to be case 
insensitive instead of the
+// Default: case sensitive
+// Note that the sensitivity only applies to the field name and not the 
evaluation of the literals on string fields.
+func WithDeleteCaseInsensitive() DeleteOption {
+       return func(deleteOp *deleteOperation) {
+               deleteOp.caseSensitive = false
+       }
+}
 
+// Delete deletes records matching the provided filter.
+//
+// The provided filter acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely 
deleted
+//   - Files where some rows match and others don't (partial match) are 
rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file 
statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain 
matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with 
filtered data
+//
+// The concurrency parameter controls the level of parallelism for manifest 
processing and file rewriting and
+// can be overridden using the WithOverwriteConcurrency option. Defaults to 
runtime.GOMAXPROCS(0).
+func (t *Transaction) Delete(ctx context.Context, filter 
iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts 
...DeleteOption) error {
+       deleteOp := deleteOperation{
+               concurrency:   runtime.GOMAXPROCS(0),
+               caseSensitive: true,
+       }
+       for _, apply := range opts {
+               apply(&deleteOp)
+       }
+
+       writeDeleteMode := t.meta.props.Get(WriteDeleteModeKey, 
WriteDeleteModeDefault)
+       if writeDeleteMode != WriteModeCopyOnWrite {
+               return fmt.Errorf("'%s' is set to '%s' but only '%s' is 
currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite)
+       }
+       updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, 
snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
+       if err != nil {
+               return err
+       }
        updates, reqs, err := updater.commit()
        if err != nil {
                return err
diff --git a/table/transaction_test.go b/table/transaction_test.go
index 2a8f3bde..1f64adf1 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -92,8 +92,9 @@ func (s *SparkIntegrationTestSuite) TestSetProperties() {
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)
 
-       expectedOutput := `
-+----------------------------------+---------------+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SHOW TBLPROPERTIES default.go_test_set_properties")
+       s.Require().NoError(err)
+       s.Require().Contains(output, 
`+----------------------------------+---------------+
 |key                               |value          |
 +----------------------------------+---------------+
 |commit.manifest-merge.enabled     |true           |
@@ -103,15 +104,7 @@ func (s *SparkIntegrationTestSuite) TestSetProperties() {
 |format                            |iceberg/parquet|
 |format-version                    |2              |
 |write.parquet.compression-codec   |snappy         |
-+----------------------------------+---------------+
-`
-
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestSetProperties")
-       s.Require().NoError(err)
-       s.Require().True(
-               strings.HasSuffix(strings.TrimSpace(output), 
strings.TrimSpace(expectedOutput)),
-               "result does not contain expected output: %s", expectedOutput,
-       )
++----------------------------------+---------------+`)
 }
 
 func (s *SparkIntegrationTestSuite) TestAddFile() {
@@ -163,7 +156,7 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
 +--------+
 `
 
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestAddedFile")
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT COUNT(*) FROM default.test_partitioned_by_days")
        s.Require().NoError(err)
        s.Require().True(
                strings.HasSuffix(strings.TrimSpace(output), 
strings.TrimSpace(expectedOutput)),
@@ -274,8 +267,9 @@ func (s *SparkIntegrationTestSuite) 
TestDifferentDataTypes() {
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)
 
-       expectedSchema := `
-+------------------+-------------+-------+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"DESCRIBE TABLE EXTENDED default.go_test_different_data_types")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `+------------------+-------------+-------+
 |col_name          |data_type    |comment|
 +------------------+-------------+-------+
 |bool              |boolean      |NULL   |
@@ -294,29 +288,17 @@ func (s *SparkIntegrationTestSuite) 
TestDifferentDataTypes() {
 |small_dec         |decimal(8,2) |NULL   |
 |med_dec           |decimal(16,2)|NULL   |
 |large_dec         |decimal(24,2)|NULL   |
-|list              |array<int>   |NULL   |
-`
+|list              |array<int>   |NULL   |`)
 
-       expectedOutput := `
-+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
+       output, err = recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_different_data_types")
+       s.Require().NoError(err)
+       s.Require().Contains(output, 
`+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
 |bool |string|string_long           |int |long|float|double|timestamp          
|timestamptz        |date      |uuid                                
|binary|fixed                                            |small_dec|med_dec     
     |large_dec                |list        |
 
+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
 |false|a     |aaaaaaaaaaaaaaaaaaaaaa|1   |1   |0.0  |0.0   |2023-01-01 
11:25:00|2023-01-01 
19:25:00|2023-01-01|00000000-0000-0000-0000-000000000000|[01]  |[00 00 00 00 00 
00 00 00 00 00 00 00 00 00 00 
00]|123456.78|12345678901234.56|1234567890123456789012.34|[1, 2, 3]   |
 |NULL |NULL  |NULL                  |NULL|NULL|NULL |NULL  |NULL               
|NULL               |NULL      |NULL                                |NULL  
|NULL                                             |NULL     |NULL             
|NULL                     |NULL        |
 |true |z     |zzzzzzzzzzzzzzzzzzzzzz|9   |9   |0.9  |0.9   |2023-03-01 
11:25:00|2023-03-01 
19:25:00|2023-03-01|11111111-1111-1111-1111-111111111111|[12]  |[11 11 11 11 11 
11 11 11 11 11 11 11 11 11 11 
11]|876543.21|65432109876543.21|4321098765432109876543.21|[-1, -2, -3]|
-+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
-`
-
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestReadDifferentDataTypes")
-       s.Require().NoError(err)
-       s.Require().True(
-               strings.Contains(strings.TrimSpace(output), 
strings.TrimSpace(expectedSchema)),
-               "result does not contain expected output: %s", expectedOutput,
-       )
-       s.Require().True(
-               strings.HasSuffix(strings.TrimSpace(output), 
strings.TrimSpace(expectedOutput)),
-               "result does not contain expected output: %s", expectedOutput,
-       )
++-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+`)
 }
 
 func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
@@ -345,22 +327,12 @@ func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
        s.Require().NoError(err)
        _, err = tx.Commit(s.ctx)
 
-       partitionExpectedOutput := `
-|# Partitioning              |                                            |    
   |
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"DESCRIBE TABLE EXTENDED default.go_test_update_spec")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `|# Partitioning              |            
                                |       |
 |Part 0                      |truncate(5, bar)                            |    
   |
 |Part 1                      |bucket(3, baz)                              |    
   |
-|                            |                                            |    
   |
-`
-       metadataPartition := `
-|_partition                  |struct<bar_truncate:string,baz_bucket_3:int>|    
   |
-`
-
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestReadSpecUpdate")
-       s.Require().NoError(err)
-       s.Require().True(
-               strings.Contains(strings.TrimSpace(output), 
strings.TrimSpace(partitionExpectedOutput)),
-               "result does not contain expected output: %s", 
metadataPartition,
-       )
+|                            |                                            |    
   |`)
 }
 
 func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
@@ -407,27 +379,14 @@ func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)
 
-       expectedOutput := `
-+--------+
-|count(1)|
-+--------+
-|2       |
-+--------+
-
-+-----+-----------+---+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_overwrite_basic ORDER BY baz")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `+-----+-----------+---+
 |foo  |bar        |baz|
 +-----+-----------+---+
 |false|overwritten|300|
 |true |new_data   |400|
-+-----+-----------+---+
-`
-
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestOverwriteBasic")
-       s.Require().NoError(err)
-       s.Require().True(
-               strings.HasSuffix(strings.TrimSpace(output), 
strings.TrimSpace(expectedOutput)),
-               "result does not contain expected output: %s", expectedOutput,
-       )
++-----+-----------+---+`)
 }
 
 func (s *SparkIntegrationTestSuite) TestOverwriteWithFilter() {
@@ -474,27 +433,106 @@ func (s *SparkIntegrationTestSuite) 
TestOverwriteWithFilter() {
        _, err = tx.Commit(s.ctx)
        s.Require().NoError(err)
 
-       expectedOutput := `
-+--------+
-|count(1)|
-+--------+
-|2       |
-+--------+
-
-+-----+---------------+---+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_overwrite_filter ORDER BY baz")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `+-----+---------------+---+
 |foo  |bar            |baz|
 +-----+---------------+---+
 |false|should_remain  |200|
 |true |new_replacement|999|
-+-----+---------------+---+
-`
++-----+---------------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDelete() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "first_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "last_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32},
+       )
 
-       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", 
"TestOverwriteWithFilter")
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", 
"go_test_delete"), icebergSchema)
        s.Require().NoError(err)
-       s.Require().True(
-               strings.HasSuffix(strings.TrimSpace(output), 
strings.TrimSpace(expectedOutput)),
-               "result does not contain expected output: %s", expectedOutput,
+
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, 
false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchema, []string{
+               `[
+                       {"first_name": "alan", "last_name": "gopher", "age": 7},
+                       {"first_name": "steve", "last_name": "gopher", "age": 
5},
+                       {"first_name": "dead", "last_name": "gopher", "age": 97}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       // Delete the dead gopher and confirm that alan and steve are still 
present
+       filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead")
+       tx = tbl.NewTransaction()
+       err = tx.Delete(s.ctx, filter, nil)
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_delete ORDER BY age")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve     |gopher   |5  |
+|alan      |gopher   |7  |
++----------+---------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDeleteInsensitive() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "first_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "last_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32},
        )
+
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", 
"go_test_delete_insensitive"), icebergSchema)
+       s.Require().NoError(err)
+
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, 
false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchema, []string{
+               `[
+                       {"first_name": "alan", "last_name": "gopher", "age": 7},
+                       {"first_name": "steve", "last_name": "gopher", "age": 
5},
+                       {"first_name": "dead", "last_name": "gopher", "age": 97}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       // Delete the dead gopher and confirm that alan and steve are still 
present
+       filter := iceberg.EqualTo(iceberg.Reference("FIRST_NAME"), "dead")
+       tx = tbl.NewTransaction()
+       err = tx.Delete(s.ctx, filter, nil, table.WithDeleteCaseInsensitive())
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_delete_insensitive ORDER BY age")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve     |gopher   |5  |
+|alan      |gopher   |7  |
++----------+---------+---+`)
 }
 
 func TestSparkIntegration(t *testing.T) {

Reply via email to