This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f3c302f feat(table): add support for merge-on-read delete (#721)
8f3c302f is described below

commit 8f3c302fa525a0914c261f28315e6c5f1c75c138
Author: Alex Normand <[email protected]>
AuthorDate: Thu Feb 26 08:10:17 2026 -0800

    feat(table): add support for merge-on-read delete (#721)
    
    ![delete
    
this](https://media4.giphy.com/media/v1.Y2lkPTc5MGI3NjExZXlvdDl4dnl0d2d1OGEybXc3NTZkbHg5eXplMzZkbzF3c2xkZXl2ZSZlcD12MV9pbnRlcm5hbF9naWZfYnlfaWQmY3Q9Zw/xULW8N9O5WD32L5052/giphy.gif)
    
    This adds support for merge-on-read deletes. It offers an alternative
    to copy-on-write by generating position delete files instead of
    rewriting existing data files.
    
    I'm not very confident in the elegance of my solution as I'm still new
    to the internals of iceberg-go but the high-level is:
    * Reuse the classification code from the existing delete implementation
    to get the list of dropped files vs files with partial deletes
    * Reuse the arrow scanning facilities to filter records from the data
    files with partial deletes and emit position delete records with file
    path and position.
    * This is done by reusing the pipeline code and making the first stage
    in the pipeline one that enriches the `RecordBatch` with the file path
    and position before the original position is lost due to filtering.
    * After filtering, the RecordBatch is projected to the position delete
    schema (i.e. the original schema fields are dropped)
    * Once we have filtered PositionDelete records that need to be emitted,
    we reuse the record-to-file writing code to generate position delete files.
    
    ## Testing
    Integration tests were added to exercise the partitioned and
    unpartitioned paths and the data is such that it's meant to actually
    produce a position delete file rather than just go through the quick
    path that drops an entire file because all records are gone.
    
    ## Indirect fixes
    While working on this change and adding the testing for the partitioned
    table deletions, I realized that the manifest evaluator was not built
    correctly when the filter affected a field that was part of a
    partition spec. It needed to use code similar to what's done during
    scanning to build projections and build a manifest evaluator per
    partition id.
    This is fixed in this PR but this technically also applies to
    copy-on-write and overwrite paths so the fix goes beyond the scope of
    the `merge-on-read`.
    
    Fixes #487.
---
 README.md                                          |  22 +-
 manifest.go                                        |  57 +++-
 table/arrow_scanner.go                             | 122 +++++++
 table/arrow_scanner_test.go                        | 127 +++++++
 table/arrow_utils.go                               | 111 ++++++-
 table/arrow_utils_internal_test.go                 |   2 +-
 table/evaluators_test.go                           |  45 +--
 table/internal/interfaces.go                       |   1 +
 table/internal/parquet_files.go                    |   2 +-
 table/internal/parquet_files_test.go               |   4 +-
 table/internal/utils.go                            |   4 +-
 table/partitioned_fanout_writer.go                 |  36 +-
 table/partitioned_fanout_writer_test.go            |   7 +-
 table/pos_delete_partitioned_fanout_writer.go      | 146 +++++++++
 table/pos_delete_partitioned_fanout_writer_test.go | 364 +++++++++++++++++++++
 table/rolling_data_writer.go                       |  48 +--
 table/scanner.go                                   |  20 +-
 table/snapshot_producers.go                        | 132 +++++---
 table/table_test.go                                | 151 ++++++++-
 table/transaction.go                               | 253 ++++++++++++--
 table/transaction_test.go                          | 114 +++++++
 table/writer.go                                    | 133 ++++++--
 22 files changed, 1657 insertions(+), 244 deletions(-)

diff --git a/README.md b/README.md
index a714bfb2..c02480bb 100644
--- a/README.md
+++ b/README.md
@@ -143,17 +143,17 @@ make lint-install
 As long as the FileSystem is supported and the Catalog supports altering
 the table, the following tracks the current write support:
 
-|        Operation        | Supported |
-|:-----------------------:|:---------:|
-|      Append Stream      |     X     |
-|    Append Data Files    |     X     |
-|      Rewrite Files      |           |
-|    Rewrite manifests    |           |
-|     Overwrite Files     |     X     |
-|  Copy-On-Write Delete   |     X     |
-|    Write Pos Delete     |           |
-|     Write Eq Delete     |           |
-|        Row Delta        |           |
+| Operation            | Supported |
+|:---------------------|:---------:|
+| Append Stream        |     X     |
+| Append Data Files    |     X     |
+| Rewrite Files        |           |
+| Rewrite manifests    |           |
+| Overwrite Files      |     X     |
+| Copy-On-Write Delete |     X     |
+| Write Pos Delete     |     X     |
+| Write Eq Delete      |           |
+| Row Delta            |           |
 
 
 ### CLI Usage
diff --git a/manifest.go b/manifest.go
index 68a7234a..d8c0e79e 100644
--- a/manifest.go
+++ b/manifest.go
@@ -605,6 +605,9 @@ func NewManifestReader(file ManifestFile, in io.Reader) 
(*ManifestReader, error)
        if err != nil {
                return nil, err
        }
+       defer func() {
+               _ = dec.Close()
+       }()
 
        metadata := dec.Metadata()
        sc := dec.Schema()
@@ -832,13 +835,11 @@ func ReadManifestList(in io.Reader) ([]ManifestFile, 
error) {
 }
 
 type writerImpl interface {
-       content() ManifestContent
        prepareEntry(*manifestEntry, int64) (ManifestEntry, error)
 }
 
 type v1writerImpl struct{}
 
-func (v1writerImpl) content() ManifestContent { return ManifestContentData }
 func (v1writerImpl) prepareEntry(entry *manifestEntry, sn int64) 
(ManifestEntry, error) {
        if entry.Snapshot != nil && *entry.Snapshot != sn {
                if entry.EntryStatus != EntryStatusEXISTING {
@@ -855,7 +856,6 @@ func (v1writerImpl) prepareEntry(entry *manifestEntry, sn 
int64) (ManifestEntry,
 
 type v2writerImpl struct{}
 
-func (v2writerImpl) content() ManifestContent { return ManifestContentData }
 func (v2writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) 
(ManifestEntry, error) {
        if entry.SeqNum == nil {
                if entry.Snapshot != nil && *entry.Snapshot != snapshotID {
@@ -872,7 +872,6 @@ func (v2writerImpl) prepareEntry(entry *manifestEntry, 
snapshotID int64) (Manife
 
 type v3writerImpl struct{}
 
-func (v3writerImpl) content() ManifestContent { return ManifestContentData }
 func (v3writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) 
(ManifestEntry, error) {
        if entry.SeqNum == nil {
                if entry.Snapshot != nil && *entry.Snapshot != snapshotID {
@@ -1048,8 +1047,9 @@ type ManifestWriter struct {
        output io.Writer
        writer *ocf.Encoder
 
-       spec   PartitionSpec
-       schema *Schema
+       spec    PartitionSpec
+       schema  *Schema
+       content ManifestContent
 
        partFieldNameToID map[string]int
        partFieldIDToType map[int]avro.LogicalType
@@ -1067,7 +1067,15 @@ type ManifestWriter struct {
        reusedEntry manifestEntry
 }
 
-func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema 
*Schema, snapshotID int64) (*ManifestWriter, error) {
+type ManifestWriterOption func(w *ManifestWriter)
+
+func WithManifestWriterContent(content ManifestContent) ManifestWriterOption {
+       return func(w *ManifestWriter) {
+               w.content = content
+       }
+}
+
+func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema 
*Schema, snapshotID int64, opts ...ManifestWriterOption) (*ManifestWriter, 
error) {
        var impl writerImpl
 
        switch version {
@@ -1098,6 +1106,7 @@ func NewManifestWriter(version int, out io.Writer, spec 
PartitionSpec, schema *S
                version:           version,
                output:            out,
                spec:              spec,
+               content:           ManifestContentData,
                schema:            schema,
                partFieldNameToID: nameToID,
                partFieldIDToType: idToType,
@@ -1106,6 +1115,13 @@ func NewManifestWriter(version int, out io.Writer, spec 
PartitionSpec, schema *S
                partitions:        make([]map[int]any, 0),
        }
 
+       for _, apply := range opts {
+               apply(w)
+       }
+       if version < 2 && w.content != ManifestContentData {
+               return nil, fmt.Errorf("unsupported content '%s' for format 
version '%d'", w.content, version)
+       }
+
        md, err := w.meta()
        if err != nil {
                return nil, err
@@ -1136,7 +1152,17 @@ func (w *ManifestWriter) Close() error {
        return w.writer.Close()
 }
 
-func (w *ManifestWriter) ToManifestFile(location string, length int64) 
(ManifestFile, error) {
+type ManifestFileOption func(mf *manifestFile)
+
+// WithManifestFileContent overrides the ManifestContent of a new manifest 
file with the provided value
+// Default: ManifestContentData
+func WithManifestFileContent(content ManifestContent) ManifestFileOption {
+       return func(mf *manifestFile) {
+               mf.Content = content
+       }
+}
+
+func (w *ManifestWriter) ToManifestFile(location string, length int64, opts 
...ManifestFileOption) (ManifestFile, error) {
        if err := w.Close(); err != nil {
                return nil, err
        }
@@ -1150,7 +1176,7 @@ func (w *ManifestWriter) ToManifestFile(location string, 
length int64) (Manifest
                return nil, err
        }
 
-       return &manifestFile{
+       mf := manifestFile{
                version:            w.version,
                Path:               location,
                Len:                length,
@@ -1167,7 +1193,12 @@ func (w *ManifestWriter) ToManifestFile(location string, 
length int64) (Manifest
                DeletedRowsCount:   w.deletedRows,
                PartitionList:      &partitions,
                Key:                nil,
-       }, nil
+       }
+       for _, apply := range opts {
+               apply(&mf)
+       }
+
+       return &mf, nil
 }
 
 func (w *ManifestWriter) meta() (map[string][]byte, error) {
@@ -1193,7 +1224,7 @@ func (w *ManifestWriter) meta() (map[string][]byte, 
error) {
                "partition-spec":    specFieldsJson,
                "partition-spec-id": []byte(strconv.Itoa(w.spec.ID())),
                "format-version":    []byte(strconv.Itoa(w.version)),
-               "content":           []byte(w.impl.content().String()),
+               "content":           []byte(w.content.String()),
        }, nil
 }
 
@@ -1416,7 +1447,7 @@ func (m *ManifestListWriter) AddManifests(files 
[]ManifestFile) error {
                                // if the sequence number is being assigned 
here,
                                // then the manifest must be created by the 
current
                                // operation.
-                               // to validate this, check the snapshot id 
matches the current commmit
+                               // to validate this, check the snapshot id 
matches the current commit
                                if m.commitSnapshotID != 
wrapped.AddedSnapshotID {
                                        return fmt.Errorf("found unassigned 
sequence number for a manifest from snapshot %d != %d",
                                                m.commitSnapshotID, 
wrapped.AddedSnapshotID)
@@ -2315,5 +2346,5 @@ type ManifestEntry interface {
 
 var PositionalDeleteSchema = NewSchema(0,
        NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: 
"file_path", Required: true},
-       NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", 
Required: true},
+       NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", 
Required: true},
 )
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 5e053388..6621f6a4 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -42,6 +42,8 @@ const (
        ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
 )
 
+var PositionalDeleteArrowSchema, _ = 
SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false)
+
 type (
        positionDeletes   = []*arrow.Chunked
        perFilePosDeletes = map[string]positionDeletes
@@ -189,6 +191,54 @@ func processPositionalDeletes(ctx context.Context, deletes 
set[int64]) recProces
        }
 }
 
+// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns 
declared in the PositionalDeleteArrowSchema
+// so that during the pipeline filtering stages that sheds filtered out 
records, we still have a way to
+// preserve the original position of those records.
+func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath 
iceberg.DataFile) recProcessFn {
+       nextIdx, mem := int64(0), compute.GetAllocator(ctx)
+       filePathField, ok := 
PositionalDeleteArrowSchema.FieldsByName("file_path")
+       if !ok {
+               panic("position delete schema should have required field 
'file_path'")
+       }
+       posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
+       if !ok {
+               panic("position delete schema should have required field 'pos'")
+       }
+
+       return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err 
error) {
+               defer inData.Release()
+
+               schema := inData.Schema()
+               fieldIdx := schema.NumFields()
+               schema, err = schema.AddField(fieldIdx, filePathField[0])
+               if err != nil {
+                       return nil, err
+               }
+               schema, err = schema.AddField(fieldIdx+1, posField[0])
+               if err != nil {
+                       return nil, err
+               }
+
+               filePathBuilder := array.NewStringBuilder(mem)
+               defer filePathBuilder.Release()
+               posBuilder := array.NewInt64Builder(mem)
+               defer posBuilder.Release()
+
+               startPos := nextIdx
+               nextIdx += inData.NumRows()
+
+               for i := startPos; i < nextIdx; i++ {
+                       filePathBuilder.Append(filePath.FilePath())
+                       posBuilder.Append(i)
+               }
+
+               columns := append(inData.Columns(), filePathBuilder.NewArray(), 
posBuilder.NewArray())
+               outData = array.NewRecordBatch(schema, columns, 
inData.NumRows())
+
+               return outData, err
+       }
+}
+
 func filterRecords(ctx context.Context, recordFilter expr.Expression) 
recProcessFn {
        return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
                defer rec.Release()
@@ -473,6 +523,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
        return err
 }
 
+func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task 
internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out 
chan<- enumeratedRecord) (err error) {
+       defer func() {
+               if err != nil {
+                       out <- enumeratedRecord{Task: task, Err: err}
+               }
+       }()
+
+       var (
+               rdr        internal.FileReader
+               iceSchema  *iceberg.Schema
+               colIndices []int
+               filterFunc recProcessFn
+               dropFile   bool
+       )
+
+       iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+       if err != nil {
+               return err
+       }
+       defer iceinternal.CheckedClose(rdr, &err)
+
+       fields := append(iceSchema.Fields(), 
iceberg.PositionalDeleteSchema.Fields()...)
+       enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...)
+
+       pipeline := make([]recProcessFn, 0, 2)
+       pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, 
task.Value.File))
+       if len(positionalDeletes) > 0 {
+               deletes := set[int64]{}
+               for _, chunk := range positionalDeletes {
+                       for _, a := range chunk.Chunks() {
+                               for _, v := range 
a.(*array.Int64).Int64Values() {
+                                       deletes[v] = struct{}{}
+                               }
+                       }
+               }
+
+               pipeline = append(pipeline, processPositionalDeletes(ctx, 
deletes))
+       }
+
+       filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+       if err != nil {
+               return err
+       }
+
+       // Nothing to delete in a dropped file
+       if dropFile {
+               var emptySchema *arrow.Schema
+               emptySchema, err = 
SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, false, 
as.useLargeTypes)
+               if err != nil {
+                       return err
+               }
+               out <- enumeratedRecord{Task: task, Record: 
internal.Enumerated[arrow.RecordBatch]{
+                       Value: array.NewRecordBatch(emptySchema, nil, 0), 
Index: 0, Last: true,
+               }}
+
+               return err
+       }
+
+       if filterFunc != nil {
+               pipeline = append(pipeline, filterFunc)
+       }
+       pipeline = append(pipeline, func(r arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+               defer r.Release()
+
+               return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema, 
enrichedIcebergSchema, r, false, true, as.useLargeTypes)
+       })
+
+       err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, 
pipeline, out)
+
+       return err
+}
+
 func createIterator(ctx context.Context, numWorkers uint, records <-chan 
enumeratedRecord, deletesPerFile perFilePosDeletes, cancel 
context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.RecordBatch, error] {
        isBeforeAny := func(batch enumeratedRecord) bool {
                return batch.Task.Index < 0
diff --git a/table/arrow_scanner_test.go b/table/arrow_scanner_test.go
new file mode 100644
index 00000000..58e34322
--- /dev/null
+++ b/table/arrow_scanner_test.go
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
+       testSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "first_name", Type: &arrow.StringType{}, Nullable: 
false},
+               {Name: "last_name", Type: &arrow.StringType{}, Nullable: false},
+               {Name: "age", Type: &arrow.Int32Type{}, Nullable: true},
+       }, nil)
+       schemaWithPosDelete := arrow.NewSchema(append(testSchema.Fields(),
+               arrow.Field{Name: "file_path", Type: &arrow.StringType{}, 
Nullable: false, Metadata: 
arrow.MetadataFrom(map[string]string{ArrowParquetFieldIDKey: 
strconv.Itoa(2147483546)})},
+               arrow.Field{Name: "pos", Type: &arrow.Int64Type{}, Nullable: 
false, Metadata: arrow.MetadataFrom(map[string]string{ArrowParquetFieldIDKey: 
strconv.Itoa(2147483545)})},
+       ), nil)
+
+       testCases := []struct {
+               name            string
+               inputBatches    []arrow.RecordBatch
+               expectedOutputs []arrow.RecordBatch
+       }{
+               {
+                       name:            "one empty record batch",
+                       inputBatches:    
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[]`)},
+                       expectedOutputs: 
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[]`)},
+               },
+               {
+                       name:            "batch of one",
+                       inputBatches:    
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[{"first_name": 
"alan", "last_name": "gopher", "age": 7}]`)},
+                       expectedOutputs: 
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete, 
`[{"first_name": "alan", "last_name": "gopher", "age": 7, "file_path": 
"file://test_path.parquet", "pos": 0}]`)},
+               },
+               {
+                       name: "batch of many",
+                       inputBatches: 
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[{"first_name": 
"alan", "last_name": "gopher", "age": 7},
+{"first_name": "steve", "last_name": "gopher", "age": 5},
+{"first_name": "dead", "last_name": "gopher", "age": 95}]`)},
+                       expectedOutputs: 
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete, 
`[{"first_name": "alan", "last_name": "gopher", "age": 7, "file_path": 
"file://test_path.parquet", "pos": 0},
+{"first_name": "steve", "last_name": "gopher", "age": 5, "file_path": 
"file://test_path.parquet", "pos": 1},
+{"first_name": "dead", "last_name": "gopher", "age": 95, "file_path": 
"file://test_path.parquet", "pos": 2}]`)},
+               },
+               {
+                       name: "many batches",
+                       inputBatches: []arrow.RecordBatch{
+                               mustLoadRecordBatchFromJSON(testSchema, 
`[{"first_name": "alan", "last_name": "gopher", "age": 7},
+{"first_name": "steve", "last_name": "gopher", "age": 5},
+{"first_name": "dead", "last_name": "gopher", "age": 95}]`),
+                               mustLoadRecordBatchFromJSON(testSchema, 
`[{"first_name": "matt", "last_name": "gopher", "age": 2},
+{"first_name": "alex", "last_name": "gopher", "age": 10}]`),
+                       },
+                       expectedOutputs: []arrow.RecordBatch{
+                               
mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[{"first_name": "alan", 
"last_name": "gopher", "age": 7, "file_path": "file://test_path.parquet", 
"pos": 0},
+{"first_name": "steve", "last_name": "gopher", "age": 5, "file_path": 
"file://test_path.parquet", "pos": 1},
+{"first_name": "dead", "last_name": "gopher", "age": 95, "file_path": 
"file://test_path.parquet", "pos": 2}]`),
+                               
mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[{"first_name": "matt", 
"last_name": "gopher", "age": 2, "file_path": "file://test_path.parquet", 
"pos": 3},
+{"first_name": "alex", "last_name": "gopher", "age": 10, "file_path": 
"file://test_path.parquet", "pos": 4}]`),
+                       },
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       defer func() {
+                               for _, b := range tc.inputBatches {
+                                       b.Release()
+                               }
+                       }()
+
+                       enrichFn := 
enrichRecordsWithPosDeleteFields(t.Context(), &mockDataFile{path: 
"file://test_path.parquet"})
+                       for i, b := range tc.inputBatches {
+                               out, err := enrichFn(b)
+                               require.NoError(t, err)
+                               defer func() {
+                                       out.Release()
+                               }()
+
+                               assert.Equal(t, schemaWithPosDelete, 
out.Schema())
+                               assert.Equal(t, out.NumRows(), b.NumRows())
+
+                               expectedOutputJSON, err := 
tc.expectedOutputs[i].MarshalJSON()
+                               require.NoError(t, err)
+
+                               outAsJSON, err := out.MarshalJSON()
+                               require.NoError(t, err)
+
+                               assert.Equal(t, string(expectedOutputJSON), 
string(outAsJSON))
+                       }
+               })
+       }
+}
+
+// mustLoadRecordBatchFromJSON is a convenience wrapper around 
array.RecordFromJSON that returns the RecordBatch only
+// to make it friendlier to table-driven tests. In case of error parsing the 
json content, it panics.
+func mustLoadRecordBatchFromJSON(schema *arrow.Schema, content string) 
arrow.RecordBatch {
+       mem := memory.NewGoAllocator()
+       recordBatch, _, err := array.RecordFromJSON(mem, schema, 
strings.NewReader(content))
+       if err != nil {
+               panic("failed to load test data from JSON: " + err.Error())
+       }
+
+       return recordBatch
+}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index e599735a..66cdf29d 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1258,7 +1258,7 @@ func filesToDataFiles(ctx context.Context, fileIO 
iceio.IO, meta *MetadataBuilde
                                }
                        }
 
