This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c041dde feat(table): add RowDelta API for atomic row-level mutations 
(#789)
6c041dde is described below

commit 6c041dde5bd35cefa7c1bce2d245d7aeb4dfeddd
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Mon Mar 23 22:06:57 2026 +0100

    feat(table): add RowDelta API for atomic row-level mutations (#789)
    
    Adds `Transaction.NewRowDelta()` — Go equivalent of Java's
    `BaseRowDelta`. Commits data files and delete files (position or
    equality) in one atomic snapshot. This is needed for row-level
    mutations: an UPDATE becomes an equality delete for the old row + append
    of the new row, both in one commit.
    
    Resolves #602.
    
    ## API
    
    ```go
    rd := tx.NewRowDelta(snapshotProps)
    rd.AddRows(dataFile1, dataFile2)
    rd.AddDeletes(posDeleteFile, eqDeleteFile)
    rd.Commit(ctx)
    ```
    
    Operation type picked automatically: data-only → `append`, deletes-only
    → `delete`, both → `overwrite`.
    
    ## Validation
    
    - Delete files require format version >= 2
    - Equality deletes must have non-empty `EqualityFieldIDs` referencing
    existing schema columns
    - Content types checked: no data files in `AddDeletes`, no delete files
    in `AddRows`
    
    ## Known limitations
    
    - No conflict detection for concurrent writers — documented in the type
    comment
    - Uses fast-append producer (no manifest merging)
    
    ## What's tested
    
    The interesting ones:
    
    - Commit data + position deletes, check snapshot summary has
    `added-data-files=1`, `added-delete-files=1`, operation is `overwrite`
    - Commit equality deletes, check `added-equality-delete-files` shows up
    in summary
    - Read back manifests after commit, verify there's one data manifest and
    one delete manifest with correct content types in entries
    - Two RowDeltas on same transaction (batch1 append, batch2
    append+delete), verify cumulative `total-data-files`
    - v1 table rejects delete files with clear error
    - Equality delete file without field IDs → error
    - Equality delete file with field ID 999 (not in schema) → error
    
    The round-trip integration test:
    1. Write 5 rows as real Parquet, append to table
    2. Write a position delete file targeting positions 1 and 3, commit via
    RowDelta
    3. Scan the table back — get 3 rows, verify IDs are `[1, 3, 5]` (beta
    and delta gone)
    
    This covers the full path: write parquet → RowDelta commit → scan with
    position delete filtering applied.
    
    ## What's left to do
    
    This PR covers the commit API. Remaining work for full DML support:
    
    - **Equality delete file writing** — a writer that produces Parquet
    files with PK-only schema and `EntryContentEqDeletes` content type. The
    RowDelta API already accepts them, but there's no convenient writer yet.
    - **Equality delete reading** — the scanner currently errors with
    "iceberg-go does not yet support equality deletes" (`scanner.go:415`).
    Needs: collect eq delete entries during scan planning, match to data
    files by partition + sequence number, apply hash-based anti-join during
    Arrow reads.
    - **Conflict validation** — `validateFromSnapshot`,
    `validateNoConflictingDataFiles`, etc. Java's Flink connector skips most
    of this for streaming, so it's not blocking for CDC use cases.
---
 table/row_delta.go          | 181 ++++++++++++++
 table/row_delta_test.go     | 557 ++++++++++++++++++++++++++++++++++++++++++++
 table/snapshot_producers.go |  32 +--
 table/transaction.go        |   2 +-
 4 files changed, 755 insertions(+), 17 deletions(-)

diff --git a/table/row_delta.go b/table/row_delta.go
new file mode 100644
index 00000000..df0072c9
--- /dev/null
+++ b/table/row_delta.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+)
+
+// RowDelta encodes a set of row-level changes to a table: new data files
+// (inserts) and delete files (equality or position deletes). All changes
+// are committed atomically in a single snapshot.
+//
+// The operation type of the produced snapshot is determined automatically:
+//   - Data files only → OpAppend
+//   - Delete files only → OpDelete
+//   - Both data and delete files → OpOverwrite
+//
+// This matches the semantics of Java's BaseRowDelta. It is the primary
+// API for CDC/streaming workloads where INSERTs, UPDATEs, and DELETEs
+// must be committed together.
+//
+// Note: conflict detection for concurrent writers is not yet implemented.
+// Concurrent RowDelta commits against the same table may produce incorrect
+// results if delete files miss newly appended data. For single-writer
+// workloads this is safe.
+//
+// Usage:
+//
+//     rd := tx.NewRowDelta(snapshotProps)
+//     rd.AddRows(dataFile1, dataFile2)
+//     rd.AddDeletes(equalityDeleteFile1)
+//     err := rd.Commit(ctx)
+type RowDelta struct {
+       txn       *Transaction
+       dataFiles []iceberg.DataFile
+       delFiles  []iceberg.DataFile
+       props     iceberg.Properties
+}
+
+// NewRowDelta creates a new RowDelta for committing row-level changes
+// within this transaction. The provided properties are included in the
+// snapshot summary.
+func (t *Transaction) NewRowDelta(snapshotProps iceberg.Properties) *RowDelta {
+       return &RowDelta{
+               txn:   t,
+               props: snapshotProps,
+       }
+}
+
+// AddRows adds data files containing new rows (inserts) to this RowDelta.
+func (rd *RowDelta) AddRows(files ...iceberg.DataFile) *RowDelta {
+       rd.dataFiles = append(rd.dataFiles, files...)
+
+       return rd
+}
+
+// AddDeletes adds delete files (equality or position) to this RowDelta.
+// Equality delete files must have ContentType == EntryContentEqDeletes
+// and non-empty EqualityFieldIDs referencing valid schema columns.
+// Position delete files must have ContentType == EntryContentPosDeletes.
+func (rd *RowDelta) AddDeletes(files ...iceberg.DataFile) *RowDelta {
+       rd.delFiles = append(rd.delFiles, files...)
+
+       return rd
+}
+
+// Commit validates and commits all accumulated row-level changes as a
+// single atomic snapshot. Returns an error if there are no files to
+// commit, if any file has an unexpected content type, or if the table
+// format version does not support delete files.
+func (rd *RowDelta) Commit(ctx context.Context) error {
+       if len(rd.dataFiles) == 0 && len(rd.delFiles) == 0 {
+               return errors.New("row delta must have at least one data file 
or delete file")
+       }
+
+       // Delete files require format version >= 2.
+       if len(rd.delFiles) > 0 && rd.txn.meta.formatVersion < 2 {
+               return fmt.Errorf("delete files require table format version >= 
2, got v%d",
+                       rd.txn.meta.formatVersion)
+       }
+
+       for _, f := range rd.dataFiles {
+               if f.ContentType() != iceberg.EntryContentData {
+                       return fmt.Errorf("expected data file, got content type 
%s: %s",
+                               f.ContentType(), f.FilePath())
+               }
+       }
+
+       schema := rd.txn.meta.CurrentSchema()
+       for _, f := range rd.delFiles {
+               ct := f.ContentType()
+               if ct != iceberg.EntryContentPosDeletes && ct != 
iceberg.EntryContentEqDeletes {
+                       return fmt.Errorf("expected delete file, got content 
type %s: %s",
+                               ct, f.FilePath())
+               }
+
+               // Equality delete files must declare which columns form the 
delete key,
+               // and those columns must exist in the current schema.
+               if ct == iceberg.EntryContentEqDeletes {
+                       eqIDs := f.EqualityFieldIDs()
+                       if len(eqIDs) == 0 {
+                               return fmt.Errorf("equality delete file must 
have non-empty EqualityFieldIDs: %s",
+                                       f.FilePath())
+                       }
+
+                       for _, id := range eqIDs {
+                               if _, ok := schema.FindFieldByID(id); !ok {
+                                       return fmt.Errorf("equality field ID %d 
not found in table schema: %s",
+                                               id, f.FilePath())
+                               }
+                       }
+               }
+       }
+
+       fs, err := rd.txn.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       wfs, ok := fs.(iceio.WriteFileIO)
+       if !ok {
+               return errors.New("filesystem does not support writing")
+       }
+
+       op := rd.Operation()
+       producer := newFastAppendFilesProducer(op, rd.txn, wfs, nil, rd.props)
+
+       for _, f := range rd.dataFiles {
+               producer.appendDataFile(f)
+       }
+
+       for _, f := range rd.delFiles {
+               producer.appendDeleteFile(f)
+       }
+
+       updates, reqs, err := producer.commit()
+       if err != nil {
+               return err
+       }
+
+       return rd.txn.apply(updates, reqs)
+}
+
+// Operation returns the snapshot operation type that will be used when
+// this RowDelta is committed:
+//   - data only → OpAppend
+//   - deletes only → OpDelete
+//   - both → OpOverwrite
+func (rd *RowDelta) Operation() Operation {
+       hasData := len(rd.dataFiles) > 0
+       hasDeletes := len(rd.delFiles) > 0
+
+       switch {
+       case hasData && hasDeletes:
+               return OpOverwrite
+       case hasDeletes:
+               return OpDelete
+       default:
+               return OpAppend
+       }
+}
diff --git a/table/row_delta_test.go b/table/row_delta_test.go
new file mode 100644
index 00000000..208953a0
--- /dev/null
+++ b/table/row_delta_test.go
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "iter"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func newRowDeltaTestTable(t *testing.T, formatVersion int) *table.Table {
+       t.Helper()
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, "s3://bucket/test",
+               iceberg.Properties{table.PropertyFormatVersion: 
formatVersionStr(formatVersion)})
+       require.NoError(t, err)
+
+       return table.New(
+               table.Identifier{"db", "test_table"},
+               meta, "s3://bucket/test/metadata/v1.metadata.json",
+               nil, nil,
+       )
+}
+
+func formatVersionStr(v int) string {
+       return string(rune('0' + v))
+}
+
+func buildDataFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+               path, iceberg.ParquetFile, nil, nil, nil, 10, 1024)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func buildPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 5, 512)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func buildEqDeleteFile(t *testing.T, path string, fieldIDs []int) 
iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 3, 256)
+       require.NoError(t, err)
+       b.EqualityFieldIDs(fieldIDs)
+
+       return b.Build()
+}
+
+func TestRowDeltaOperationDataOnly(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))
+
+       assert.Equal(t, table.OpAppend, rd.Operation())
+}
+
+func TestRowDeltaOperationDeleteOnly(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))
+
+       assert.Equal(t, table.OpDelete, rd.Operation())
+}
+
+func TestRowDeltaOperationBoth(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))
+
+       assert.Equal(t, table.OpOverwrite, rd.Operation())
+}
+
+func TestRowDeltaCommitEmpty(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+
+       err := rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "at least one data file or delete file")
+}
+
+func TestRowDeltaRejectsDataFileInDeletes(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddDeletes(buildDataFile(t, "s3://bucket/data/file1.parquet"))
+
+       err := rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "expected delete file")
+}
+
+func TestRowDeltaRejectsDeleteFileInRows(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddRows(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))
+
+       err := rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "expected data file")
+}
+
+func TestRowDeltaAcceptsEqualityDeleteFiles(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del1.parquet", 
[]int{1}))
+
+       assert.Equal(t, table.OpDelete, rd.Operation())
+}
+
+func TestRowDeltaChaining(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil).
+               AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet")).
+               AddDeletes(buildEqDeleteFile(t, 
"s3://bucket/data/eq-del1.parquet", []int{1}))
+
+       assert.Equal(t, table.OpOverwrite, rd.Operation())
+}
+
+func TestRowDeltaRejectsDeleteFilesOnV1Table(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 1)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del1.parquet"))
+
+       err := rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "format version >= 2")
+}
+
+func TestRowDeltaAllowsDataOnlyOnV1Table(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 1)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/file1.parquet"))
+
+       // Operation selection should work — data-only on v1 is fine.
+       assert.Equal(t, table.OpAppend, rd.Operation())
+}
+
+func TestRowDeltaRejectsEqDeleteWithoutFieldIDs(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+
+       // Build an equality delete file without setting EqualityFieldIDs
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               "s3://bucket/data/eq-del.parquet", iceberg.ParquetFile, nil, 
nil, nil, 3, 256)
+       require.NoError(t, err)
+       df := b.Build()
+
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       rd.AddDeletes(df)
+
+       err = rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "non-empty EqualityFieldIDs")
+}
+
+func TestRowDeltaRejectsEqDeleteWithInvalidFieldID(t *testing.T) {
+       tbl := newRowDeltaTestTable(t, 2)
+       rd := tbl.NewTransaction().NewRowDelta(nil)
+       // Field ID 999 does not exist in the schema (which has fields 1 and 2)
+       rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del.parquet", 
[]int{999}))
+
+       err := rd.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "not found in table schema")
+}
+
+// rowDeltaCatalog simulates catalog behavior for RowDelta commit tests.
+type rowDeltaCatalog struct {
+       metadata table.Metadata
+}
+
+func (m *rowDeltaCatalog) LoadTable(ctx context.Context, ident 
table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *rowDeltaCatalog) CommitTable(ctx context.Context, ident 
table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       meta, err := table.UpdateTableMetadata(m.metadata, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+
+       m.metadata = meta
+
+       return meta, "", nil
+}
+
+func newRowDeltaCommitTestTable(t *testing.T) *table.Table {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, location,
+               iceberg.Properties{table.PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+
+       return table.New(
+               table.Identifier{"db", "row_delta_test"},
+               meta, location+"/metadata/v1.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &rowDeltaCatalog{meta},
+       )
+}
+
+func TestRowDeltaCommitDataAndDeletes(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(iceberg.Properties{"custom-prop": "test"})
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))
+
+       require.NoError(t, rd.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
+       assert.Equal(t, "1", snap.Summary.Properties["added-data-files"])
+       assert.Equal(t, "1", snap.Summary.Properties["added-delete-files"])
+       assert.Equal(t, "10", snap.Summary.Properties["added-records"])
+}
+
+func TestRowDeltaCommitDataOnly(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddRows(
+               buildDataFile(t, "s3://bucket/data/file1.parquet"),
+               buildDataFile(t, "s3://bucket/data/file2.parquet"),
+       )
+
+       require.NoError(t, rd.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       assert.Equal(t, table.OpAppend, snap.Summary.Operation)
+       assert.Equal(t, "2", snap.Summary.Properties["added-data-files"])
+       assert.Equal(t, "20", snap.Summary.Properties["added-records"])
+}
+
+func TestRowDeltaCommitDeletesOnly(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))
+
+       require.NoError(t, rd.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       assert.Equal(t, table.OpDelete, snap.Summary.Operation)
+       assert.Equal(t, "1", 
snap.Summary.Properties["added-position-delete-files"])
+       assert.Equal(t, "1", snap.Summary.Properties["added-delete-files"])
+}
+
+func TestRowDeltaCommitWithEqualityDeletes(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
+       rd.AddDeletes(buildEqDeleteFile(t, "s3://bucket/data/eq-del.parquet", 
[]int{1}))
+
+       require.NoError(t, rd.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
+       assert.Equal(t, "1", snap.Summary.Properties["added-data-files"])
+       assert.Equal(t, "1", 
snap.Summary.Properties["added-equality-delete-files"])
+}
+
+func TestRowDeltaManifestContents(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddRows(buildDataFile(t, "s3://bucket/data/insert.parquet"))
+       rd.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/pos-del.parquet"))
+
+       require.NoError(t, rd.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       fs := iceio.LocalFS{}
+       manifests, err := snap.Manifests(fs)
+       require.NoError(t, err)
+
+       // Should have separate data and delete manifests
+       var dataManifests, deleteManifests int
+       for _, m := range manifests {
+               switch m.ManifestContent() {
+               case iceberg.ManifestContentData:
+                       dataManifests++
+               case iceberg.ManifestContentDeletes:
+                       deleteManifests++
+               }
+       }
+
+       assert.Equal(t, 1, dataManifests, "expected 1 data manifest")
+       assert.Equal(t, 1, deleteManifests, "expected 1 delete manifest")
+
+       // Verify manifest entries have correct content types
+       for _, m := range manifests {
+               entries, err := m.FetchEntries(fs, true)
+               require.NoError(t, err)
+
+               for _, e := range entries {
+                       if m.ManifestContent() == iceberg.ManifestContentData {
+                               assert.Equal(t, iceberg.EntryContentData, 
e.DataFile().ContentType())
+                       } else {
+                               assert.Equal(t, iceberg.EntryContentPosDeletes, 
e.DataFile().ContentType())
+                       }
+               }
+       }
+}
+
+func TestRowDeltaMultipleCommitsOnSameTransaction(t *testing.T) {
+       tbl := newRowDeltaCommitTestTable(t)
+
+       tx := tbl.NewTransaction()
+
+       // First RowDelta: append data
+       rd1 := tx.NewRowDelta(nil)
+       rd1.AddRows(buildDataFile(t, "s3://bucket/data/batch1.parquet"))
+       require.NoError(t, rd1.Commit(t.Context()))
+
+       // Second RowDelta: append + delete
+       rd2 := tx.NewRowDelta(nil)
+       rd2.AddRows(buildDataFile(t, "s3://bucket/data/batch2.parquet"))
+       rd2.AddDeletes(buildPosDeleteFile(t, "s3://bucket/data/del2.parquet"))
+       require.NoError(t, rd2.Commit(t.Context()))
+
+       result, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       snap := result.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       // The last RowDelta's operation should be reflected
+       assert.Equal(t, table.OpOverwrite, snap.Summary.Operation)
+       assert.Equal(t, strconv.Itoa(2), 
snap.Summary.Properties["total-data-files"])
+}
+
+// writeParquetFile writes Arrow records to a Parquet file on local disk.
+func writeParquetFile(t *testing.T, path string, sc *arrow.Schema, jsonData 
string) {
+       t.Helper()
+
+       rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, sc, 
strings.NewReader(jsonData))
+       require.NoError(t, err)
+       defer rec.Release()
+
+       fs := iceio.LocalFS{}
+       fw, err := fs.Create(path)
+       require.NoError(t, err)
+
+       tbl := array.NewTableFromRecords(sc, []arrow.RecordBatch{rec})
+       defer tbl.Release()
+
+       require.NoError(t, pqarrow.WriteTable(tbl, fw, rec.NumRows(),
+               parquet.NewWriterProperties(parquet.WithStats(true)),
+               pqarrow.DefaultWriterProps()))
+}
+
+func TestRowDeltaIntegrationPosDeleteRoundTrip(t *testing.T) {
+       location := filepath.ToSlash(t.TempDir())
+
+       // Schema: id (int64), data (string)
+       iceSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       meta, err := table.NewMetadata(iceSchema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, location,
+               iceberg.Properties{table.PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+
+       cat := &rowDeltaCatalog{metadata: meta}
+       tbl := table.New(
+               table.Identifier{"db", "pos_del_roundtrip"},
+               meta, location+"/metadata/v1.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               cat,
+       )
+
+       // Step 1: Append 5 rows via normal append
+       arrowSc, err := table.SchemaToArrowSchema(iceSchema, nil, false, false)
+       require.NoError(t, err)
+
+       dataPath := location + "/data/data-001.parquet"
+       writeParquetFile(t, dataPath, arrowSc, `[
+               {"id": 1, "data": "alpha"},
+               {"id": 2, "data": "beta"},
+               {"id": 3, "data": "gamma"},
+               {"id": 4, "data": "delta"},
+               {"id": 5, "data": "epsilon"}
+       ]`)
+
+       tx := tbl.NewTransaction()
+       err = tx.AddFiles(t.Context(), []string{dataPath}, nil, false)
+       require.NoError(t, err)
+
+       tbl, err = tx.Commit(t.Context())
+       require.NoError(t, err)
+
+       // Verify: 5 rows scannable
+       assertRowCount(t, tbl, 5)
+
+       // Step 2: Commit a position delete via RowDelta that removes rows 1 
and 3
+       // (0-indexed: positions 1 and 3 → "beta" and "delta")
+       posDelArrowSc := table.PositionalDeleteArrowSchema
+       posDelPath := location + "/data/pos-del-001.parquet"
+       writeParquetFile(t, posDelPath, posDelArrowSc, `[
+               {"file_path": "`+dataPath+`", "pos": 1},
+               {"file_path": "`+dataPath+`", "pos": 3}
+       ]`)
+
+       posDelBuilder, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               posDelPath, iceberg.ParquetFile, nil, nil, nil, 2, 256)
+       require.NoError(t, err)
+       posDelFile := posDelBuilder.Build()
+
+       tx2 := tbl.NewTransaction()
+       rd := tx2.NewRowDelta(nil)
+       rd.AddDeletes(posDelFile)
+       require.NoError(t, rd.Commit(t.Context()))
+
+       tbl, err = tx2.Commit(t.Context())
+       require.NoError(t, err)
+
+       // Step 3: Scan and verify rows 1 and 3 (beta, delta) are deleted
+       // Remaining: alpha (0), gamma (2), epsilon (4)
+       assertRowCount(t, tbl, 3)
+
+       // Verify the actual values
+       _, itr, err := tbl.Scan(table.WithSelectedFields("id", 
"data")).ToArrowRecords(t.Context())
+       require.NoError(t, err)
+
+       var ids []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               col := rec.Column(0).(*array.Int64)
+               for i := 0; i < col.Len(); i++ {
+                       ids = append(ids, col.Value(i))
+               }
+               rec.Release()
+       }
+
+       assert.Equal(t, []int64{1, 3, 5}, ids, "expected rows at positions 
0,2,4 (beta/delta deleted)")
+}
+
+func assertRowCount(t *testing.T, tbl *table.Table, expected int64) {
+       t.Helper()
+
+       _, itr, err := tbl.Scan().ToArrowRecords(t.Context())
+       require.NoError(t, err)
+
+       var total int64
+       next, stop := iter.Pull2(itr)
+       defer stop()
+
+       for {
+               rec, err, valid := next()
+               if !valid {
+                       break
+               }
+
+               require.NoError(t, err)
+               total += rec.NumRows()
+               rec.Release()
+       }
+
+       assert.Equal(t, expected, total, "unexpected row count")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 9bbc0e85..00d71f68 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -427,17 +427,17 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
 type snapshotProducer struct {
        producerImpl
 
-       commitUuid          uuid.UUID
-       io                  iceio.WriteFileIO
-       txn                 *Transaction
-       op                  Operation
-       snapshotID          int64
-       parentSnapshotID    int64
-       addedFiles          []iceberg.DataFile
-       positionDeleteFiles []iceberg.DataFile
-       manifestCount       atomic.Int32
-       deletedFiles        map[string]iceberg.DataFile
-       snapshotProps       iceberg.Properties
+       commitUuid       uuid.UUID
+       io               iceio.WriteFileIO
+       txn              *Transaction
+       op               Operation
+       snapshotID       int64
+       parentSnapshotID int64
+       addedFiles       []iceberg.DataFile
+       addedDeleteFiles []iceberg.DataFile
+       manifestCount    atomic.Int32
+       deletedFiles     map[string]iceberg.DataFile
+       snapshotProps    iceberg.Properties
 }
 
 func createSnapshotProducer(op Operation, txn *Transaction, fs 
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) 
*snapshotProducer {
@@ -483,8 +483,8 @@ func (sp *snapshotProducer) appendDataFile(df 
iceberg.DataFile) *snapshotProduce
        return sp
 }
 
-func (sp *snapshotProducer) appendPositionDeleteFile(df iceberg.DataFile) 
*snapshotProducer {
-       sp.positionDeleteFiles = append(sp.positionDeleteFiles, df)
+func (sp *snapshotProducer) appendDeleteFile(df iceberg.DataFile) 
*snapshotProducer {
+       sp.addedDeleteFiles = append(sp.addedDeleteFiles, df)
 
        return sp
 }
@@ -547,8 +547,8 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                g.Go(sp.manifestProducer(iceberg.ManifestContentData, 
sp.addedFiles, &addedManifests))
        }
 
-       if len(sp.positionDeleteFiles) > 0 {
-               g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes, 
sp.positionDeleteFiles, &positionDeleteManifests))
+       if len(sp.addedDeleteFiles) > 0 {
+               g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes, 
sp.addedDeleteFiles, &positionDeleteManifests))
        }
 
        if len(deleted) > 0 {
@@ -668,7 +668,7 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
                        return Summary{}, err
                }
        }
-       for _, df := range sp.positionDeleteFiles {
+       for _, df := range sp.addedDeleteFiles {
                if err = ssc.addFile(df, currentSchema, *partitionSpec); err != 
nil {
                        return Summary{}, err
                }
diff --git a/table/transaction.go b/table/transaction.go
index 815d3459..a28457a3 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -1350,7 +1350,7 @@ func (t *Transaction) writePositionDeletesForFiles(ctx 
context.Context, fs io.IO
                if err != nil {
                        return err
                }
-               updater.appendPositionDeleteFile(f)
+               updater.appendDeleteFile(f)
        }
 
        return nil

Reply via email to