This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6c041dde feat(table): add RowDelta API for atomic row-level mutations
(#789)
6c041dde is described below
commit 6c041dde5bd35cefa7c1bce2d245d7aeb4dfeddd
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Mon Mar 23 22:06:57 2026 +0100
feat(table): add RowDelta API for atomic row-level mutations (#789)
Adds `Transaction.NewRowDelta()` — Go equivalent of Java's
`BaseRowDelta`. Commits data files and delete files (position or
equality) in one atomic snapshot. This is needed for row-level
mutations: an UPDATE becomes an equality delete for the old row + append
of the new row, both in one commit.
Resolves #602.
## API
```go
rd := tx.NewRowDelta(snapshotProps)
rd.AddRows(dataFile1, dataFile2)
rd.AddDeletes(posDeleteFile, eqDeleteFile)
rd.Commit(ctx)
```
Operation type picked automatically: data-only → `append`, deletes-only
→ `delete`, both → `overwrite`.
## Validation
- Delete files require format version >= 2
- Equality deletes must have non-empty `EqualityFieldIDs` referencing
existing schema columns
- Content types checked: no data files in `AddDeletes`, no delete files
in `AddRows`
## Known limitations
- No conflict detection for concurrent writers — documented in the type
comment
- Uses fast-append producer (no manifest merging)
## What's tested
The interesting ones:
- Commit data + position deletes, check snapshot summary has
`added-data-files=1`, `added-delete-files=1`, operation is `overwrite`
- Commit equality deletes, check `added-equality-delete-files` shows up
in summary
- Read back manifests after commit, verify there's one data manifest and
one delete manifest with correct content types in entries
- Two RowDeltas on same transaction (batch1 append, batch2
append+delete), verify cumulative `total-data-files`
- v1 table rejects delete files with clear error
- Equality delete file without field IDs → error
- Equality delete file with field ID 999 (not in schema) → error
The round-trip integration test:
1. Write 5 rows as real Parquet, append to table
2. Write a position delete file targeting positions 1 and 3, commit via
RowDelta
3. Scan the table back — get 3 rows, verify IDs are `[1, 3, 5]` (beta
and delta gone)
This covers the full path: write parquet → RowDelta commit → scan with
position delete filtering applied.
## What's left to do
This PR covers the commit API. Remaining work for full DML support:
- **Equality delete file writing** — a writer that produces Parquet
files with PK-only schema and `EntryContentEqDeletes` content type. The
RowDelta API already accepts them, but there's no convenient writer yet.
- **Equality delete reading** — the scanner currently errors with
"iceberg-go does not yet support equality deletes" (`scanner.go:415`).
Needs: collect eq delete entries during scan planning, match to data
files by partition + sequence number, apply hash-based anti-join during
Arrow reads.
- **Conflict validation** — `validateFromSnapshot`,
`validateNoConflictingDataFiles`, etc. Java's Flink connector skips most
of this for streaming, so it's not blocking for CDC use cases.
---
table/row_delta.go | 181 ++++++++++++++
table/row_delta_test.go | 557 ++++++++++++++++++++++++++++++++++++++++++++
table/snapshot_producers.go | 32 +--
table/transaction.go | 2 +-
4 files changed, 755 insertions(+), 17 deletions(-)
diff --git a/table/row_delta.go b/table/row_delta.go
new file mode 100644
index 00000000..df0072c9
--- /dev/null
+++ b/table/row_delta.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+)
+
// RowDelta encodes a set of row-level changes to a table: new data files
// (inserts) and delete files (equality or position deletes). All changes
// are committed atomically in a single snapshot.
//
// The operation type of the produced snapshot is determined automatically:
//   - Data files only → OpAppend
//   - Delete files only → OpDelete
//   - Both data and delete files → OpOverwrite
//
// This matches the semantics of Java's BaseRowDelta. It is the primary
// API for CDC/streaming workloads where INSERTs, UPDATEs, and DELETEs
// must be committed together.
//
// Note: conflict detection for concurrent writers is not yet implemented.
// Concurrent RowDelta commits against the same table may produce incorrect
// results if delete files miss newly appended data. For single-writer
// workloads this is safe.
//
// Usage:
//
//	rd := tx.NewRowDelta(snapshotProps)
//	rd.AddRows(dataFile1, dataFile2)
//	rd.AddDeletes(equalityDeleteFile1)
//	err := rd.Commit(ctx)
type RowDelta struct {
	txn       *Transaction       // transaction this delta commits into
	dataFiles []iceberg.DataFile // inserted rows (EntryContentData)
	delFiles  []iceberg.DataFile // position and/or equality delete files
	props     iceberg.Properties // extra properties merged into the snapshot summary
}
+
+// NewRowDelta creates a new RowDelta for committing row-level changes
+// within this transaction. The provided properties are included in the
+// snapshot summary.
+func (t *Transaction) NewRowDelta(snapshotProps iceberg.Properties) *RowDelta {
+ return &RowDelta{
+ txn: t,
+ props: snapshotProps,
+ }
+}
+
+// AddRows adds data files containing new rows (inserts) to this RowDelta.
+func (rd *RowDelta) AddRows(files ...iceberg.DataFile) *RowDelta {
+ rd.dataFiles = append(rd.dataFiles, files...)
+
+ return rd
+}
+
+// AddDeletes adds delete files (equality or position) to this RowDelta.
+// Equality delete files must have ContentType == EntryContentEqDeletes
+// and non-empty EqualityFieldIDs referencing valid schema columns.
+// Position delete files must have ContentType == EntryContentPosDeletes.
+func (rd *RowDelta) AddDeletes(files ...iceberg.DataFile) *RowDelta {
+ rd.delFiles = append(rd.delFiles, files...)
+
+ return rd
+}
+
+// Commit validates and commits all accumulated row-level changes as a
+// single atomic snapshot. Returns an error if there are no files to
+// commit, if any file has an unexpected content type, or if the table
+// format version does not support delete files.
+func (rd *RowDelta) Commit(ctx context.Context) error {
+ if len(rd.dataFiles) == 0 && len(rd.delFiles) == 0 {
+ return errors.New("row delta must have at least one data file
or delete file")
+ }
+
+ // Delete files require format version >= 2.
+ if len(rd.delFiles) > 0 && rd.txn.meta.formatVersion < 2 {
+ return fmt.Errorf("delete files require table format version >=
2, got v%d",
+ rd.txn.meta.formatVersion)
+ }
+
+ for _, f := range rd.dataFiles {
+ if f.ContentType() != iceberg.EntryContentData {
+ return fmt.Errorf("expected data file, got content type
%s: %s",
+ f.ContentType(), f.FilePath())
+ }
+ }
+
+ schema := rd.txn.meta.CurrentSchema()
+ for _, f := range rd.delFiles {
+ ct := f.ContentType()
+ if ct != iceberg.EntryContentPosDeletes && ct !=
iceberg.EntryContentEqDeletes {
+ return fmt.Errorf("expected delete file, got content
type %s: %s",
+ ct, f.FilePath())
+ }
+
+ // Equality delete files must declare which columns form the
delete key,
+ // and those columns must exist in the current schema.
+ if ct == iceberg.EntryContentEqDeletes {
+ eqIDs := f.EqualityFieldIDs()
+ if len(eqIDs) == 0 {
+ return fmt.Errorf("equality delete file must
have non-empty EqualityFieldIDs: %s",
+ f.FilePath())
+ }
+
+ for _, id := range eqIDs {
+ if _, ok := schema.FindFieldByID(id); !ok {
+ return fmt.Errorf("equality field ID %d
not found in table schema: %s",
+ id, f.FilePath())
+ }
+ }
+ }
+ }
+
+ fs, err := rd.txn.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ wfs, ok := fs.(iceio.WriteFileIO)
+ if !ok {
+ return errors.New("filesystem does not support writing")
+ }
+
+ op := rd.Operation()
+ producer := newFastAppendFilesProducer(op, rd.txn, wfs, nil, rd.props)
+
+ for _, f := range rd.dataFiles {
+ producer.appendDataFile(f)
+ }
+
+ for _, f := range rd.delFiles {
+ producer.appendDeleteFile(f)
+ }
+
+ updates, reqs, err := producer.commit()
+ if err != nil {
+ return err
+ }
+
+ return rd.txn.apply(updates, reqs)
+}
+
+// Operation returns the snapshot operation type that will be used when
+// this RowDelta is committed:
+// - data only → OpAppend
+// - deletes only → OpDelete
+// - both → OpOverwrite
+func (rd *RowDelta) Operation() Operation {
+ hasData := len(rd.dataFiles) > 0
+ hasDeletes := len(rd.delFiles) > 0
+
+ switch {
+ case hasData && hasDeletes:
+ return OpOverwrite
+ case hasDeletes:
+ return OpDelete
+ default:
+ return OpAppend
+ }
+}
diff --git a/table/row_delta_test.go b/table/row_delta_test.go
new file mode 100644
index 00000000..208953a0
--- /dev/null
+++ b/table/row_delta_test.go
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "iter"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// newRowDeltaTestTable builds an in-memory test table (schema: id int64
// required, data string optional; unpartitioned) at the requested format
// version. No FS factory or catalog is attached — these tables are only
// used for validation paths that never touch storage.
func newRowDeltaTestTable(t *testing.T, formatVersion int) *table.Table {
	t.Helper()

	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
	)

	meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, "s3://bucket/test",
		iceberg.Properties{table.PropertyFormatVersion: formatVersionStr(formatVersion)})
	require.NoError(t, err)

	return table.New(
		table.Identifier{"db", "test_table"},
		meta, "s3://bucket/test/metadata/v1.metadata.json",
		nil, nil,
	)
}
+
// formatVersionStr renders a table format version as its decimal string
// (e.g. 2 → "2"). Uses strconv.Itoa so multi-digit versions round-trip
// correctly; the previous rune arithmetic (string(rune('0'+v))) only
// produced correct output for v in 0..9.
func formatVersionStr(v int) string {
	return strconv.Itoa(v)
}
+
+func buildDataFile(t *testing.T, path string) iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+ path, iceberg.ParquetFile, nil, nil, nil, 10, 1024)
+ require.NoError(t, err)
+
+ return b.Build()
+}
+
+func buildPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+ path, iceberg.ParquetFile, nil, nil, nil, 5, 512)
+ require.NoError(t, err)
+
+ return b.Build()
+}
+
+func buildEqDeleteFile(t *testing.T, path string, fieldIDs []int)
iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+ path, iceberg.ParquetFile, nil, nil, nil, 3, 256)
+ require.NoError(t, err)
+ b.EqualityFieldIDs(fieldIDs)
+
+ return b.Build()
+}
+
// Data files only → the snapshot operation is append.
func TestRowDeltaOperationDataOnly(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))

	assert.Equal(t, table.OpAppend, rd.Operation())
}
+
// Delete files only → the snapshot operation is delete.
func TestRowDeltaOperationDeleteOnly(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))

	assert.Equal(t, table.OpDelete, rd.Operation())
}
+
// Data and delete files together → the snapshot operation is overwrite.
func TestRowDeltaOperationBoth(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))

	assert.Equal(t, table.OpOverwrite, rd.Operation())
}
+
// Committing a RowDelta with no files at all must fail with a clear error.
func TestRowDeltaCommitEmpty(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)

	err := rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "at least one data file or delete file")
}
+
// Passing a data-content file to AddDeletes is rejected at Commit time.
func TestRowDeltaRejectsDataFileInDeletes(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddDeletes(buildDataFile(t, "s3://bucket/data/file1.parquet"))

	err := rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "expected delete file")
}
+
// Passing a delete-content file to AddRows is rejected at Commit time.
func TestRowDeltaRejectsDeleteFileInRows(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddRows(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))

	err := rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "expected data file")
}
+
// Equality delete files count as deletes for operation selection.
func TestRowDeltaAcceptsEqualityDeleteFiles(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del1.parquet", []int{1}))

	assert.Equal(t, table.OpDelete, rd.Operation())
}
+
// AddRows/AddDeletes return the receiver, so calls can be chained fluently.
func TestRowDeltaChaining(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil).
		AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet")).
		AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del1.parquet", []int{1}))

	assert.Equal(t, table.OpOverwrite, rd.Operation())
}
+
// Delete files are a v2 feature: a v1 table must reject them at Commit.
func TestRowDeltaRejectsDeleteFilesOnV1Table(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 1)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))

	err := rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "format version >= 2")
}
+
// A data-only RowDelta is valid on a v1 table (the v2 requirement only
// applies when delete files are present).
func TestRowDeltaAllowsDataOnlyOnV1Table(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 1)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))

	// Operation selection should work — data-only on v1 is fine.
	assert.Equal(t, table.OpAppend, rd.Operation())
}
+
// An equality delete file that never declares its key columns must be
// rejected at Commit time.
func TestRowDeltaRejectsEqDeleteWithoutFieldIDs(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)

	// Build an equality delete file without setting EqualityFieldIDs
	b, err := iceberg.NewDataFileBuilder(
		*iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
		"s3://bucket/data/eq-del.parquet", iceberg.ParquetFile, nil, nil, nil, 3, 256)
	require.NoError(t, err)
	df := b.Build()

	rd := tbl.NewTransaction().NewRowDelta(nil)
	rd.AddDeletes(df)

	err = rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "non-empty EqualityFieldIDs")
}
+
// Equality delete key columns must exist in the table schema.
func TestRowDeltaRejectsEqDeleteWithInvalidFieldID(t *testing.T) {
	tbl := newRowDeltaTestTable(t, 2)
	rd := tbl.NewTransaction().NewRowDelta(nil)
	// Field ID 999 does not exist in the schema (which has fields 1 and 2)
	rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del.parquet", []int{999}))

	err := rd.Commit(t.Context())
	require.Error(t, err)
	assert.Contains(t, err.Error(), "not found in table schema")
}
+
// rowDeltaCatalog simulates catalog behavior for RowDelta commit tests.
// It applies committed updates to in-memory metadata and never contacts
// a real catalog service.
type rowDeltaCatalog struct {
	metadata table.Metadata // latest metadata, replaced on every CommitTable
}