-                       df := statistics.ToDataFile(currentSchema, currentSpec, 
filePath, iceberg.ParquetFile, rdr.SourceFileSize(), partitionValues)
+                       df := statistics.ToDataFile(currentSchema, currentSpec, 
filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), 
partitionValues)
                        if !yield(df, nil) {
                                return
                        }
@@ -1306,10 +1306,10 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
                if r := recover(); r != nil {
                        var err error
                        switch e := r.(type) {
-                       case string:
-                               err = fmt.Errorf("error encountered during file 
writing %s", e)
                        case error:
                                err = fmt.Errorf("error encountered during file 
writing: %w", e)
+                       default:
+                               err = fmt.Errorf("error encountered during 
position delete file writing: %v", e)
                        }
                        ret = func(yield func(iceberg.DataFile, error) bool) {
                                yield(nil, err)
@@ -1328,13 +1328,25 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
        nameMapping := meta.CurrentSchema().NameMapping()
        taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
        if err != nil {
-               panic(err)
+               return func(yield func(iceberg.DataFile, error) bool) {
+                       yield(nil, err)
+               }
        }
        currentSpec, err := meta.CurrentSpec()
-       if err != nil || currentSpec == nil {
-               panic(fmt.Errorf("%w: cannot write files without a current 
spec", err))
+       if err != nil {
+               return func(yield func(iceberg.DataFile, error) bool) {
+                       yield(nil, err)
+               }
+       }
+       if currentSpec == nil {
+               return func(yield func(iceberg.DataFile, error) bool) {
+                       yield(nil, fmt.Errorf("cannot write files without a 
current spec: %w", err))
+               }
        }
 
+       cw := newConcurrentDataFileWriter(func(rootLocation string, fs 
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts 
...dataFileWriterOption) (dataFileWriter, error) {
+               return newDataFileWriter(rootLocation, fs, meta, props, opts...)
+       })
        nextCount, stopCount := iter.Pull(args.counter)
        if currentSpec.IsUnpartitioned() {
                tasks := func(yield func(WriteTask) bool) {
@@ -1358,12 +1370,87 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
                        }
                }
 
-               return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
-       } else {
-               rollingDataWriters := NewWriterFactory(rootLocation, args, 
meta, taskSchema, targetFileSize)
-               partitionWriter := newPartitionedFanoutWriter(*currentSpec, 
meta.CurrentSchema(), args.itr, &rollingDataWriters)
-               workers := config.EnvConfig.MaxWorkers
+               return cw.writeFiles(ctx, rootLocation, args.fs, meta, 
meta.props, nil, tasks)
+       }
+
+       factory := NewWriterFactory(rootLocation, args, meta, taskSchema, 
targetFileSize)
+       partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, 
meta.CurrentSchema(), args.itr, &factory)
+       workers := config.EnvConfig.MaxWorkers
+
+       return partitionWriter.Write(ctx, workers)
+}
+
+type partitionContext struct {
+       partitionData map[int]any
+       specID        int32
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation 
string, meta *MetadataBuilder, partitionContextByFilePath 
map[string]partitionContext, args recordWritingArgs) (ret 
iter.Seq2[iceberg.DataFile, error]) {
+       if args.counter == nil {
+               args.counter = internal.Counter(0)
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       var err error
+                       switch e := r.(type) {
+                       case error:
+                               err = fmt.Errorf("error encountered during 
position delete file writing: %w", e)
+                       default:
+                               err = fmt.Errorf("error encountered during 
position delete file writing: %v", e)
+                       }
+                       ret = func(yield func(iceberg.DataFile, error) bool) {
+                               yield(nil, err)
+                       }
+               }
+       }()
 
-               return partitionWriter.Write(ctx, workers)
+       latestMetadata, err := meta.Build()
+       if err != nil {
+               return func(yield func(iceberg.DataFile, error) bool) {
+                       yield(nil, err)
+               }
        }
+
+       if args.writeUUID == nil {
+               u := uuid.Must(uuid.NewRandom())
+               args.writeUUID = &u
+       }
+
+       targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+               WriteTargetFileSizeBytesDefault))
+
+       cw := newConcurrentDataFileWriter(func(rootLocation string, fs 
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts 
...dataFileWriterOption) (dataFileWriter, error) {
+               return newPositionDeleteWriter(rootLocation, fs, meta, props, 
opts...)
+       }, withSchemaSanitization(false))
+       nextCount, stopCount := iter.Pull(args.counter)
+       if latestMetadata.PartitionSpec().IsUnpartitioned() {
+               tasks := func(yield func(WriteTask) bool) {
+                       defer stopCount()
+
+                       fileCount := 0
+                       for batch := range binPackRecords(args.itr, 
defaultBinPackLookback, targetFileSize) {
+                               cnt, _ := nextCount()
+                               fileCount++
+                               t := WriteTask{
+                                       Uuid:        *args.writeUUID,
+                                       ID:          cnt,
+                                       PartitionID: 
iceberg.UnpartitionedSpec.ID(),
+                                       FileCount:   fileCount,
+                                       Schema:      
iceberg.PositionalDeleteSchema,
+                                       Batches:     batch,
+                               }
+                               if !yield(t) {
+                                       return
+                               }
+                       }
+               }
+
+               return cw.writeFiles(ctx, rootLocation, args.fs, meta, 
meta.props, nil, tasks)
+       }
+       writerFactory := NewWriterFactory(rootLocation, args, meta, 
iceberg.PositionalDeleteSchema, targetFileSize)
+       partitionWriter := 
newPositionDeletePartitionedFanoutWriter(latestMetadata, cw, 
partitionContextByFilePath, args.itr, &writerFactory)
+       workers := config.EnvConfig.MaxWorkers
+
+       return partitionWriter.Write(ctx, workers)
 }
diff --git a/table/arrow_utils_internal_test.go 
b/table/arrow_utils_internal_test.go
index d3229822..e463c748 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -200,7 +200,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta 
iceberg.Properties, writeSt
        stats := format.DataFileStatsFromMeta(fileMeta, collector, mapping)
 
        return stats.ToDataFile(tableMeta.CurrentSchema(), 
tableMeta.PartitionSpec(), "fake-path.parquet",
-               iceberg.ParquetFile, fileMeta.GetSourceFileSize(), nil)
+               iceberg.ParquetFile, iceberg.EntryContentData, 
fileMeta.GetSourceFileSize(), nil)
 }
 
 func (suite *FileStatsMetricsSuite) TestRecordCount() {
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index cec68b43..3f86b27b 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -1071,6 +1071,7 @@ func (p *ProjectionTestSuite) 
TestPartialProjectedFields() {
 
 type mockDataFile struct {
        path        string
+       contentType iceberg.ManifestEntryContent
        format      iceberg.FileFormat
        partition   map[int]any
        count       int64
@@ -1085,28 +1086,28 @@ type mockDataFile struct {
        specid int32
 }
 
-func (*mockDataFile) ContentType() iceberg.ManifestEntryContent { return 
iceberg.EntryContentData }
-func (m *mockDataFile) FilePath() string                        { return 
m.path }
-func (m *mockDataFile) FileFormat() iceberg.FileFormat          { return 
m.format }
-func (m *mockDataFile) Partition() map[int]any                  { return 
m.partition }
-func (m *mockDataFile) Count() int64                            { return 
m.count }
-func (m *mockDataFile) FileSizeBytes() int64                    { return 
m.filesize }
-func (m *mockDataFile) ColumnSizes() map[int]int64              { return 
m.columnSizes }
-func (m *mockDataFile) ValueCounts() map[int]int64              { return 
m.valueCounts }
-func (m *mockDataFile) NullValueCounts() map[int]int64          { return 
m.nullCounts }
-func (m *mockDataFile) NaNValueCounts() map[int]int64           { return 
m.nanCounts }
-func (*mockDataFile) DistinctValueCounts() map[int]int64        { return nil }
-func (m *mockDataFile) LowerBoundValues() map[int][]byte        { return 
m.lowerBounds }
-func (m *mockDataFile) UpperBoundValues() map[int][]byte        { return 
m.upperBounds }
-func (*mockDataFile) KeyMetadata() []byte                       { return nil }
-func (*mockDataFile) SplitOffsets() []int64                     { return nil }
-func (*mockDataFile) EqualityFieldIDs() []int                   { return nil }
-func (*mockDataFile) SortOrderID() *int                         { return nil }
-func (m *mockDataFile) SpecID() int32                           { return 
m.specid }
-func (*mockDataFile) FirstRowID() *int64                        { return nil }
-func (*mockDataFile) ReferencedDataFile() *string               { return nil }
-func (*mockDataFile) ContentOffset() *int64                     { return nil }
-func (*mockDataFile) ContentSizeInBytes() *int64                { return nil }
+func (m *mockDataFile) ContentType() iceberg.ManifestEntryContent { return 
m.contentType }
+func (m *mockDataFile) FilePath() string                          { return 
m.path }
+func (m *mockDataFile) FileFormat() iceberg.FileFormat            { return 
m.format }
+func (m *mockDataFile) Partition() map[int]any                    { return 
m.partition }
+func (m *mockDataFile) Count() int64                              { return 
m.count }
+func (m *mockDataFile) FileSizeBytes() int64                      { return 
m.filesize }
+func (m *mockDataFile) ColumnSizes() map[int]int64                { return 
m.columnSizes }
+func (m *mockDataFile) ValueCounts() map[int]int64                { return 
m.valueCounts }
+func (m *mockDataFile) NullValueCounts() map[int]int64            { return 
m.nullCounts }
+func (m *mockDataFile) NaNValueCounts() map[int]int64             { return 
m.nanCounts }
+func (*mockDataFile) DistinctValueCounts() map[int]int64          { return nil 
}
+func (m *mockDataFile) LowerBoundValues() map[int][]byte          { return 
m.lowerBounds }
+func (m *mockDataFile) UpperBoundValues() map[int][]byte          { return 
m.upperBounds }
+func (*mockDataFile) KeyMetadata() []byte                         { return nil 
}
+func (*mockDataFile) SplitOffsets() []int64                       { return nil 
}
+func (*mockDataFile) EqualityFieldIDs() []int                     { return nil 
}
+func (*mockDataFile) SortOrderID() *int                           { return nil 
}
+func (m *mockDataFile) SpecID() int32                             { return 
m.specid }
+func (*mockDataFile) FirstRowID() *int64                          { return nil 
}
+func (*mockDataFile) ReferencedDataFile() *string                 { return nil 
}
+func (*mockDataFile) ContentOffset() *int64                       { return nil 
}
+func (*mockDataFile) ContentSizeInBytes() *int64                  { return nil 
}
 
 type InclusiveMetricsTestSuite struct {
        suite.Suite
diff --git a/table/internal/interfaces.go b/table/internal/interfaces.go
index 0fdb8097..48830154 100644
--- a/table/internal/interfaces.go
+++ b/table/internal/interfaces.go
@@ -105,4 +105,5 @@ type WriteFileInfo struct {
        FileName   string
        StatsCols  map[int]StatisticsCollector
        WriteProps any
+       Content    iceberg.ManifestEntryContent
 }
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 8e7cf252..2d0334c5 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -282,7 +282,7 @@ func (p parquetFormat) WriteDataFile(ctx context.Context, 
fs iceio.WriteFileIO,
        }
 
        return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping).
-               ToDataFile(info.FileSchema, info.Spec, info.FileName, 
iceberg.ParquetFile, cntWriter.Count, partitionValues), nil
+               ToDataFile(info.FileSchema, info.Spec, info.FileName, 
iceberg.ParquetFile, info.Content, cntWriter.Count, partitionValues), nil
 }
 
 type decAsIntAgg[T int32 | int64] struct {
diff --git a/table/internal/parquet_files_test.go 
b/table/internal/parquet_files_test.go
index 85480da0..42adc22c 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -261,7 +261,7 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
 
        stats := format.DataFileStatsFromMeta(internal.Metadata(meta), 
getCollector(), mapping)
        df := stats.ToDataFile(tblMeta.CurrentSchema(), 
tblMeta.PartitionSpec(), "fake-path.parquet",
-               iceberg.ParquetFile, meta.GetSourceFileSize(), nil)
+               iceberg.ParquetFile, iceberg.EntryContentData, 
meta.GetSourceFileSize(), nil)
 
        assert.Len(t, df.ValueCounts(), 15)
        assert.Len(t, df.NullValueCounts(), 15)
@@ -463,7 +463,7 @@ func TestDecimalPhysicalTypes(t *testing.T) {
                        require.NotNil(t, stats)
 
                        df := stats.ToDataFile(tableMeta.CurrentSchema(), 
tableMeta.PartitionSpec(), "test.parquet",
-                               iceberg.ParquetFile, meta.GetSourceFileSize(), 
nil)
+                               iceberg.ParquetFile, iceberg.EntryContentData, 
meta.GetSourceFileSize(), nil)
 
                        // Verify bounds are correctly extracted
                        require.Contains(t, df.LowerBoundValues(), 1)
diff --git a/table/internal/utils.go b/table/internal/utils.go
index 6227d746..4e9a5a05 100644
--- a/table/internal/utils.go
+++ b/table/internal/utils.go
@@ -234,7 +234,7 @@ func (d *DataFileStatistics) PartitionValue(field 
iceberg.PartitionField, sc *ic
        return lowerT.Val.Any()
 }
 
-func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec 
iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64, 
partitionValues map[int]any) iceberg.DataFile {
+func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec 
iceberg.PartitionSpec, path string, format iceberg.FileFormat, content 
iceberg.ManifestEntryContent, filesize int64, partitionValues map[int]any) 
iceberg.DataFile {
        var fieldIDToPartitionData map[int]any
        fieldIDToLogicalType := make(map[int]avro.LogicalType)
        fieldIDToFixedSize := make(map[int]int)
@@ -276,7 +276,7 @@ func (d *DataFileStatistics) ToDataFile(schema 
*iceberg.Schema, spec iceberg.Par
                }
        }
 
-       bldr, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData,
+       bldr, err := iceberg.NewDataFileBuilder(spec, content,
                path, format, fieldIDToPartitionData, fieldIDToLogicalType, 
fieldIDToFixedSize, d.RecordCount, filesize)
        if err != nil {
                panic(err)
diff --git a/table/partitioned_fanout_writer.go 
b/table/partitioned_fanout_writer.go
index 624c5ad5..81479085 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -32,14 +32,15 @@ import (
        "golang.org/x/sync/errgroup"
 )
 
-// PartitionedFanoutWriter distributes Arrow records across multiple 
partitions based on
+// partitionedFanoutWriter distributes Arrow records across multiple 
partitions based on
 // a partition specification, writing data to separate files for each 
partition using
 // a fanout pattern with configurable parallelism.
 type partitionedFanoutWriter struct {
-       partitionSpec iceberg.PartitionSpec
-       schema        *iceberg.Schema
-       itr           iter.Seq2[arrow.RecordBatch, error]
-       writerFactory *writerFactory
+       partitionSpec            iceberg.PartitionSpec
+       schema                   *iceberg.Schema
+       itr                      iter.Seq2[arrow.RecordBatch, error]
+       writerFactory            *writerFactory
+       concurrentDataFileWriter *concurrentDataFileWriter
 }
 
 // PartitionInfo holds the row indices and partition values for a specific 
partition,
@@ -52,12 +53,13 @@ type partitionInfo struct {
 
 // NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the 
specified
 // partition specification, schema, record iterator, and writerFactory.
-func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema 
*iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error], writerFactory 
*writerFactory) *partitionedFanoutWriter {
+func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, 
concurrentWriter *concurrentDataFileWriter, schema *iceberg.Schema, itr 
iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory) 
*partitionedFanoutWriter {
        return &partitionedFanoutWriter{
-               partitionSpec: partitionSpec,
-               schema:        schema,
-               itr:           itr,
-               writerFactory: writerFactory,
+               partitionSpec:            partitionSpec,
+               schema:                   schema,
+               itr:                      itr,
+               writerFactory:            writerFactory,
+               concurrentDataFileWriter: concurrentWriter,
        }
 }
 
@@ -73,7 +75,7 @@ func (p *partitionedFanoutWriter) Write(ctx context.Context, 
workers int) iter.S
        outputDataFilesCh := make(chan iceberg.DataFile, workers)
 
        fanoutWorkers, ctx := errgroup.WithContext(ctx)
-       p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh)
+       startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)
 
        for range workers {
                fanoutWorkers.Go(func() error {
@@ -84,11 +86,11 @@ func (p *partitionedFanoutWriter) Write(ctx 
context.Context, workers int) iter.S
        return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
 }
 
-func (p *partitionedFanoutWriter) startRecordFeeder(ctx context.Context, 
fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch) {
+func startRecordFeeder(ctx context.Context, itr iter.Seq2[arrow.RecordBatch, 
error], fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch) 
{
        fanoutWorkers.Go(func() error {
                defer close(inputRecordsCh)
 
-               for record, err := range p.itr {
+               for record, err := range itr {
                        if err != nil {
                                return err
                        }
@@ -137,7 +139,7 @@ func (p *partitionedFanoutWriter) fanout(ctx 
context.Context, inputRecordsCh <-c
                                }
 
                                partitionPath := 
p.partitionPath(val.partitionRec)
-                               rollingDataWriter, err := 
p.writerFactory.getOrCreateRollingDataWriter(ctx, partitionPath, 
val.partitionValues, dataFilesChannel)
+                               rollingDataWriter, err := 
p.writerFactory.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter, 
partitionPath, val.partitionValues, dataFilesChannel)
                                if err != nil {
                                        return err
                                }
@@ -152,13 +154,17 @@ func (p *partitionedFanoutWriter) fanout(ctx 
context.Context, inputRecordsCh <-c
 }
 
 func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers 
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] {
+       return yieldDataFiles(p.writerFactory, fanoutWorkers, outputDataFilesCh)
+}
+
+func yieldDataFiles(writerFactory *writerFactory, fanoutWorkers 
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] {
        // Use a channel to safely communicate the error from the goroutine
        // to avoid a data race between writing err in the goroutine and 
reading it in the iterator.
        errCh := make(chan error, 1)
        go func() {
                defer close(outputDataFilesCh)
                err := fanoutWorkers.Wait()
-               err = errors.Join(err, p.writerFactory.closeAll())
+               err = errors.Join(err, writerFactory.closeAll())
                errCh <- err
                close(errCh)
        }()
diff --git a/table/partitioned_fanout_writer_test.go 
b/table/partitioned_fanout_writer_test.go
index 734e31c7..b2082c84 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -134,8 +134,11 @@ func (s *FanoutWriterTestSuite) 
testTransformPartition(transform iceberg.Transfo
        taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
        s.Require().NoError(err)
 
-       rollingDataWriters := NewWriterFactory(loc, args, metaBuilder, 
icebergSchema, 1024*1024)
-       partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, 
args.itr, &rollingDataWriters)
+       cw := newConcurrentDataFileWriter(func(rootLocation string, fs 
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts 
...dataFileWriterOption) (dataFileWriter, error) {
+               return newDataFileWriter(rootLocation, fs, meta, props, opts...)
+       })
+       writerFactory := NewWriterFactory(loc, args, metaBuilder, 
icebergSchema, 1024*1024)
+       partitionWriter := newPartitionedFanoutWriter(spec, cw, taskSchema, 
args.itr, &writerFactory)
        workers := config.EnvConfig.MaxWorkers
 
        dataFiles := partitionWriter.Write(s.ctx, workers)
diff --git a/table/pos_delete_partitioned_fanout_writer.go 
b/table/pos_delete_partitioned_fanout_writer.go
new file mode 100644
index 00000000..4b3daeaa
--- /dev/null
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "maps"
+       "slices"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/iceberg-go"
+       "golang.org/x/sync/errgroup"
+)
+
+// positionDeletePartitionedFanoutWriter distributes Arrow position delete 
records across multiple partitions based on
+// a partition specification, writing data to separate delete files for each 
partition using
+// a fanout pattern with configurable parallelism.
+type positionDeletePartitionedFanoutWriter struct {
+       partitionContextByFilePath map[string]partitionContext
+       metadata                   Metadata
+       schema                     *iceberg.Schema
+       itr                        iter.Seq2[arrow.RecordBatch, error]
+       writerFactory              *writerFactory
+       concurrentDataFileWriter   *concurrentDataFileWriter
+}
+
+// newPositionDeletePartitionedFanoutWriter creates a new 
positionDeletePartitionedFanoutWriter with the specified
+// metadata, concurrent writer, per-file partition contexts, record iterator, 
and writer factory.
+func newPositionDeletePartitionedFanoutWriter(metadata Metadata, 
concurrentWriter *concurrentDataFileWriter, partitionContextByFilePath 
map[string]partitionContext, itr iter.Seq2[arrow.RecordBatch, error], 
writerFactory *writerFactory) *positionDeletePartitionedFanoutWriter {
+       return &positionDeletePartitionedFanoutWriter{
+               partitionContextByFilePath: partitionContextByFilePath,
+               metadata:                   metadata,
+               schema:                     iceberg.PositionalDeleteSchema,
+               itr:                        itr,
+               writerFactory:              writerFactory,
+               concurrentDataFileWriter:   concurrentWriter,
+       }
+}
+
+// Write writes the Arrow records to the specified location using a fanout 
pattern with
+// the specified number of workers. The returned iterator yields the data 
files written
+// by the fanout process.
+func (p *positionDeletePartitionedFanoutWriter) Write(ctx context.Context, 
workers int) iter.Seq2[iceberg.DataFile, error] {
+       inputRecordsCh := make(chan arrow.RecordBatch, workers)
+       outputDataFilesCh := make(chan iceberg.DataFile, workers)
+
+       fanoutWorkers, ctx := errgroup.WithContext(ctx)
+       startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)
+
+       for range workers {
+               fanoutWorkers.Go(func() error {
+                       return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
+               })
+       }
+
+       return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
+}
+
+func (p *positionDeletePartitionedFanoutWriter) fanout(ctx context.Context, 
inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<- 
iceberg.DataFile) error {
+       for {
+               select {
+               case <-ctx.Done():
+                       return context.Cause(ctx)
+
+               case record, ok := <-inputRecordsCh:
+                       if !ok {
+                               return nil
+                       }
+
+                       err := p.processBatch(ctx, record, dataFilesChannel)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+}
+
+func (p *positionDeletePartitionedFanoutWriter) processBatch(ctx 
context.Context, batch arrow.RecordBatch, dataFilesChannel chan<- 
iceberg.DataFile) (err error) {
+       defer batch.Release()
+
+       select {
+       case <-ctx.Done():
+               return context.Cause(ctx)
+       default:
+       }
+
+       if batch.NumRows() == 0 {
+               return
+       }
+
+       columns := batch.Columns()
+       filePathArray := columns[0].(*array.String)
+       filePath := filePathArray.ValueStr(0)
+       partitionContext, ok := p.partitionContextByFilePath[filePath]
+       if !ok {
+               return fmt.Errorf("unexpected missing partition context for 
path %s", filePath)
+       }
+
+       partitionPath, err := p.partitionPath(partitionContext)
+       if err != nil {
+               return err
+       }
+       rollingDataWriter, err := 
p.writerFactory.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter, 
partitionPath, partitionContext.partitionData, dataFilesChannel)
+       if err != nil {
+               return err
+       }
+
+       err = rollingDataWriter.Add(batch)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext 
partitionContext) (string, error) {
+       data := 
partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
+       spec := p.metadata.PartitionSpecByID(int(partitionContext.specID))
+       if spec == nil {
+               return "", fmt.Errorf("unexpected missing partition spec in 
metadata for spec id %d", partitionContext.specID)
+       }
+
+       return spec.PartitionToPath(data, p.schema), nil
+}
+
+func (p *positionDeletePartitionedFanoutWriter) yieldDataFiles(fanoutWorkers 
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] {
+       return yieldDataFiles(p.writerFactory, fanoutWorkers, outputDataFilesCh)
+}
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go 
b/table/pos_delete_partitioned_fanout_writer_test.go
new file mode 100644
index 00000000..f0f1ff4c
--- /dev/null
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "maps"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal"
+       "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
+       t.Parallel()
+
+       testCases := []struct {
+               name                   string
+               pathToPartitionContext map[string]partitionContext
+               ctx                    context.Context
+               input                  arrow.RecordBatch
+               expectedDataFile       iceberg.DataFile
+               expectedErr            error
+       }{
+               {
+                       name:             "empty batch",
+                       input:            
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[]`),
+                       expectedDataFile: nil,
+               },
+               {
+                       name:        "error on missing required path to 
partition data",
+                       input:       
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": 
"file://test_path.parquet", "pos": 0}]`),
+                       expectedErr: errors.New("unexpected missing partition 
context"),
+               },
+               {
+                       name:        "abort on context already done",
+                       ctx:         
onlyContext(context.WithDeadline(context.Background(), time.UnixMilli(0))),
+                       input:       
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[]`),
+                       expectedErr: errors.New("context deadline exceeded"),
+               },
+               {
+                       name:                   "error on partition context 
pointing to unknown partition spec",
+                       pathToPartitionContext: 
map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": 
{partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 200}},
+                       input:                  
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": 
"file://namespace/age_bucket=1/test.parquet", "pos": 100}]`),
+                       expectedErr:            errors.New("unexpected missing 
partition spec"),
+               },
+               {
+                       name:                   "success",
+                       pathToPartitionContext: 
map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": 
{partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0}},
+                       input:                  
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": 
"file://namespace/age_bucket=1/test.parquet", "pos": 100}]`),
+                       expectedDataFile:       &mockDataFile{columnSizes: 
map[int]int64{2147483545: 88, 2147483546: 174}, format: iceberg.ParquetFile, 
partition: map[int]any{iceberg.PartitionDataIDStart: 1}, count: 1, specid: 0, 
contentType: iceberg.EntryContentPosDeletes},
+               },
+               // This test case illustrates how the 
positionDeletePartitionedFanoutWriter does not validate that all records
+               // in a batch have the same file path. Doing so would be 
prohibitively expensive in the current implementation,
+               // so callers of the positionDeletePartitionedFanoutWriter are 
expected to ensure that all records in a batch
+               // share the same file_path value.
+               {
+                       name:                   "batch with records having 
different file paths",
+                       pathToPartitionContext: 
map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": 
{partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0}},
+                       input:                  
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": 
"file://namespace/age_bucket=1/test.parquet", "pos": 100}, {"file_path": 
"file://namespace/age_bucket=0/test.parquet", "pos": 10}]`),
+                       expectedDataFile:       &mockDataFile{columnSizes: 
map[int]int64{2147483545: 96, 2147483546: 187}, format: iceberg.ParquetFile, 
partition: map[int]any{iceberg.PartitionDataIDStart: 1}, count: 2, specid: 0, 
contentType: iceberg.EntryContentPosDeletes},
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       ctx := tc.ctx
+                       if ctx == nil {
+                               ctx = t.Context()
+                       }
+
+                       partitionSpec := 
iceberg.NewPartitionSpec(iceberg.PartitionField{
+                               SourceID: 2,
+                               Name:     "age_bucket",
+                               Transform: iceberg.BucketTransform{
+                                       NumBuckets: 2,
+                               },
+                       })
+
+                       metadataBuilder, err := NewMetadataBuilder(2)
+                       require.NoError(t, err)
+                       err = metadataBuilder.AddSchema(iceberg.NewSchema(0, 
append(iceberg.PositionalDeleteSchema.Fields(), iceberg.NestedField{Name: 
"age", ID: 2, Type: iceberg.Int64Type{}})...))
+                       require.NoError(t, err)
+                       err = metadataBuilder.SetCurrentSchemaID(0)
+                       require.NoError(t, err)
+                       err = metadataBuilder.AddPartitionSpec(&partitionSpec, 
true)
+                       require.NoError(t, err)
+                       err = metadataBuilder.SetDefaultSpecID(0)
+                       require.NoError(t, err)
+                       sortOrder, err := NewSortOrder(1, []SortField{{
+                               SourceID:  2,
+                               Direction: SortASC,
+                               Transform: iceberg.IdentityTransform{},
+                               NullOrder: NullsFirst,
+                       }})
+                       require.NoError(t, err)
+                       err = metadataBuilder.AddSortOrder(&sortOrder)
+                       require.NoError(t, err)
+                       err = metadataBuilder.SetDefaultSortOrderID(1)
+                       require.NoError(t, err)
+                       latestMeta, err := metadataBuilder.Build()
+                       require.NoError(t, err)
+
+                       writeUUID := uuid.New()
+                       cw := newConcurrentDataFileWriter(func(rootLocation 
string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, 
opts ...dataFileWriterOption) (dataFileWriter, error) {
+                               return newPositionDeleteWriter(rootLocation, 
fs, meta, props, opts...)
+                       })
+                       writerFactory := NewWriterFactory(t.TempDir(), 
recordWritingArgs{
+                               fs:        &io.LocalFS{},
+                               sc:        PositionalDeleteArrowSchema,
+                               writeUUID: &writeUUID,
+                               counter:   internal.Counter(0),
+                       }, metadataBuilder, iceberg.PositionalDeleteSchema, 
1024*1024)
+                       writer := 
newPositionDeletePartitionedFanoutWriter(latestMeta, cw, 
tc.pathToPartitionContext, nil, &writerFactory)
+                       require.NoError(t, err)
+
+                       dataFileCh := make(chan iceberg.DataFile, 10)
+                       err = writer.processBatch(ctx, tc.input, dataFileCh)
+                       if tc.expectedErr != nil {
+                               require.ErrorContains(t, err, 
tc.expectedErr.Error())
+
+                               return
+                       }
+                       require.NoError(t, err)
+
+                       err = writerFactory.closeAll()
+                       require.NoError(t, err)
+
+                       close(dataFileCh)
+
+                       actualDataFile := <-dataFileCh
+                       assert.NoError(t, equalsDataFile(tc.expectedDataFile, 
actualDataFile, defaultPositionDeleteMatching...))
+               })
+       }
+}
+
+func onlyContext(ctx context.Context, _ func()) context.Context {
+       return ctx
+}
+
+// dataFileMatcher implements a custom "matcher" that compares DataFiles. 
Because not all fields are always
+// important to validate, the dataFileMatcher can be configured with a number 
of matching options to define
+// which of its fields are compared during matching.
+type dataFileMatcher struct {
+       matchers   []fieldMatcher
+       formatters []formatter
+}
+
+type (
+       dataFileMatcherOption func(m *dataFileMatcher)
+       fieldMatcher          func(expected iceberg.DataFile, actual 
iceberg.DataFile) bool
+       formatter             func(val iceberg.DataFile) string
+)
+
+func withFileFormatMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return expected.FileFormat() == actual.FileFormat()
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("FileFormat: %s", val.FileFormat())
+               })
+       }
+}
+
+func withPathMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return expected.FilePath() == actual.FilePath()
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return "FilePath: " + val.FilePath()
+               })
+       }
+}
+
+func withSpecIDMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return expected.SpecID() == actual.SpecID()
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("SpecID: %d", val.SpecID())
+               })
+       }
+}
+
+func withPartitionMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return maps.Equal(expected.Partition(), 
actual.Partition())
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("Partition: %v", val.Partition())
+               })
+       }
+}
+
+func withContentTypeMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return expected.ContentType() == actual.ContentType()
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("ContentType: %s", val.ContentType())
+               })
+       }
+}
+
+func withCountMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return expected.Count() == actual.Count()
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("Count: %d", val.Count())
+               })
+       }
+}
+
+func withColumnSizesMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return maps.Equal(expected.ColumnSizes(), 
actual.ColumnSizes())
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       return fmt.Sprintf("ColumnSizes: %v", val.ColumnSizes())
+               })
+       }
+}
+
+func withContentOffsetMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return comparePointerAndValue(expected.ContentOffset(), 
actual.ContentOffset())
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       if val.ContentSizeInBytes() == nil {
+                               return "ContentOffset: nil"
+                       }
+
+                       return fmt.Sprintf("ContentOffset: %d", 
*val.ContentOffset())
+               })
+       }
+}
+
+func withContentSizeInBytesMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return 
comparePointerAndValue(expected.ContentSizeInBytes(), 
actual.ContentSizeInBytes())
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       if val.ContentSizeInBytes() == nil {
+                               return "ContentSizeInBytes: nil"
+                       }
+
+                       return fmt.Sprintf("ContentSizeInBytes: %d", 
*val.ContentSizeInBytes())
+               })
+       }
+}
+
+func withSortOrderIDMatching() dataFileMatcherOption {
+       return func(m *dataFileMatcher) {
+               m.matchers = append(m.matchers, func(expected iceberg.DataFile, 
actual iceberg.DataFile) bool {
+                       return comparePointerAndValue(expected.SortOrderID(), 
actual.SortOrderID())
+               })
+               m.formatters = append(m.formatters, func(val iceberg.DataFile) 
string {
+                       if val.SortOrderID() == nil {
+                               return "SortOrderID: nil"
+                       }
+
+                       return fmt.Sprintf("SortOrderID: %d", 
*val.SortOrderID())
+               })
+       }
+}
+
// comparePointerAndValue reports whether two pointers are either both nil, or
// both non-nil and pointing at equal values.
func comparePointerAndValue[T comparable](left *T, right *T) bool {
	if left == nil || right == nil {
		// Equal only when both are nil.
		return left == right
	}

	return *left == *right
}
+
+func (m *dataFileMatcher) Matches(expected iceberg.DataFile, actual 
iceberg.DataFile) bool {
+       if expected == nil && actual == nil {
+               return true
+       }
+       if expected == nil {
+               return false
+       }
+       if actual == nil {
+               return false
+       }
+       for _, m := range m.matchers {
+               if !m(expected, actual) {
+                       return false
+               }
+       }
+
+       return true
+}
+
+func (m *dataFileMatcher) Format(val iceberg.DataFile) string {
+       if val == nil {
+               return "nil"
+       }
+       values := make([]string, 0, len(m.formatters))
+       for _, format := range m.formatters {
+               values = append(values, format(val))
+       }
+
+       return fmt.Sprintf("{%s}", strings.Join(values, ", "))
+}
+
+// defaultPositionDeleteMatching is a convenience preset for the options we 
want to match for position delete matching
+var defaultPositionDeleteMatching = 
[]dataFileMatcherOption{withContentTypeMatching(), withColumnSizesMatching(), 
withCountMatching(), withFileFormatMatching(), withSpecIDMatching(), 
withPartitionMatching(), withCountMatching()}
+
+// equalsDataFile invokes a dataFileMatcher with the specified matching 
options and compares two DataFile values.
+// Its return value is nil if both values are equal and an error with a 
meaningful formatted message to help
+// show the mismatch in case they are not. This is meant to be used with 
testify like:
+//
+//     assert.NoError(t, equalsDataFile(expected, actual))
+func equalsDataFile(expected iceberg.DataFile, actual iceberg.DataFile, opts 
...dataFileMatcherOption) (err error) {
+       matcher := &dataFileMatcher{}
+       for _, apply := range opts {
+               apply(matcher)
+       }
+       if !matcher.Matches(expected, actual) {
+               return fmt.Errorf("Expected: %s\nActual:   %s", 
matcher.Format(expected), matcher.Format(actual))
+       }
+
+       return nil
+}
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index 35d42ebc..e8950e2f 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -65,32 +65,34 @@ func NewWriterFactory(rootLocation string, args 
recordWritingArgs, meta *Metadat
 // them to data files when the target file size is reached, implementing a 
rolling
 // file strategy to manage file sizes.
 type RollingDataWriter struct {
-       partitionKey    string
-       partitionID     int          // unique ID for this partition
-       fileCount       atomic.Int64 // counter for files in this partition
-       recordCh        chan arrow.RecordBatch
-       errorCh         chan error
-       factory         *writerFactory
-       partitionValues map[int]any
-       ctx             context.Context
-       cancel          context.CancelFunc
-       wg              sync.WaitGroup
+       partitionKey     string
+       partitionID      int          // unique ID for this partition
+       fileCount        atomic.Int64 // counter for files in this partition
+       recordCh         chan arrow.RecordBatch
+       errorCh          chan error
+       factory          *writerFactory
+       partitionValues  map[int]any
+       ctx              context.Context
+       cancel           context.CancelFunc
+       wg               sync.WaitGroup
+       concurrentWriter *concurrentDataFileWriter
 }
 
 // NewRollingDataWriter creates a new RollingDataWriter for the specified 
partition
 // with the given partition values.
-func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition 
string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) 
*RollingDataWriter {
+func (w *writerFactory) NewRollingDataWriter(ctx context.Context, 
concurrentWriter *concurrentDataFileWriter, partition string, partitionValues 
map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter {
        ctx, cancel := context.WithCancel(ctx)
        partitionID := int(w.partitionIDCounter.Add(1) - 1)
        writer := &RollingDataWriter{
-               partitionKey:    partition,
-               partitionID:     partitionID,
-               recordCh:        make(chan arrow.RecordBatch, 64),
-               errorCh:         make(chan error, 1),
-               factory:         w,
-               partitionValues: partitionValues,
-               ctx:             ctx,
-               cancel:          cancel,
+               partitionKey:     partition,
+               partitionID:      partitionID,
+               recordCh:         make(chan arrow.RecordBatch, 64),
+               errorCh:          make(chan error, 1),
+               factory:          w,
+               partitionValues:  partitionValues,
+               ctx:              ctx,
+               concurrentWriter: concurrentWriter,
+               cancel:           cancel,
        }
 
        writer.wg.Add(1)
@@ -99,7 +101,7 @@ func (w *writerFactory) NewRollingDataWriter(ctx 
context.Context, partition stri
        return writer
 }
 
-func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context, 
partition string, partitionValues map[int]any, outputDataFilesCh chan<- 
iceberg.DataFile) (*RollingDataWriter, error) {
+func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context, 
concurrentWriter *concurrentDataFileWriter, partition string, partitionValues 
map[int]any, outputDataFilesCh chan<- iceberg.DataFile) (*RollingDataWriter, 
error) {
        w.mu.Lock()
        defer w.mu.Unlock()
 
@@ -111,7 +113,7 @@ func (w *writerFactory) getOrCreateRollingDataWriter(ctx 
context.Context, partit
                return nil, fmt.Errorf("invalid writer type for partition: %s", 
partition)
        }
 
-       writer := w.NewRollingDataWriter(ctx, partition, partitionValues, 
outputDataFilesCh)
+       writer := w.NewRollingDataWriter(ctx, concurrentWriter, partition, 
partitionValues, outputDataFilesCh)
        w.writers.Store(partition, writer)
 
        return writer, nil
@@ -165,7 +167,7 @@ func (r *RollingDataWriter) flushToDataFile(batch 
[]arrow.RecordBatch, outputDat
                return nil
        }
 
-       task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
+       tasks := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
                cnt, _ := r.factory.nextCount()
                fileCount := int(r.fileCount.Add(1))
 
@@ -190,7 +192,7 @@ func (r *RollingDataWriter) flushToDataFile(batch 
[]arrow.RecordBatch, outputDat
        }
        partitionMeta.props[WriteDataPathKey] = 
parseDataLoc.JoinPath("data").JoinPath(r.partitionKey).String()
 
-       outputDataFiles := writeFiles(r.ctx, r.factory.rootLocation, 
r.factory.args.fs, &partitionMeta, r.partitionValues, task)
+       outputDataFiles := r.concurrentWriter.writeFiles(r.ctx, 
r.factory.rootLocation, r.factory.args.fs, &partitionMeta, partitionMeta.props, 
r.partitionValues, tasks)
        for dataFile, err := range outputDataFiles {
                if err != nil {
                        return err
diff --git a/table/scanner.go b/table/scanner.go
index 1bc9d005..bc3c0902 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -242,23 +242,31 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
 }
 
 func (scan *Scan) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
-       spec := scan.metadata.PartitionSpecByID(specID)
+       return buildPartitionProjection(specID, scan.metadata, scan.rowFilter, 
scan.caseSensitive)
+}
+
+func buildPartitionProjection(specID int, meta Metadata, rowFilter 
iceberg.BooleanExpression, caseSensitive bool) (iceberg.BooleanExpression, 
error) {
+       spec := meta.PartitionSpecByID(specID)
        if spec == nil {
                return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, 
specID)
        }
-       project := newInclusiveProjection(scan.metadata.CurrentSchema(), *spec, 
true)
+       project := newInclusiveProjection(meta.CurrentSchema(), *spec, 
caseSensitive)
 
-       return project(scan.rowFilter)
+       return project(rowFilter)
 }
 
 func (scan *Scan) buildManifestEvaluator(specID int) 
(func(iceberg.ManifestFile) (bool, error), error) {
-       spec := scan.metadata.PartitionSpecByID(specID)
+       return buildManifestEvaluator(specID, scan.metadata, 
scan.partitionFilters, scan.caseSensitive)
+}
+
+func buildManifestEvaluator(specID int, metadata Metadata, partitionFilters 
*keyDefaultMap[int, iceberg.BooleanExpression], caseSensitive bool) 
(func(iceberg.ManifestFile) (bool, error), error) {
+       spec := metadata.PartitionSpecByID(specID)
        if spec == nil {
                return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, 
specID)
        }
 
-       return newManifestEvaluator(*spec, scan.metadata.CurrentSchema(),
-               scan.partitionFilters.Get(specID), scan.caseSensitive)
+       return newManifestEvaluator(*spec, metadata.CurrentSchema(),
+               partitionFilters.Get(specID), caseSensitive)
 }
 
 func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile) 
