This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new cfab819d feat(table): add support for copy-on-write delete (#718)
cfab819d is described below
commit cfab819d919f3497a8f1f7e0062715cb407234fc
Author: Alex Normand <[email protected]>
AuthorDate: Sat Feb 14 14:19:38 2026 -0800
feat(table): add support for copy-on-write delete (#718)

This adds support for performing delete operations on a table via a new
`Delete(ctx context.Context, filter iceberg.BooleanExpression,
snapshotProps iceberg.Properties, opts ...DeleteOption)` method on both
the transaction and the table (as with the other methods, the table
version simply creates a new transaction, performs the operation, and
commits it).
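
As a quick illustration (not part of the change itself), here is a minimal
sketch of how the new table-level API can be used. The helper name, the
`bar` column, and the "obsolete" literal are made up for the example; the
`Delete` signature, the `EqualTo` filter, and the `WithDeleteCaseInsensitive`
option come from this commit:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
)

// deleteMatchingRows sketches a copy-on-write delete: every row whose
// "bar" column equals "obsolete" is removed, and only data files that
// partially match the filter get rewritten.
func deleteMatchingRows(ctx context.Context, tbl *table.Table) *table.Table {
	updated, err := tbl.Delete(
		ctx,
		iceberg.EqualTo(iceberg.Reference("bar"), "obsolete"), // row-level predicate
		nil, // no extra snapshot properties
		table.WithDeleteCaseInsensitive(), // optional: bind field names case-insensitively
	)
	if err != nil {
		log.Fatal(err)
	}
	// The resulting snapshot carries the "delete" operation.
	fmt.Println(updated.CurrentSnapshot().Summary.Operation)

	return updated
}
```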
I did some refactoring to reuse the deletion code from the overwrite
implementation and also updated the integration tests to make it easier
to add validation SQL statements. I hope people agree that the previous
pattern of adding a test function to `validation.py` just to wrap a
Spark SQL statement was less than ideal. To keep the validation SQL
closer to the test logic, I refactored `validation.py` to take a `sql`
argument, which lets each integration test run the validation script and
pass it the SQL needed to produce the output used for the assertions.
The tradeoff is that some validations previously ran two SQL statements,
which now means two Spark SQL round trips instead of one. However, for
most of them the validation was redundant (doing a count and _then_
selecting the full rows, which implicitly also shows the number of
rows), so only one test ends up running two SQL statements.
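
For reference, the validation pattern each integration test now follows is
roughly the fragment below (taken from the test changes in this commit; the
table name and expected substring are placeholders):

```go
// The test supplies its own validation SQL to validation.py via --sql,
// instead of referencing a named test function.
output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql",
	"SELECT COUNT(*) FROM default.go_test_delete")
s.Require().NoError(err)
s.Require().Contains(output, "|count(1)|")
```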
Thanks to @dontirun for contributing the overwrite support in
https://github.com/apache/iceberg-go/pull/674, which laid the foundation
and made this very easy, since `copy-on-write` deletion is essentially a
subset of overwrite with a filter.
---
README.md | 21 ++---
internal/recipe/validation.py | 60 ++-----------
table/properties.go | 9 ++
table/table.go | 23 +++++
table/table_test.go | 69 +++++++++++++++
table/transaction.go | 129 +++++++++++++++++++++++-----
table/transaction_test.go | 190 +++++++++++++++++++++++++-----------------
7 files changed, 337 insertions(+), 164 deletions(-)
diff --git a/README.md b/README.md
index d6cb2416..0d3f2dea 100644
--- a/README.md
+++ b/README.md
@@ -93,16 +93,17 @@ $ cd iceberg-go/cmd/iceberg && go build .
As long as the FileSystem is supported and the Catalog supports altering
the table, the following tracks the current write support:
-| Operation |Supported|
-|:-----------------:|:-------:|
-| Append Stream | X |
-| Append Data Files | X |
-| Rewrite Files | |
-| Rewrite manifests | |
-| Overwrite Files | X |
-| Write Pos Delete | |
-| Write Eq Delete | |
-| Row Delta | |
+| Operation | Supported |
+|:-----------------------:|:---------:|
+| Append Stream | X |
+| Append Data Files | X |
+| Rewrite Files | |
+| Rewrite manifests | |
+| Overwrite Files | X |
+| Copy-On-Write Delete | X |
+| Write Pos Delete | |
+| Write Eq Delete | |
+| Row Delta | |
### CLI Usage
diff --git a/internal/recipe/validation.py b/internal/recipe/validation.py
index 7c1b2ecb..c43366f2 100644
--- a/internal/recipe/validation.py
+++ b/internal/recipe/validation.py
@@ -21,66 +21,16 @@ from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
-def testSetProperties():
-    spark.sql("SHOW TBLPROPERTIES default.go_test_set_properties").show(truncate=False)
-
-
-def testAddedFile():
- spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
- truncate=False
- )
-
-
-def testReadDifferentDataTypes():
-    spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(
- truncate=False
- )
-    spark.sql("SELECT * FROM default.go_test_different_data_types").show(truncate=False)
-
-
-def testReadSpecUpdate():
- spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
- truncate=False
- )
-
-
-def testOverwriteBasic():
- spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_basic").show(
- truncate=False
- )
-    spark.sql("SELECT * FROM default.go_test_overwrite_basic ORDER BY baz").show(
- truncate=False
- )
-
-
-def testOverwriteWithFilter():
- spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_filter").show(
- truncate=False
- )
-    spark.sql("SELECT * FROM default.go_test_overwrite_filter ORDER BY baz").show(
+def runSQL(sql):
+ spark.sql(sql).show(
truncate=False
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
-    parser.add_argument("--test", type=str, required=True, help="Name of the test to run")
+    parser.add_argument("--sql", type=str, required=True, help="Validation SQL statement to execute")
args = parser.parse_args()
- if args.test == "TestSetProperties":
- testSetProperties()
-
- if args.test == "TestAddedFile":
- testAddedFile()
-
- if args.test == "TestReadDifferentDataTypes":
- testReadDifferentDataTypes()
-
- if args.test == "TestReadSpecUpdate":
- testReadSpecUpdate()
-
- if args.test == "TestOverwriteBasic":
- testOverwriteBasic()
-
- if args.test == "TestOverwriteWithFilter":
- testOverwriteWithFilter()
+ if args.sql:
+ runSQL(args.sql)
\ No newline at end of file
diff --git a/table/properties.go b/table/properties.go
index cab74e83..560b0a1f 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -67,6 +67,9 @@ const (
WritePartitionSummaryLimitKey = "write.summary.partition-limit"
WritePartitionSummaryLimitDefault = 0
+ WriteDeleteModeKey = "write.delete.mode"
+ WriteDeleteModeDefault = WriteModeCopyOnWrite
+
 	MetadataDeleteAfterCommitEnabledKey     = "write.metadata.delete-after-commit.enabled"
MetadataDeleteAfterCommitEnabledDefault = false
@@ -119,3 +122,9 @@ const (
MetadataCompressionCodecNone = "none"
MetadataCompressionCodecGzip = "gzip"
)
+
+// Write modes
+const (
+ WriteModeCopyOnWrite = "copy-on-write"
+ WriteModeMergeOnRead = "merge-on-read"
+)
diff --git a/table/table.go b/table/table.go
index e5e446ee..084b37ff 100644
--- a/table/table.go
+++ b/table/table.go
@@ -184,6 +184,29 @@ func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, snapshotPr
return txn.Commit(ctx)
}
+// Delete is a shortcut for NewTransaction().Delete() and then committing the transaction.
+//
+// The provided filter acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// The concurrency used for manifest processing and file rewriting defaults to
+// runtime.GOMAXPROCS(0) and can be overridden using the WithDeleteConcurrency option.
+func (t Table) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption) (*Table, error) {
+ txn := t.NewTransaction()
+ if err := txn.Delete(ctx, filter, snapshotProps, opts...); err != nil {
+ return nil, err
+ }
+
+ return txn.Commit(ctx)
+}
+
 func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
fs, err := t.fsF(ctx)
if err != nil {
diff --git a/table/table_test.go b/table/table_test.go
index e98cf0ea..ae1bbdde 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -22,6 +22,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
+ "errors"
"fmt"
"io"
"io/fs"
@@ -1663,6 +1664,74 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
 	t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
}
+// TestDelete verifies that Table.Delete properly delegates to Transaction.Delete
+func (t *TableWritingTestSuite) TestDelete() {
+ testCases := []struct {
+ name string
+ table *table.Table
+ expectedErr error
+ }{
+ {
+ name: "success with copy-on-write",
+ table: t.createTable(
+				table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ t.formatVersion,
+ *iceberg.UnpartitionedSpec,
+ t.tableSchema,
+ ),
+ expectedErr: nil,
+ },
+ {
+ name: "abort on merge-on-read",
+ table: t.createTableWithProps(
+				table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+				map[string]string{
+					table.PropertyFormatVersion: strconv.Itoa(t.formatVersion),
+					table.WriteDeleteModeKey:    table.WriteModeMergeOnRead,
+ },
+ t.tableSchema,
+ ),
+			expectedErr: errors.New("only 'copy-on-write' is currently supported"),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func() {
+ // Set up the test table with some data
+			newTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
+				`[{"foo": false, "bar": "wrapper_test", "baz": 123, "qux": "2024-01-01"}]`,
+ })
+ t.Require().NoError(err)
+ defer newTable.Release()
+
+			tbl, err := tc.table.OverwriteTable(t.ctx, newTable, 1, nil)
+ t.Require().NoError(err)
+
+			// Validate the pre-requisite that data is present on the table before we go ahead and delete it
+ arrowTable, err := tbl.Scan().ToArrowTable(t.ctx)
+ t.Require().NoError(err)
+ t.Equal(int64(1), arrowTable.NumRows())
+
+			tbl, err = tbl.Delete(t.ctx, iceberg.EqualTo(iceberg.Reference("bar"), "wrapper_test"), nil)
+			// If an error was expected, check that it's the correct one and abort validating the operation
+ if tc.expectedErr != nil {
+				t.Require().ErrorContains(err, tc.expectedErr.Error())
+
+ return
+ }
+
+ snapshot := tbl.CurrentSnapshot()
+ t.NotNil(snapshot)
+ t.Equal(table.OpDelete, snapshot.Summary.Operation)
+
+ arrowTable, err = tbl.Scan().ToArrowTable(t.ctx)
+ t.Require().NoError(err)
+
+ t.Equal(int64(0), arrowTable.NumRows())
+ })
+ }
+}
+
func TestTableWriting(t *testing.T) {
suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index 6bf1fc00..06b01631 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -40,6 +40,7 @@ type snapshotUpdate struct {
txn *Transaction
io io.WriteFileIO
snapshotProps iceberg.Properties
+ operation Operation
}
func (s snapshotUpdate) fastAppend() *snapshotProducer {
@@ -47,8 +48,8 @@ func (s snapshotUpdate) fastAppend() *snapshotProducer {
}
 func (s snapshotUpdate) mergeOverwrite(commitUUID *uuid.UUID) *snapshotProducer {
- op := OpOverwrite
- if s.txn.meta.currentSnapshot() == nil {
+ op := s.operation
+ if s.operation == OpOverwrite && s.txn.meta.currentSnapshot() == nil {
op = OpAppend
}
@@ -120,7 +121,7 @@ func (t *Transaction) apply(updates []Update, reqs []Requirement) error {
 func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties) *snapshotProducer {
 	manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault)
- updateSnapshot := t.updateSnapshot(afs, props)
+ updateSnapshot := t.updateSnapshot(afs, props, OpAppend)
if manifestMerge {
return updateSnapshot.mergeAppend()
}
@@ -128,11 +129,12 @@ func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties
return updateSnapshot.fastAppend()
}
-func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties) snapshotUpdate {
+func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operation Operation) snapshotUpdate {
return snapshotUpdate{
txn: t,
io: fs.(io.WriteFileIO),
snapshotProps: props,
+ operation: operation,
}
}
@@ -411,7 +413,7 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files
}
commitUUID := uuid.New()
-	updater := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(&commitUUID)
+	updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID)
for _, df := range markedForDeletion {
updater.deleteDataFile(df)
@@ -482,7 +484,7 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
return err
}
- updater := t.updateSnapshot(fs, snapshotProps).fastAppend()
+ updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()
dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
for df, err := range dataFiles {
@@ -595,29 +597,61 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna
apply(&overwrite)
}
+	updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+ if err != nil {
+ return err
+ }
+
fs, err := t.tbl.fsF(ctx)
if err != nil {
return err
}
+	itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation Operation, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (*snapshotProducer, error) {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return nil, err
+ }
if t.meta.NameMapping() == nil {
nameMapping := t.meta.CurrentSchema().NameMapping()
mappingJson, err := json.Marshal(nameMapping)
if err != nil {
- return err
+ return nil, err
}
 		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
if err != nil {
- return err
+ return nil, err
}
}
commitUUID := uuid.New()
-	updater := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(&commitUUID)
+	updater := t.updateSnapshot(fs, snapshotProps, operation).mergeOverwrite(&commitUUID)
-	filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+	filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, caseSensitive, concurrency)
if err != nil {
- return err
+ return nil, err
}
for _, df := range filesToDelete {
@@ -625,25 +659,74 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna
}
if len(filesToRewrite) > 0 {
-		if err := t.rewriteFilesWithFilter(ctx, fs, updater, filesToRewrite, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency); err != nil {
-			return err
+		if err := t.rewriteFilesWithFilter(ctx, fs, updater, filesToRewrite, filter, caseSensitive, concurrency); err != nil {
+ return nil, err
}
}
-	itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
- sc: rdr.Schema(),
- itr: array.IterFromReader(rdr),
- fs: fs.(io.WriteFileIO),
- writeUUID: &updater.commitUuid,
- })
+ return updater, nil
+}
- for df, err := range itr {
- if err != nil {
- return err
+type DeleteOption func(deleteOp *deleteOperation)
+
+type deleteOperation struct {
+ caseSensitive bool
+ concurrency int
+}
+
+// WithDeleteConcurrency overrides the default concurrency for delete operations.
+// Default: runtime.GOMAXPROCS(0)
+func WithDeleteConcurrency(concurrency int) DeleteOption {
+ return func(op *deleteOperation) {
+ if concurrency <= 0 {
+ op.concurrency = runtime.GOMAXPROCS(0)
+
+ return
}
- updater.appendDataFile(df)
+ op.concurrency = concurrency
}
+}
+
+// WithDeleteCaseInsensitive changes the binding of the filter to be case insensitive instead of the default (case sensitive).
+// Note that the sensitivity only applies to the field name and not the evaluation of the literals on string fields.
+func WithDeleteCaseInsensitive() DeleteOption {
+ return func(deleteOp *deleteOperation) {
+ deleteOp.caseSensitive = false
+ }
+}
+// Delete deletes records matching the provided filter.
+//
+// The provided filter acts as a row-level predicate on existing data:
+//   - Files where all rows match the filter (strict match) are completely deleted
+//   - Files where some rows match and others don't (partial match) are rewritten to keep only non-matching rows
+//   - Files where no rows match the filter are kept unchanged
+//
+// The filter uses both inclusive and strict metrics evaluators on file statistics to classify files:
+//   - Inclusive evaluator identifies candidate files that may contain matching rows
+//   - Strict evaluator determines if all rows in a file must match the filter
+//   - Files that pass inclusive but not strict evaluation are rewritten with filtered data
+//
+// The concurrency used for manifest processing and file rewriting defaults to
+// runtime.GOMAXPROCS(0) and can be overridden using the WithDeleteConcurrency option.
+func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption) error {
+ deleteOp := deleteOperation{
+ concurrency: runtime.GOMAXPROCS(0),
+ caseSensitive: true,
+ }
+ for _, apply := range opts {
+ apply(&deleteOp)
+ }
+
+	writeDeleteMode := t.meta.props.Get(WriteDeleteModeKey, WriteDeleteModeDefault)
+	if writeDeleteMode != WriteModeCopyOnWrite {
+		return fmt.Errorf("'%s' is set to '%s' but only '%s' is currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite)
+ }
+	updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
+ if err != nil {
+ return err
+ }
updates, reqs, err := updater.commit()
if err != nil {
return err
diff --git a/table/transaction_test.go b/table/transaction_test.go
index 2a8f3bde..1f64adf1 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -92,8 +92,9 @@ func (s *SparkIntegrationTestSuite) TestSetProperties() {
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)
- expectedOutput := `
-+----------------------------------+---------------+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SHOW TBLPROPERTIES default.go_test_set_properties")
+ s.Require().NoError(err)
+	s.Require().Contains(output, `+----------------------------------+---------------+
|key |value |
+----------------------------------+---------------+
|commit.manifest-merge.enabled |true |
@@ -103,15 +104,7 @@ func (s *SparkIntegrationTestSuite) TestSetProperties() {
|format |iceberg/parquet|
|format-version |2 |
|write.parquet.compression-codec |snappy |
-+----------------------------------+---------------+
-`
-
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestSetProperties")
- s.Require().NoError(err)
- s.Require().True(
-		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
- "result does not contain expected output: %s", expectedOutput,
- )
++----------------------------------+---------------+`)
}
func (s *SparkIntegrationTestSuite) TestAddFile() {
@@ -163,7 +156,7 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
+--------+
`
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestAddedFile")
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT COUNT(*) FROM default.test_partitioned_by_days")
s.Require().NoError(err)
s.Require().True(
 		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
@@ -274,8 +267,9 @@ func (s *SparkIntegrationTestSuite) TestDifferentDataTypes() {
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)
- expectedSchema := `
-+------------------+-------------+-------+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "DESCRIBE TABLE EXTENDED default.go_test_different_data_types")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `+------------------+-------------+-------+
|col_name |data_type |comment|
+------------------+-------------+-------+
|bool |boolean |NULL |
@@ -294,29 +288,17 @@ func (s *SparkIntegrationTestSuite) TestDifferentDataTypes() {
|small_dec |decimal(8,2) |NULL |
|med_dec |decimal(16,2)|NULL |
|large_dec |decimal(24,2)|NULL |
-|list |array<int> |NULL |
-`
+|list |array<int> |NULL |`)
- expectedOutput := `
-+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
+	output, err = recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_different_data_types")
+ s.Require().NoError(err)
+ s.Require().Contains(output,
`+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
|bool |string|string_long |int |long|float|double|timestamp
|timestamptz |date |uuid
|binary|fixed |small_dec|med_dec
|large_dec |list |
+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
|false|a |aaaaaaaaaaaaaaaaaaaaaa|1 |1 |0.0 |0.0 |2023-01-01
11:25:00|2023-01-01
19:25:00|2023-01-01|00000000-0000-0000-0000-000000000000|[01] |[00 00 00 00 00
00 00 00 00 00 00 00 00 00 00
00]|123456.78|12345678901234.56|1234567890123456789012.34|[1, 2, 3] |
|NULL |NULL |NULL |NULL|NULL|NULL |NULL |NULL
|NULL |NULL |NULL |NULL
|NULL |NULL |NULL
|NULL |NULL |
|true |z |zzzzzzzzzzzzzzzzzzzzzz|9 |9 |0.9 |0.9 |2023-03-01
11:25:00|2023-03-01
19:25:00|2023-03-01|11111111-1111-1111-1111-111111111111|[12] |[11 11 11 11 11
11 11 11 11 11 11 11 11 11 11
11]|876543.21|65432109876543.21|4321098765432109876543.21|[-1, -2, -3]|
-+-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+
-`
-
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestReadDifferentDataTypes")
- s.Require().NoError(err)
- s.Require().True(
- strings.Contains(strings.TrimSpace(output),
strings.TrimSpace(expectedSchema)),
- "result does not contain expected output: %s", expectedOutput,
- )
- s.Require().True(
- strings.HasSuffix(strings.TrimSpace(output),
strings.TrimSpace(expectedOutput)),
- "result does not contain expected output: %s", expectedOutput,
- )
++-----+------+----------------------+----+----+-----+------+-------------------+-------------------+----------+------------------------------------+------+-------------------------------------------------+---------+-----------------+-------------------------+------------+`)
}
func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
@@ -345,22 +327,12 @@ func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
s.Require().NoError(err)
_, err = tx.Commit(s.ctx)
- partitionExpectedOutput := `
-|# Partitioning | |
|
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "DESCRIBE TABLE EXTENDED default.go_test_update_spec")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|# Partitioning |
| |
|Part 0 |truncate(5, bar) |
|
|Part 1 |bucket(3, baz) |
|
-| | |
|
-`
- metadataPartition := `
-|_partition |struct<bar_truncate:string,baz_bucket_3:int>|
|
-`
-
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestReadSpecUpdate")
- s.Require().NoError(err)
- s.Require().True(
- strings.Contains(strings.TrimSpace(output),
strings.TrimSpace(partitionExpectedOutput)),
- "result does not contain expected output: %s",
metadataPartition,
- )
+| | |
|`)
}
func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
@@ -407,27 +379,14 @@ func (s *SparkIntegrationTestSuite) TestOverwriteBasic() {
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)
- expectedOutput := `
-+--------+
-|count(1)|
-+--------+
-|2 |
-+--------+
-
-+-----+-----------+---+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_overwrite_basic ORDER BY baz")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `+-----+-----------+---+
|foo |bar |baz|
+-----+-----------+---+
|false|overwritten|300|
|true |new_data |400|
-+-----+-----------+---+
-`
-
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteBasic")
- s.Require().NoError(err)
- s.Require().True(
-		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
- "result does not contain expected output: %s", expectedOutput,
- )
++-----+-----------+---+`)
}
func (s *SparkIntegrationTestSuite) TestOverwriteWithFilter() {
@@ -474,27 +433,106 @@ func (s *SparkIntegrationTestSuite) TestOverwriteWithFilter() {
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)
- expectedOutput := `
-+--------+
-|count(1)|
-+--------+
-|2 |
-+--------+
-
-+-----+---------------+---+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_overwrite_filter ORDER BY baz")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `+-----+---------------+---+
|foo |bar |baz|
+-----+---------------+---+
|false|should_remain |200|
|true |new_replacement|999|
-+-----+---------------+---+
-`
++-----+---------------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDelete() {
+ icebergSchema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "first_name", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 2, Name: "last_name", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 3, Name: "age", Type: iceberg.PrimitiveTypes.Int32},
+ )
-	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--test", "TestOverwriteWithFilter")
+	tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_delete"), icebergSchema)
s.Require().NoError(err)
- s.Require().True(
-		strings.HasSuffix(strings.TrimSpace(output), strings.TrimSpace(expectedOutput)),
- "result does not contain expected output: %s", expectedOutput,
+
+	arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+ s.Require().NoError(err)
+
+	initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+			{"first_name": "steve", "last_name": "gopher", "age": 5},
+ {"first_name": "dead", "last_name": "gopher", "age": 97}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	// Delete the dead gopher and confirm that alan and steve are still present
+ filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead")
+ tx = tbl.NewTransaction()
+ err = tx.Delete(s.ctx, filter, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_delete ORDER BY age")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve |gopher |5 |
+|alan |gopher |7 |
++----------+---------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDeleteInsensitive() {
+ icebergSchema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "first_name", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 2, Name: "last_name", Type: iceberg.PrimitiveTypes.String},
+		iceberg.NestedField{ID: 3, Name: "age", Type: iceberg.PrimitiveTypes.Int32},
)
+
+	tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_delete_insensitive"), icebergSchema)
+ s.Require().NoError(err)
+
+	arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
+ s.Require().NoError(err)
+
+	initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+			{"first_name": "steve", "last_name": "gopher", "age": 5},
+ {"first_name": "dead", "last_name": "gopher", "age": 97}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	// Delete the dead gopher and confirm that alan and steve are still present
+ filter := iceberg.EqualTo(iceberg.Reference("FIRST_NAME"), "dead")
+ tx = tbl.NewTransaction()
+ err = tx.Delete(s.ctx, filter, nil, table.WithDeleteCaseInsensitive())
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_delete_insensitive ORDER BY age")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve |gopher |5 |
+|alan |gopher |7 |
++----------+---------+---+`)
}
func TestSparkIntegration(t *testing.T) {