// LoadTable is unused by these tests; it only satisfies the interface.
func (m *rowDeltaCatalog) LoadTable(ctx context.Context, ident table.Identifier) (*table.Table, error) {
	return nil, nil
}

// CommitTable applies updates to the stored metadata and returns the new
// metadata. Requirements are accepted but not validated — these tests
// exercise RowDelta, not optimistic-concurrency checks.
func (m *rowDeltaCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
	meta, err := table.UpdateTableMetadata(m.metadata, updates, "")
	if err != nil {
		return nil, "", err
	}

	m.metadata = meta

	return meta, "", nil
}
+
// newRowDeltaCommitTestTable builds a v2 test table rooted in a temp
// directory, backed by LocalFS and the in-memory rowDeltaCatalog, so
// commits can write real manifest files to disk.
func newRowDeltaCommitTestTable(t *testing.T) *table.Table {
	t.Helper()

	location := filepath.ToSlash(t.TempDir())

	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
	)

	meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, location,
		iceberg.Properties{table.PropertyFormatVersion: "2"})
	require.NoError(t, err)

	return table.New(
		table.Identifier{"db", "row_delta_test"},
		meta, location+"/metadata/v1.metadata.json",
		func(ctx context.Context) (iceio.IO, error) {
			return iceio.LocalFS{}, nil
		},
		&rowDeltaCatalog{meta},
	)
}
+
// A combined data + position-delete commit produces an overwrite snapshot
// whose summary counts both file kinds and the added records.
func TestRowDeltaCommitDataAndDeletes(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()
	rd := tx.NewRowDelta(iceberg.Properties{"custom-prop": "test"})
	rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))

	require.NoError(t, rd.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
	assert.Equal(t, "1", snap.Summary.Properties["added-data-files"])
	assert.Equal(t, "1", snap.Summary.Properties["added-delete-files"])
	assert.Equal(t, "10", snap.Summary.Properties["added-records"])
}
+
// A data-only commit produces an append snapshot; the summary reflects
// two files of 10 records each.
func TestRowDeltaCommitDataOnly(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()
	rd := tx.NewRowDelta(nil)
	rd.AddRows(
		buildDataFile(t, "s3://bucket/data/file1.parquet"),
		buildDataFile(t, "s3://bucket/data/file2.parquet"),
	)

	require.NoError(t, rd.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	assert.Equal(t, table.OpAppend, snap.Summary.Operation)
	assert.Equal(t, "2", snap.Summary.Properties["added-data-files"])
	assert.Equal(t, "20", snap.Summary.Properties["added-records"])
}
+
// A deletes-only commit produces a delete snapshot; position delete files
// are counted both under the position-specific key and the total key.
func TestRowDeltaCommitDeletesOnly(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()
	rd := tx.NewRowDelta(nil)
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))

	require.NoError(t, rd.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	assert.Equal(t, table.OpDelete, snap.Summary.Operation)
	assert.Equal(t, "1", snap.Summary.Properties["added-position-delete-files"])
	assert.Equal(t, "1", snap.Summary.Properties["added-delete-files"])
}
+
// Equality delete files are surfaced in the summary under their own
// added-equality-delete-files key.
func TestRowDeltaCommitWithEqualityDeletes(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()
	rd := tx.NewRowDelta(nil)
	rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
	rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del.parquet", []int{1}))

	require.NoError(t, rd.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
	assert.Equal(t, "1", snap.Summary.Properties["added-data-files"])
	assert.Equal(t, "1", snap.Summary.Properties["added-equality-delete-files"])
}
+
// After a mixed commit, the snapshot must contain exactly one data
// manifest and one delete manifest, and the entries in each must carry
// the matching content type.
func TestRowDeltaManifestContents(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()
	rd := tx.NewRowDelta(nil)
	rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
	rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))

	require.NoError(t, rd.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	fs := iceio.LocalFS{}
	manifests, err := snap.Manifests(fs)
	require.NoError(t, err)

	// Should have separate data and delete manifests
	var dataManifests, deleteManifests int
	for _, m := range manifests {
		switch m.ManifestContent() {
		case iceberg.ManifestContentData:
			dataManifests++
		case iceberg.ManifestContentDeletes:
			deleteManifests++
		}
	}

	assert.Equal(t, 1, dataManifests, "expected 1 data manifest")
	assert.Equal(t, 1, deleteManifests, "expected 1 delete manifest")

	// Verify manifest entries have correct content types
	for _, m := range manifests {
		entries, err := m.FetchEntries(fs, true)
		require.NoError(t, err)

		for _, e := range entries {
			if m.ManifestContent() == iceberg.ManifestContentData {
				assert.Equal(t, iceberg.EntryContentData, e.DataFile().ContentType())
			} else {
				assert.Equal(t, iceberg.EntryContentPosDeletes, e.DataFile().ContentType())
			}
		}
	}
}
+
// Two RowDeltas staged on the same transaction accumulate: after the
// final table commit, totals reflect both batches and the last delta's
// operation type wins.
func TestRowDeltaMultipleCommitsOnSameTransaction(t *testing.T) {
	tbl := newRowDeltaCommitTestTable(t)

	tx := tbl.NewTransaction()

	// First RowDelta: append data
	rd1 := tx.NewRowDelta(nil)
	rd1.AddRows(buildDataFile(t, "s3://bucket/data/batch1.parquet"))
	require.NoError(t, rd1.Commit(t.Context()))

	// Second RowDelta: append + delete
	rd2 := tx.NewRowDelta(nil)
	rd2.AddRows(buildDataFile(t, "s3://bucket/data/batch2.parquet"))
	rd2.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del2.parquet"))
	require.NoError(t, rd2.Commit(t.Context()))

	result, err := tx.Commit(t.Context())
	require.NoError(t, err)

	snap := result.CurrentSnapshot()
	require.NotNil(t, snap)

	// The last RowDelta's operation should be reflected
	assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
	assert.Equal(t, strconv.Itoa(2), snap.Summary.Properties["total-data-files"])
}
+
// writeParquetFile writes Arrow records to a Parquet file on local disk.
// jsonData is a JSON array of row objects matching schema sc; statistics
// are enabled so scans can use them.
func writeParquetFile(t *testing.T, path string, sc *arrow.Schema, jsonData string) {
	t.Helper()

	rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, sc, strings.NewReader(jsonData))
	require.NoError(t, err)
	defer rec.Release()

	fs := iceio.LocalFS{}
	fw, err := fs.Create(path)
	require.NoError(t, err)

	tbl := array.NewTableFromRecords(sc, []arrow.RecordBatch{rec})
	defer tbl.Release()

	// NOTE(review): fw is never explicitly closed here — presumably
	// pqarrow.WriteTable closes the underlying sink on completion;
	// confirm, otherwise the handle leaks until the test process exits.
	require.NoError(t, pqarrow.WriteTable(tbl, fw, rec.NumRows(),
		parquet.NewWriterProperties(parquet.WithStats(true)),
		pqarrow.DefaultWriterProps()))
}
+
// Full round trip: write real Parquet data, commit a position delete via
// RowDelta, then scan and verify the deleted rows are filtered out.
func TestRowDeltaIntegrationPosDeleteRoundTrip(t *testing.T) {
	location := filepath.ToSlash(t.TempDir())

	// Schema: id (int64), data (string)
	iceSchema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
	)

	meta, err := table.NewMetadata(iceSchema, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, location,
		iceberg.Properties{table.PropertyFormatVersion: "2"})
	require.NoError(t, err)

	cat := &rowDeltaCatalog{metadata: meta}
	tbl := table.New(
		table.Identifier{"db", "pos_del_roundtrip"},
		meta, location+"/metadata/v1.metadata.json",
		func(ctx context.Context) (iceio.IO, error) {
			return iceio.LocalFS{}, nil
		},
		cat,
	)

	// Step 1: Append 5 rows via normal append
	arrowSc, err := table.SchemaToArrowSchema(iceSchema, nil, false, false)
	require.NoError(t, err)

	dataPath := location + "/data/data-001.parquet"
	writeParquetFile(t, dataPath, arrowSc, `[
		{"id": 1, "data": "alpha"},
		{"id": 2, "data": "beta"},
		{"id": 3, "data": "gamma"},
		{"id": 4, "data": "delta"},
		{"id": 5, "data": "epsilon"}
	]`)

	tx := tbl.NewTransaction()
	err = tx.AddFiles(t.Context(), []string{dataPath}, nil, false)
	require.NoError(t, err)

	tbl, err = tx.Commit(t.Context())
	require.NoError(t, err)

	// Verify: 5 rows scannable
	assertRowCount(t, tbl, 5)

	// Step 2: Commit a position delete via RowDelta that removes rows 1 and 3
	// (0-indexed: positions 1 and 3 → "beta" and "delta")
	posDelArrowSc := table.PositionalDeleteArrowSchema
	posDelPath := location + "/data/pos-del-001.parquet"
	writeParquetFile(t, posDelPath, posDelArrowSc, `[
		{"file_path": "`+dataPath+`", "pos": 1},
		{"file_path": "`+dataPath+`", "pos": 3}
	]`)

	posDelBuilder, err := iceberg.NewDataFileBuilder(
		*iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
		posDelPath, iceberg.ParquetFile, nil, nil, nil, 2, 256)
	require.NoError(t, err)
	posDelFile := posDelBuilder.Build()

	tx2 := tbl.NewTransaction()
	rd := tx2.NewRowDelta(nil)
	rd.AddDeletes(posDelFile)
	require.NoError(t, rd.Commit(t.Context()))

	tbl, err = tx2.Commit(t.Context())
	require.NoError(t, err)

	// Step 3: Scan and verify rows 1 and 3 (beta, delta) are deleted
	// Remaining: alpha (0), gamma (2), epsilon (4)
	assertRowCount(t, tbl, 3)

	// Verify the actual values
	_, itr, err := tbl.Scan(table.WithSelectedFields("id", "data")).ToArrowRecords(t.Context())
	require.NoError(t, err)

	var ids []int64
	for rec, err := range itr {
		require.NoError(t, err)
		col := rec.Column(0).(*array.Int64)
		for i := 0; i < col.Len(); i++ {
			ids = append(ids, col.Value(i))
		}
		rec.Release()
	}

	assert.Equal(t, []int64{1, 3, 5}, ids, "expected rows at positions 0,2,4 (beta/delta deleted)")
}
+
+func assertRowCount(t *testing.T, tbl *table.Table, expected int64) {
+ t.Helper()
+
+ _, itr, err := tbl.Scan().ToArrowRecords(t.Context())
+ require.NoError(t, err)
+
+ var total int64
+ next, stop := iter.Pull2(itr)
+ defer stop()
+
+ for {
+ rec, err, valid := next()
+ if !valid {
+ break
+ }
+
+ require.NoError(t, err)
+ total += rec.NumRows()
+ rec.Release()
+ }
+
+ assert.Equal(t, expected, total, "unexpected row count")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 9bbc0e85..00d71f68 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -427,17 +427,17 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
type snapshotProducer struct {
producerImpl
- commitUuid uuid.UUID
- io iceio.WriteFileIO
- txn *Transaction
- op Operation
- snapshotID int64
- parentSnapshotID int64
- addedFiles []iceberg.DataFile
- positionDeleteFiles []iceberg.DataFile
- manifestCount atomic.Int32
- deletedFiles map[string]iceberg.DataFile
- snapshotProps iceberg.Properties
+ commitUuid uuid.UUID
+ io iceio.WriteFileIO
+ txn *Transaction
+ op Operation
+ snapshotID int64
+ parentSnapshotID int64
+ addedFiles []iceberg.DataFile
+ addedDeleteFiles []iceberg.DataFile
+ manifestCount atomic.Int32
+ deletedFiles map[string]iceberg.DataFile
+ snapshotProps iceberg.Properties
}
func createSnapshotProducer(op Operation, txn *Transaction, fs
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties)
*snapshotProducer {
@@ -483,8 +483,8 @@ func (sp *snapshotProducer) appendDataFile(df
iceberg.DataFile) *snapshotProduce
return sp
}
-func (sp *snapshotProducer) appendPositionDeleteFile(df iceberg.DataFile)
*snapshotProducer {
- sp.positionDeleteFiles = append(sp.positionDeleteFiles, df)
+func (sp *snapshotProducer) appendDeleteFile(df iceberg.DataFile)
*snapshotProducer {
+ sp.addedDeleteFiles = append(sp.addedDeleteFiles, df)
return sp
}
@@ -547,8 +547,8 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
g.Go(sp.manifestProducer(iceberg.ManifestContentData,
sp.addedFiles, &addedManifests))
}
- if len(sp.positionDeleteFiles) > 0 {
- g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes,
sp.positionDeleteFiles, &positionDeleteManifests))
+ if len(sp.addedDeleteFiles) > 0 {
+ g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes,
sp.addedDeleteFiles, &positionDeleteManifests))
}
if len(deleted) > 0 {
@@ -668,7 +668,7 @@ func (sp *snapshotProducer) summary(props
iceberg.Properties) (Summary, error) {
return Summary{}, err
}
}
- for _, df := range sp.positionDeleteFiles {
+ for _, df := range sp.addedDeleteFiles {
if err = ssc.addFile(df, currentSchema, *partitionSpec); err !=
nil {
return Summary{}, err
}
diff --git a/table/transaction.go b/table/transaction.go
index 815d3459..a28457a3 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -1350,7 +1350,7 @@ func (t *Transaction) writePositionDeletesForFiles(ctx
context.Context, fs io.IO
if err != nil {
return err
}
- updater.appendPositionDeleteFile(f)
+ updater.appendDeleteFile(f)
}
return nil