(bool, error), error) {
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index c310499f..9bbc0e85 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -182,7 +182,7 @@ func (of *overwriteFiles) existingManifests() 
([]iceberg.ManifestFile, error) {
                                return nil, err
                        }
 
-                       return wr.ToManifestFile(path, counter.Count)
+                       return wr.ToManifestFile(path, counter.Count, 
iceberg.WithManifestFileContent(m.ManifestContent()))
                }
 
                mf, err := rewriteManifest(m, notDeleted)
@@ -427,16 +427,17 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
 type snapshotProducer struct {
        producerImpl
 
-       commitUuid       uuid.UUID
-       io               iceio.WriteFileIO
-       txn              *Transaction
-       op               Operation
-       snapshotID       int64
-       parentSnapshotID int64
-       addedFiles       []iceberg.DataFile
-       manifestCount    atomic.Int32
-       deletedFiles     map[string]iceberg.DataFile
-       snapshotProps    iceberg.Properties
+       commitUuid          uuid.UUID
+       io                  iceio.WriteFileIO
+       txn                 *Transaction
+       op                  Operation
+       snapshotID          int64
+       parentSnapshotID    int64
+       addedFiles          []iceberg.DataFile
+       positionDeleteFiles []iceberg.DataFile
+       manifestCount       atomic.Int32
+       deletedFiles        map[string]iceberg.DataFile
+       snapshotProps       iceberg.Properties
 }
 
 func createSnapshotProducer(op Operation, txn *Transaction, fs 
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) 
*snapshotProducer {
@@ -482,6 +483,12 @@ func (sp *snapshotProducer) appendDataFile(df 
iceberg.DataFile) *snapshotProduce
        return sp
 }
 
+func (sp *snapshotProducer) appendPositionDeleteFile(df iceberg.DataFile) 
*snapshotProducer {
+       sp.positionDeleteFiles = append(sp.positionDeleteFiles, df)
+
+       return sp
+}
+
 func (sp *snapshotProducer) deleteDataFile(df iceberg.DataFile) 
*snapshotProducer {
        sp.deletedFiles[df.FilePath()] = df
 
@@ -531,49 +538,17 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
 
        var g errgroup.Group
 
-       results := [...][]iceberg.ManifestFile{nil, nil, nil}
+       addedManifests := make([]iceberg.ManifestFile, 0)
+       positionDeleteManifests := make([]iceberg.ManifestFile, 0)
+       var deletedFilesManifests []iceberg.ManifestFile
+       var existingManifests []iceberg.ManifestFile
 
        if len(sp.addedFiles) > 0 {
-               g.Go(func() (err error) {
-                       out, path, err := sp.newManifestOutput()
-                       if err != nil {
-                               return err
-                       }
-                       defer internal.CheckedClose(out, &err)
-
-                       counter := &internal.CountingWriter{W: out}
-                       currentSpec, err := sp.txn.meta.CurrentSpec()
-                       if err != nil || currentSpec == nil {
-                               return fmt.Errorf("could not get current 
partition spec: %w", err)
-                       }
-                       wr, err := 
iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
-                               *currentSpec, sp.txn.meta.CurrentSchema(),
-                               sp.snapshotID)
-                       if err != nil {
-                               return err
-                       }
-                       defer internal.CheckedClose(wr, &err)
-
-                       for _, df := range sp.addedFiles {
-                               err := 
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
-                                       nil, nil, df))
-                               if err != nil {
-                                       return err
-                               }
-                       }
-
-                       // close the writer to force a flush and ensure 
counter.Count is accurate
-                       if err := wr.Close(); err != nil {
-                               return err
-                       }
-
-                       mf, err := wr.ToManifestFile(path, counter.Count)
-                       if err == nil {
-                               results[0] = append(results[0], mf)
-                       }
+               g.Go(sp.manifestProducer(iceberg.ManifestContentData, 
sp.addedFiles, &addedManifests))
+       }
 
-                       return err
-               })
+       if len(sp.positionDeleteFiles) > 0 {
+               g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes, 
sp.positionDeleteFiles, &positionDeleteManifests))
        }
 
        if len(deleted) > 0 {
@@ -607,7 +582,7 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                                if err != nil {
                                        return err
                                }
-                               results[1] = append(results[1], mf)
+                               deletedFilesManifests = 
append(deletedFilesManifests, mf)
                        }
 
                        return nil
@@ -619,7 +594,7 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                if err != nil {
                        return err
                }
-               results[2] = m
+               existingManifests = m
 
                return nil
        })
@@ -628,11 +603,55 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                return nil, err
        }
 
-       manifests := slices.Concat(results[0], results[1], results[2])
+       manifests := slices.Concat(addedManifests, positionDeleteManifests, 
deletedFilesManifests, existingManifests)
 
        return sp.processManifests(manifests)
 }
 
// manifestProducer returns an errgroup-compatible closure that writes the given
// data files into a single new manifest of the given content type (data or
// deletes) and stores the resulting ManifestFile into output. Factored out so
// the added-files and position-delete-files paths share one implementation.
func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent, files []iceberg.DataFile, output *[]iceberg.ManifestFile) func() (err error) {
	return func() (err error) {
		out, path, err := sp.newManifestOutput()
		if err != nil {
			return err
		}
		// Named return err lets CheckedClose surface close failures.
		defer internal.CheckedClose(out, &err)

		// Count bytes as they are written so the manifest length is known.
		counter := &internal.CountingWriter{W: out}
		currentSpec, err := sp.txn.meta.CurrentSpec()
		if err != nil || currentSpec == nil {
			return fmt.Errorf("could not get current partition spec: %w", err)
		}
		wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
			*currentSpec, sp.txn.meta.CurrentSchema(),
			sp.snapshotID, iceberg.WithManifestWriterContent(content))
		if err != nil {
			return err
		}
		defer internal.CheckedClose(wr, &err)

		// Every file is recorded as ADDED in this new snapshot.
		for _, df := range files {
			err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
				nil, nil, df))
			if err != nil {
				return err
			}
		}

		// close the writer to force a flush and ensure counter.Count is accurate
		if err := wr.Close(); err != nil {
			return err
		}

		mf, err := wr.ToManifestFile(path, counter.Count, iceberg.WithManifestFileContent(content))
		if err != nil {
			return err
		}
		// One manifest per producer invocation; overwrite rather than append.
		*output = []iceberg.ManifestFile{mf}

		return nil
	}
}
+
 func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) 
{
        var ssc SnapshotSummaryCollector
        partitionSummaryLimit := sp.txn.meta.props.
@@ -649,6 +668,11 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
                        return Summary{}, err
                }
        }
+       for _, df := range sp.positionDeleteFiles {
+               if err = ssc.addFile(df, currentSchema, *partitionSpec); err != 
nil {
+                       return Summary{}, err
+               }
+       }
 
        if len(sp.deletedFiles) > 0 {
                specs := sp.txn.meta.specs
diff --git a/table/table_test.go b/table/table_test.go
index 0e6c3715..6b47bcb0 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -22,13 +22,14 @@ import (
        "compress/gzip"
        "context"
        "encoding/json"
-       "errors"
        "fmt"
        "io"
        "io/fs"
        "log"
+       "maps"
        "os"
        "path/filepath"
+       "reflect"
        "runtime"
        "slices"
        "strconv"
@@ -2176,9 +2177,11 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
 // TestDelete verifies that Table.Delete properly delegates to 
Transaction.Delete
 func (t *TableWritingTestSuite) TestDelete() {
        testCases := []struct {
-               name        string
-               table       *table.Table
-               expectedErr error
+               name                     string
+               table                    *table.Table
+               formatVersionRequirement int
+               expectedSnapshotSummary  *table.Summary
+               expectedErr              error
        }{
                {
                        name: "success with copy-on-write",
@@ -2188,10 +2191,51 @@ func (t *TableWritingTestSuite) TestDelete() {
                                *iceberg.UnpartitionedSpec,
                                t.tableSchema,
                        ),
+                       expectedSnapshotSummary: &table.Summary{
+                               Operation: table.OpDelete,
+                               Properties: map[string]string{
+                                       "added-data-files":       "1",
+                                       "added-records":          "1",
+                                       "deleted-data-files":     "1",
+                                       "deleted-records":        "2",
+                                       "total-data-files":       "1",
+                                       "total-delete-files":     "0",
+                                       "total-equality-deletes": "0",
+                                       "total-position-deletes": "0",
+                                       "total-records":          "1",
+                               },
+                       },
+                       expectedErr: nil,
+               },
+               {
+                       name:                     "fallback to copy-on-write 
when on v1 format",
+                       formatVersionRequirement: 1,
+                       table: t.createTableWithProps(
+                               table.Identifier{"default", 
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+                               map[string]string{
+                                       table.PropertyFormatVersion: 
strconv.Itoa(t.formatVersion),
+                                       table.WriteDeleteModeKey:    
table.WriteModeMergeOnRead,
+                               },
+                               t.tableSchema,
+                       ),
+                       expectedSnapshotSummary: &table.Summary{
+                               Operation: table.OpDelete,
+                               Properties: map[string]string{
+                                       "added-data-files":       "1",
+                                       "added-records":          "1",
+                                       "deleted-data-files":     "1",
+                                       "deleted-records":        "2",
+                                       "total-data-files":       "1",
+                                       "total-delete-files":     "0",
+                                       "total-equality-deletes": "0",
+                                       "total-position-deletes": "0",
+                                       "total-records":          "1",
+                               },
+                       },
                        expectedErr: nil,
                },
                {
-                       name: "abort on merge-on-read",
+                       name: "success with merge-on-read on v2 format",
                        table: t.createTableWithProps(
                                table.Identifier{"default", 
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
                                map[string]string{
@@ -2200,15 +2244,38 @@ func (t *TableWritingTestSuite) TestDelete() {
                                },
                                t.tableSchema,
                        ),
-                       expectedErr: errors.New("only 'copy-on-write' is 
currently supported"),
+                       // Position deletes are only
+                       formatVersionRequirement: 2,
+                       expectedSnapshotSummary: &table.Summary{
+                               Operation: table.OpDelete,
+                               Properties: map[string]string{
+                                       "added-delete-files":          "1",
+                                       "added-position-delete-files": "1",
+                                       "added-position-deletes":      "1",
+                                       "total-data-files":            "1",
+                                       "total-delete-files":          "1",
+                                       "total-equality-deletes":      "0",
+                                       "total-position-deletes":      "1",
+                                       "total-records":               "2",
+                               },
+                       },
+                       expectedErr: nil,
                },
        }
 
        for _, tc := range testCases {
                t.Run(tc.name, func() {
-                       // Set up the test table with some data
+                       // Skip this test case execution if the format version 
isn't the one the test case is limited to. This
+                       // is because some test cases don't have the same 
expected behavior or don't apply because of format
+                       // specific features
+                       if tc.formatVersionRequirement > 0 && 
tc.formatVersionRequirement != t.formatVersion {
+                               return
+                       }
+                       // Set up the test table with some data. Include more 
than just one row so that the delete operation
+                       // is not a straight file deletion
                        newTable, err := 
array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
-                               `[{"foo": false, "bar": "wrapper_test", "baz": 
123, "qux": "2024-01-01"}]`,
+                               `[{"foo": false, "bar": "wrapper_test", "baz": 
123, "qux": "2024-01-01"},
+                  {"foo": true, "bar": "keep_this", "baz": 456, "qux": 
"2024-01-02"}]`,
                        })
                        t.Require().NoError(err)
                        defer newTable.Release()
@@ -2219,7 +2286,7 @@ func (t *TableWritingTestSuite) TestDelete() {
                        // Validate the pre-requisite that data is present on 
the table before we go ahead and delete it
                        arrowTable, err := tbl.Scan().ToArrowTable(t.ctx)
                        t.Require().NoError(err)
-                       t.Equal(int64(1), arrowTable.NumRows())
+                       t.Equal(int64(2), arrowTable.NumRows())
 
                        tbl, err = tbl.Delete(t.ctx, 
iceberg.EqualTo(iceberg.Reference("bar"), "wrapper_test"), nil)
                        // If an error was expected, check that it's the 
correct one and abort validating the operation
@@ -2231,12 +2298,12 @@ func (t *TableWritingTestSuite) TestDelete() {
 
                        snapshot := tbl.CurrentSnapshot()
                        t.NotNil(snapshot)
-                       t.Equal(table.OpDelete, snapshot.Summary.Operation)
+                       
t.NoError(equalSnapshotSummary(tc.expectedSnapshotSummary, snapshot.Summary))
 
                        arrowTable, err = tbl.Scan().ToArrowTable(t.ctx)
                        t.Require().NoError(err)
 
-                       t.Equal(int64(0), arrowTable.NumRows())
+                       t.Equal(int64(1), arrowTable.NumRows())
                })
        }
 }
@@ -2370,13 +2437,13 @@ func (m *DeleteOldMetadataMockedCatalog) 
CommitTable(ctx context.Context, ident
        location := m.metadata.Location()
 
        randid := uuid.New().String()
-       metdatafile := fmt.Sprintf("%s/metadata/%s.metadata.json", location, 
randid)
+       metadatafile := fmt.Sprintf("%s/metadata/%s.metadata.json", location, 
randid)
 
        // removing old metadata files
        bldr.TrimMetadataLogs(0)
 
        bldr.AppendMetadataLog(table.MetadataLogEntry{
-               MetadataFile: metdatafile,
+               MetadataFile: metadatafile,
                TimestampMs:  time.Now().UnixMilli(),
        })
 
@@ -2393,7 +2460,7 @@ func (m *DeleteOldMetadataMockedCatalog) CommitTable(ctx 
context.Context, ident
 
        m.metadata = meta
 
-       return meta, metdatafile, nil
+       return meta, metadatafile, nil
 }
 
 func createMetadataFile(metadatadir, metadataFile string) error {
@@ -2702,3 +2769,59 @@ func (t *TableTestSuite) 
TestMetadataCompressionRoundTrip() {
 
        t.True(tbl.Equals(*tbl2))
 }
+
+type snapshotSummaryMatcher struct{}
+
+func (m *snapshotSummaryMatcher) Matches(expected *table.Summary, actual 
*table.Summary) bool {
+       if expected == nil && actual == nil {
+               return true
+       }
+       if expected == nil {
+               return false
+       }
+       if actual == nil {
+               return false
+       }
+       // Filter properties to validate by deleting all the ones that aren't 
in the expected summary properties. This is
+       // to allow ignoring the properties that vary per environment/test 
execution like file sizes
+       filtered := cloneSummaryAndFilterProperties(expected, actual)
+
+       return reflect.DeepEqual(expected, filtered)
+}
+
+// cloneSummaryAndFilterProperties clones a summary and filters out any 
summary properties that aren't part of the
+// expected summary. This is useful to ignore properties that we don't wish to 
validate
+func cloneSummaryAndFilterProperties(expected *table.Summary, actual 
*table.Summary) *table.Summary {
+       actualPropertiesFiltered := maps.Clone(actual.Properties)
+       maps.DeleteFunc(actualPropertiesFiltered, func(k string, v string) bool 
{
+               _, ok := expected.Properties[k]
+
+               return !ok
+       })
+       filtered := *actual
+       filtered.Properties = actualPropertiesFiltered
+
+       return &filtered
+}
+
+func (m *snapshotSummaryMatcher) Format(val *table.Summary) string {
+       if val == nil {
+               return "nil"
+       }
+
+       return fmt.Sprintf("{Operation: %s, Properties: %#v}", val.Operation, 
val.Properties)
+}
+
+// equalSnapshotSummary invokes a snapshotSummaryMatcher to compare two 
snapshot summary values.
+// Its return value is nil if both values are equal and an error with a 
meaningful formatted message to help
+// show the mismatch in case they are not. This is meant to be used with 
testify like:
+//
+//     assert.NoError(t, equalSnapshotSummary(expected, actual))
+func equalSnapshotSummary(expected *table.Summary, actual *table.Summary) (err 
error) {
+       matcher := &snapshotSummaryMatcher{}
+       if !matcher.Matches(expected, actual) {
+               return fmt.Errorf("Expected: %s\nActual:   %s", 
matcher.Format(expected), matcher.Format(actual))
+       }
+
+       return nil
+}
diff --git a/table/transaction.go b/table/transaction.go
index 765d9149..49081792 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -23,6 +23,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "iter"
        "runtime"
        "slices"
        "sync"
@@ -30,8 +31,11 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/internal"
+       "github.com/apache/iceberg-go/table/substrait"
        "github.com/google/uuid"
        "golang.org/x/sync/errgroup"
 )
@@ -897,7 +901,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx 
context.Context, operation
        commitUUID := uuid.New()
        updater := t.updateSnapshot(fs, snapshotProps, 
operation).mergeOverwrite(&commitUUID)
 
-       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, filter, caseSensitive, concurrency)
+       filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx, 
fs, filter, caseSensitive, concurrency)
        if err != nil {
                return nil, err
        }
@@ -915,6 +919,45 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx 
context.Context, operation
        return updater, nil
 }
 
+func (t *Transaction) performMergeOnReadDeletion(ctx context.Context, 
snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, 
caseSensitive bool, concurrency int) (*snapshotProducer, error) {
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return nil, err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: 
string(mappingJson)})
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       commitUUID := uuid.New()
+       updater := t.updateSnapshot(fs, snapshotProps, 
OpDelete).mergeOverwrite(&commitUUID)
+
+       filesToDelete, withPartialDeletions, err := 
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, df := range filesToDelete {
+               updater.deleteDataFile(df)
+       }
+
+       if len(withPartialDeletions) > 0 {
+               if err := t.writePositionDeletesForFiles(ctx, fs, updater, 
withPartialDeletions, filter, caseSensitive, concurrency, commitUUID); err != 
nil {
+                       return nil, err
+               }
+       }
+
+       return updater, nil
+}
+
 type DeleteOption func(deleteOp *deleteOperation)
 
 type deleteOperation struct {
@@ -958,7 +1001,7 @@ func WithDeleteCaseInsensitive() DeleteOption {
 //
 // The concurrency parameter controls the level of parallelism for manifest 
processing and file rewriting and
 // can be overridden using the WithOverwriteConcurrency option. Defaults to 
runtime.GOMAXPROCS(0).
-func (t *Transaction) Delete(ctx context.Context, filter 
iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts 
...DeleteOption) error {
+func (t *Transaction) Delete(ctx context.Context, filter 
iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts 
...DeleteOption) (err error) {
        deleteOp := deleteOperation{
                concurrency:   runtime.GOMAXPROCS(0),
                caseSensitive: true,
@@ -967,14 +1010,28 @@ func (t *Transaction) Delete(ctx context.Context, filter 
iceberg.BooleanExpressi
                apply(&deleteOp)
        }
 
-       writeDeleteMode := t.meta.props.Get(WriteDeleteModeKey, 
WriteDeleteModeDefault)
-       if writeDeleteMode != WriteModeCopyOnWrite {
-               return fmt.Errorf("'%s' is set to '%s' but only '%s' is 
currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite)
+       var updater *snapshotProducer
+       writeDeleteMode := WriteDeleteModeDefault
+       // Only copy on write is supported on v1 so we ignore any override to 
the write delete mode unless the version is
+       // 2 and up
+       if t.meta.formatVersion > 1 {
+               writeDeleteMode = t.meta.props.Get(WriteDeleteModeKey, 
WriteDeleteModeDefault)
        }
-       updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, 
snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
-       if err != nil {
-               return err
+       switch writeDeleteMode {
+       case WriteModeCopyOnWrite:
+               updater, err = t.performCopyOnWriteDeletion(ctx, OpDelete, 
snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
+               if err != nil {
+                       return err
+               }
+       case WriteModeMergeOnRead:
+               updater, err = t.performMergeOnReadDeletion(ctx, snapshotProps, 
filter, deleteOp.caseSensitive, deleteOp.concurrency)
+               if err != nil {
+                       return err
+               }
+       default:
+               return fmt.Errorf("unsupported write mode: '%s'", 
writeDeleteMode)
        }
+
        updates, reqs, err := updater.commit()
        if err != nil {
                return err
@@ -983,9 +1040,9 @@ func (t *Transaction) Delete(ctx context.Context, filter 
iceberg.BooleanExpressi
        return t.apply(updates, reqs)
 }
 
-// classifyFilesForOverwrite classifies existing data files based on the 
provided filter.
+// classifyFilesForDeletions classifies existing data files based on the 
provided filter.
 // Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, err error) {
        s := t.meta.currentSnapshot()
        if s == nil {
                return nil, nil, nil
@@ -1001,16 +1058,46 @@ func (t *Transaction) classifyFilesForOverwrite(ctx 
context.Context, fs io.IO, f
                        }
                }
 
-               return filesToDelete, filesToRewrite, nil
+               return filesToDelete, filesWithPartialDeletions, nil
        }
 
-       return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, 
caseSensitive, concurrency)
+       return t.classifyFilesForFilteredDeletions(ctx, fs, filter, 
caseSensitive, concurrency)
+}
+
+type fileClassificationTask struct {
+       meta             Metadata
+       partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
+       caseSensitive    bool
+       rowFilter        iceberg.BooleanExpression
 }
 
-// classifyFilesForFilteredOverwrite classifies files for filtered overwrite 
operations.
+func newFileClassificationTask(meta Metadata, rowFilter 
iceberg.BooleanExpression, caseSensitive bool) *fileClassificationTask {
+       classificationTask := &fileClassificationTask{
+               meta:          meta,
+               caseSensitive: caseSensitive,
+               rowFilter:     rowFilter,
+       }
+       classificationTask.partitionFilters = 
newKeyDefaultMapWrapErr(classificationTask.buildPartitionProjection)
+
+       return classificationTask
+}
+
+func (t *fileClassificationTask) buildManifestEvaluator(specID int) 
(func(iceberg.ManifestFile) (bool, error), error) {
+       return buildManifestEvaluator(specID, t.meta, t.partitionFilters, 
t.caseSensitive)
+}
+
+func (t *fileClassificationTask) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
+       return buildPartitionProjection(specID, t.meta, t.rowFilter, 
t.caseSensitive)
+}
+
+// classifyFilesForFilteredDeletions classifies files for filtered delete operations.
 // Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency 
int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency 
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, err error) {
        schema := t.meta.CurrentSchema()
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, nil, err
+       }
 
        inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, 
caseSensitive, false)
        if err != nil {
@@ -1022,18 +1109,8 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
                return nil, nil, fmt.Errorf("failed to create strict metrics 
evaluator: %w", err)
        }
 
-       var manifestEval func(iceberg.ManifestFile) (bool, error)
-       meta, err := t.meta.Build()
-       if err != nil {
-               return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
-       }
-       spec := meta.PartitionSpec()
-       if !spec.IsUnpartitioned() {
-               manifestEval, err = newManifestEvaluator(spec, schema, filter, 
caseSensitive)
-               if err != nil {
-                       return nil, nil, fmt.Errorf("failed to create manifest 
evaluator: %w", err)
-               }
-       }
+       classificationTask := newFileClassificationTask(meta, filter, 
caseSensitive)
+       manifestEvaluators := 
newKeyDefaultMapWrapErr(classificationTask.buildManifestEvaluator)
 
        s := t.meta.currentSnapshot()
        var manifests []iceberg.ManifestFile
@@ -1044,11 +1121,7 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
                }
        }
 
-       var (
-               mu             sync.Mutex
-               allFilesToDel  []iceberg.DataFile
-               allFilesToRewr []iceberg.DataFile
-       )
+       var mu sync.Mutex
 
        g, _ := errgroup.WithContext(ctx)
        g.SetLimit(min(concurrency, len(manifests)))
@@ -1056,6 +1129,7 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
        for _, manifest := range manifests {
                manifest := manifest // capture loop variable
                g.Go(func() error {
+                       manifestEval := 
manifestEvaluators.Get(int(manifest.PartitionSpecID()))
                        if manifestEval != nil {
                                match, err := manifestEval(manifest)
                                if err != nil {
@@ -1107,8 +1181,8 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
 
                        if len(localDelete) > 0 || len(localRewrite) > 0 {
                                mu.Lock()
-                               allFilesToDel = append(allFilesToDel, 
localDelete...)
-                               allFilesToRewr = append(allFilesToRewr, 
localRewrite...)
+                               filesToDelete = append(filesToDelete, 
localDelete...)
+                               filesWithPartialDeletes = 
append(filesWithPartialDeletes, localRewrite...)
                                mu.Unlock()
                        }
 
@@ -1120,7 +1194,7 @@ func (t *Transaction) 
classifyFilesForFilteredOverwrite(ctx context.Context, fs
                return nil, nil, err
        }
 
-       return allFilesToDel, allFilesToRewr, nil
+       return filesToDelete, filesWithPartialDeletes, nil
 }
 
 // rewriteFilesWithFilter rewrites data files by preserving only rows that do 
NOT match the filter
@@ -1212,6 +1286,117 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
        return result, nil
 }
 
+// writePositionDeletesForFiles generates position delete files recording the rows in the given data files that match the filter
+func (t *Transaction) writePositionDeletesForFiles(ctx context.Context, fs 
io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter 
iceberg.BooleanExpression, caseSensitive bool, concurrency int, commitUUID 
uuid.UUID) error {
+       posDeleteRecIter, err := t.makePositionDeleteRecordsForFilter(ctx, fs, 
files, filter, caseSensitive, concurrency)
+       if err != nil {
+               return err
+       }
+
+       partitionContextByFilePath := make(map[string]partitionContext, 
len(files))
+       for _, df := range files {
+               partitionContextByFilePath[df.FilePath()] = 
partitionContext{partitionData: df.Partition(), specID: df.SpecID()}
+       }
+
+       posDeleteFiles := positionDeleteRecordsToDataFiles(ctx, 
t.tbl.Location(), t.meta, partitionContextByFilePath, recordWritingArgs{
+               sc:        PositionalDeleteArrowSchema,
+               itr:       posDeleteRecIter,
+               writeUUID: &commitUUID,
+               fs:        fs.(io.WriteFileIO),
+       })
+
+       for f, err := range posDeleteFiles {
+               if err != nil {
+                       return err
+               }
+               updater.appendPositionDeleteFile(f)
+       }
+
+       return nil
+}
+
+func (t *Transaction) makePositionDeleteRecordsForFilter(ctx context.Context, 
fs io.IO, files []iceberg.DataFile, filter iceberg.BooleanExpression, 
caseSensitive bool, concurrency int) (seq2 iter.Seq2[arrow.RecordBatch, error], 
err error) {
+       tasks := make([]FileScanTask, 0, len(files))
+       for _, f := range files {
+               tasks = append(tasks, FileScanTask{
+                       File:   f,
+                       Start:  0,
+                       Length: f.FileSizeBytes(),
+               })
+       }
+
+       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, 
caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("failed to bind filter: %w", err)
+       }
+
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, fmt.Errorf("failed to build metadata: %w", err)
+       }
+
+       scanner := &arrowScan{
+               metadata:        meta,
+               fs:              fs,
+               projectedSchema: t.meta.CurrentSchema(),
+               boundRowFilter:  boundFilter,
+               caseSensitive:   caseSensitive,
+               rowLimit:        -1, // No limit
+               concurrency:     concurrency,
+       }
+
+       deletesPerFile, err := readAllDeleteFiles(ctx, fs, tasks, concurrency)
+       if err != nil {
+               return nil, err
+       }
+
+       extSet := substrait.NewExtensionSet()
+
+       ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, 
extSet))
+       taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
+
+       numWorkers := min(concurrency, len(tasks))
+       records := make(chan enumeratedRecord, numWorkers)
+
+       var wg sync.WaitGroup
+       wg.Add(numWorkers)
+       for i := 0; i < numWorkers; i++ {
+               go func() {
+                       defer wg.Done()
+                       for {
+                               select {
+                               case <-ctx.Done():
+                                       return
+                               case task, ok := <-taskChan:
+                                       if !ok {
+                                               return
+                                       }
+
+                                       if err := 
scanner.producePosDeletesFromTask(ctx, task, 
deletesPerFile[task.Value.File.FilePath()], records); err != nil {
+                                               cancel(err)
+
+                                               return
+                                       }
+                               }
+                       }
+               }()
+       }
+
+       go func() {
+               for i, t := range tasks {
+                       taskChan <- internal.Enumerated[FileScanTask]{
+                               Value: t, Index: i, Last: i == len(tasks)-1,
+                       }
+               }
+               close(taskChan)
+
+               wg.Wait()
+               close(records)
+       }()
+
+       return createIterator(ctx, uint(numWorkers), records, deletesPerFile, 
cancel, scanner.rowLimit), nil
+}
+
 func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
        updatedMeta, err := t.meta.Build()
        if err != nil {
diff --git a/table/transaction_test.go b/table/transaction_test.go
index db06d64d..d356b468 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -536,6 +536,120 @@ func (s *SparkIntegrationTestSuite) 
TestDeleteInsensitive() {
 +----------+---------+---+`)
 }
 
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadUnpartitioned() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "first_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "last_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32},
+       )
+
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", 
"go_test_merge_on_read_delete"), icebergSchema,
+               catalog.WithProperties(
+                       map[string]string{
+                               table.WriteDeleteModeKey: 
table.WriteModeMergeOnRead,
+                       },
+               ),
+       )
+       s.Require().NoError(err)
+
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, 
false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchema, []string{
+               `[
+                       {"first_name": "alan", "last_name": "gopher", "age": 7},
+                       {"first_name": "steve", "last_name": "gopher", "age": 
5},
+                       {"first_name": "dead", "last_name": "gopher", "age": 97}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       // Delete the dead gopher and confirm that alan and steve are still 
present
+       filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead")
+       tx = tbl.NewTransaction()
+       err = tx.Delete(s.ctx, filter, nil)
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_merge_on_read_delete ORDER BY age")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve     |gopher   |5  |
+|alan      |gopher   |7  |
++----------+---------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadPartitioned() {
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "first_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "last_name", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32},
+       )
+
+       spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceID: 3,
+               Name:     "age_bucket",
+               Transform: iceberg.BucketTransform{
+                       NumBuckets: 2,
+               },
+       })
+       tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", 
"go_test_merge_on_read_delete_partitioned"), icebergSchema,
+               catalog.WithProperties(
+                       map[string]string{
+                               table.WriteDeleteModeKey: 
table.WriteModeMergeOnRead,
+                       },
+               ),
+               catalog.WithPartitionSpec(&spec),
+       )
+       s.Require().NoError(err)
+
+       arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, 
false)
+       s.Require().NoError(err)
+
+       initialTable, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchema, []string{
+               `[
+                       {"first_name": "alan", "last_name": "gopher", "age": 7},
+                       {"first_name": "steve", "last_name": "gopher", "age": 
5},
+                       {"first_name": "dead", "last_name": "gopher", "age": 
97},
+                       {"first_name": "uncle", "last_name": "gopher", "age": 
90}
+               ]`,
+       })
+       s.Require().NoError(err)
+       defer initialTable.Release()
+
+       tx := tbl.NewTransaction()
+       err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+       s.Require().NoError(err)
+       tbl, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       // Delete the dead gopher and confirm that alan, steve and uncle are still present
+       filter := iceberg.NewAnd(iceberg.GreaterThan(iceberg.Reference("age"), 
"50"), iceberg.EqualTo(iceberg.Reference("first_name"), "dead"))
+       tx = tbl.NewTransaction()
+       err = tx.Delete(s.ctx, filter, nil)
+       s.Require().NoError(err)
+       _, err = tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", 
"SELECT * FROM default.go_test_merge_on_read_delete_partitioned ORDER BY age")
+       s.Require().NoError(err)
+       s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve     |gopher   |5  |
+|alan      |gopher   |7  |
+|uncle     |gopher   |90 |
++----------+---------+---+`)
+}
+
 func TestSparkIntegration(t *testing.T) {
        suite.Run(t, new(SparkIntegrationTestSuite))
 }
diff --git a/table/writer.go b/table/writer.go
index 8952b6c6..cdda0357 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -47,16 +47,58 @@ func (w WriteTask) GenerateDataFileName(extension string) 
string {
        return fmt.Sprintf("%05d-%d-%s-%05d.%s", w.PartitionID, w.ID, w.Uuid, 
w.FileCount, extension)
 }
 
-type writer struct {
+type defaultDataFileWriter struct {
        loc        LocationProvider
        fs         io.WriteFileIO
        fileSchema *iceberg.Schema
        format     internal.FileFormat
-       props      any
+       props      iceberg.Properties
+       content    iceberg.ManifestEntryContent
        meta       *MetadataBuilder
 }
 
-func (w *writer) writeFile(ctx context.Context, partitionValues map[int]any, 
task WriteTask) (iceberg.DataFile, error) {
+type dataFileWriterOption func(writer *defaultDataFileWriter)
+
+func withFormat(format internal.FileFormat) dataFileWriterOption {
+       return func(writer *defaultDataFileWriter) {
+               writer.format = format
+       }
+}
+
+func withFileSchema(schema *iceberg.Schema) dataFileWriterOption {
+       return func(writer *defaultDataFileWriter) {
+               writer.fileSchema = schema
+       }
+}
+
+func withContent(content iceberg.ManifestEntryContent) dataFileWriterOption {
+       return func(writer *defaultDataFileWriter) {
+               writer.content = content
+       }
+}
+
+func newDataFileWriter(rootLocation string, fs io.WriteFileIO, meta 
*MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) 
(*defaultDataFileWriter, error) {
+       locProvider, err := LoadLocationProvider(rootLocation, props)
+       if err != nil {
+               return nil, err
+       }
+       w := defaultDataFileWriter{
+               loc:        locProvider,
+               fs:         fs,
+               fileSchema: meta.CurrentSchema(),
+               format:     internal.GetFileFormat(iceberg.ParquetFile),
+               content:    iceberg.EntryContentData,
+               props:      props,
+               meta:       meta,
+       }
+       for _, apply := range opts {
+               apply(&w)
+       }
+
+       return &w, nil
+}
+
+func (w *defaultDataFileWriter) writeFile(ctx context.Context, partitionValues 
map[int]any, task WriteTask) (iceberg.DataFile, error) {
        defer func() {
                for _, b := range task.Batches {
                        b.Release()
@@ -89,49 +131,76 @@ func (w *writer) writeFile(ctx context.Context, 
partitionValues map[int]any, tas
 
        return w.format.WriteDataFile(ctx, w.fs, partitionValues, 
internal.WriteFileInfo{
                FileSchema: w.fileSchema,
+               Content:    w.content,
                FileName:   filePath,
                StatsCols:  statsCols,
-               WriteProps: w.props,
+               WriteProps: w.format.GetWriteProperties(w.props),
                Spec:       *currentSpec,
        }, batches)
 }
 
-func writeFiles(ctx context.Context, rootLocation string, fs io.WriteFileIO, 
meta *MetadataBuilder, partitionValues map[int]any, tasks iter.Seq[WriteTask]) 
iter.Seq2[iceberg.DataFile, error] {
-       locProvider, err := LoadLocationProvider(rootLocation, meta.props)
-       if err != nil {
-               return func(yield func(iceberg.DataFile, error) bool) {
-                       yield(nil, err)
-               }
+type dataFileWriter interface {
+       writeFile(ctx context.Context, partitionValues map[int]any, task 
WriteTask) (iceberg.DataFile, error)
+}
+
+func newPositionDeleteWriter(rootLocation string, fs io.WriteFileIO, meta 
*MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) 
(*defaultDataFileWriter, error) {
+       // Always enforce the file schema to be the Positional Delete Schema by 
appending the option at the very end
+       return newDataFileWriter(rootLocation, fs, meta, props, append(opts, 
withFileSchema(iceberg.PositionalDeleteSchema), 
withContent(iceberg.EntryContentPosDeletes))...)
+}
+
+type dataFileWriterMaker func(rootLocation string, fs io.WriteFileIO, meta 
*MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) 
(dataFileWriter, error)
+
+type concurrentDataFileWriter struct {
+       newDataFileWriter dataFileWriterMaker
+       sanitizeSchema    bool
+}
+
+type concurrentDataFileWriterOption func(w *concurrentDataFileWriter)
+
+func withSchemaSanitization(enabled bool) concurrentDataFileWriterOption {
+       return func(w *concurrentDataFileWriter) {
+               w.sanitizeSchema = enabled
        }
+}
 
-       format := internal.GetFileFormat(iceberg.ParquetFile)
+func newConcurrentDataFileWriter(newDataFileWriter dataFileWriterMaker, opts 
...concurrentDataFileWriterOption) *concurrentDataFileWriter {
+       w := concurrentDataFileWriter{
+               newDataFileWriter: newDataFileWriter,
+               sanitizeSchema:    true,
+       }
+       for _, apply := range opts {
+               apply(&w)
+       }
+
+       return &w
+}
+
+func (w *concurrentDataFileWriter) writeFiles(ctx context.Context, 
rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props 
iceberg.Properties, partitionValues map[int]any, tasks iter.Seq[WriteTask]) 
iter.Seq2[iceberg.DataFile, error] {
        fileSchema := meta.CurrentSchema()
-       sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
-       if err != nil {
-               return func(yield func(iceberg.DataFile, error) bool) {
-                       yield(nil, err)
+       if w.sanitizeSchema {
+               sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+               if err != nil {
+                       return func(yield func(iceberg.DataFile, error) bool) {
+                               yield(nil, err)
+                       }
                }
-       }
 
-       // if the schema needs to be transformed, use the transformed schema
-       // and adjust the arrow schema appropriately. otherwise we just
-       // use the original schema.
-       if !sanitized.Equals(fileSchema) {
-               fileSchema = sanitized
+               // if the schema needs to be transformed, use the transformed 
schema
+               // and adjust the arrow schema appropriately. otherwise we just
+               // use the original schema.
+               if !sanitized.Equals(fileSchema) {
+                       fileSchema = sanitized
+               }
        }
 
-       w := &writer{
-               loc:        locProvider,
-               fs:         fs,
-               fileSchema: fileSchema,
-               format:     format,
-               props:      format.GetWriteProperties(meta.props),
-               meta:       meta,
+       fw, err := w.newDataFileWriter(rootLocation, fs, meta, props, 
withFileSchema(fileSchema))
+       if err != nil {
+               return func(yield func(iceberg.DataFile, error) bool) {
+                       yield(nil, err)
+               }
        }
 
-       nworkers := config.EnvConfig.MaxWorkers
-
-       return internal.MapExec(nworkers, tasks, func(t WriteTask) 
(iceberg.DataFile, error) {
-               return w.writeFile(ctx, partitionValues, t)
+       return internal.MapExec(config.EnvConfig.MaxWorkers, tasks, func(t 
WriteTask) (iceberg.DataFile, error) {
+               return fw.writeFile(ctx, partitionValues, t)
        })
 }

Reply via email to