This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new be4cf4cf feat(table): add RewriteFiles snapshot-op builder (#1033)
be4cf4cf is described below

commit be4cf4cfed619e4940829935b305977210b58b12
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue May 12 11:58:53 2026 +0200

    feat(table): add RewriteFiles snapshot-op builder (#1033)
    
    Decompose the atomic rewrite snapshot produced by
    `Transaction.RewriteDataFiles`, which assumes in-process compaction,
    so that out-of-process (distributed) compaction can commit the same
    kind of snapshot. This is done by exposing a builder similar to
    iceberg-java's `Table.newRewrite()`.
    
    Mirrors iceberg-java's RewriteFiles (Table.newRewrite()), sitting beside
    RowDelta. Suppresses the overwrite producer's default validator and
    queues a rewrite-specific conflict validator internally, so
    RewriteDataFiles and external callers (distributed compaction
    coordinators) commit the same shape through one indivisible operation.
    
    Transaction.NewRewrite(snapshotProps) returns *RewriteFiles with
    DeleteFile / AddDataFile / Apply / Commit. ExecuteCompactionGroup is the
    worker-side read+write step, returning a plain-data
    CompactionGroupResult.
    
    RewriteDataFiles drives the builder internally. Partial-progress mode
    now commits per group inside the loop, fixing a latent bug where a
    mid-loop failure left staged groups uncovered by the rewrite validator.
    
    Tag rewrite snapshots OpReplace instead of OpOverwrite (matching
    iceberg-java's BaseRewriteFiles.operation()), gated on
    cfg.rewriteSemantics so that generic ReplaceFiles callers, whose row
    content can change, keep OpOverwrite. This also addresses #841 and
    should address the comments made in #867: only the unexported
    `cfg.rewriteSemantics` switches the operation from OpOverwrite to
    OpReplace, and the unexported `withRewriteSemantics()` option is
    applied by the rewrite paths (`RewriteFiles.Commit()` and the
    partial-progress loop of `RewriteDataFiles`) rather than by generic
    `ReplaceFiles` callers.
    
    Black-box coverage: equivalence with RewriteDataFiles, safe pos-delete
    expunge, concurrent eq-delete rejection under refresh-and-replay.
    
    ---------
    
    Signed-off-by: Tobias Pütz <[email protected]>
---
 table/rewrite_data_files.go      | 368 ++++++++++++++++------
 table/rewrite_data_files_test.go | 137 +++++++++
 table/rewrite_files.go           | 240 +++++++++++++++
 table/rewrite_files_test.go      | 636 +++++++++++++++++++++++++++++++++++++++
 table/row_delta.go               |   2 +-
 table/transaction.go             |  37 ++-
 6 files changed, 1320 insertions(+), 100 deletions(-)

diff --git a/table/rewrite_data_files.go b/table/rewrite_data_files.go
index 911488a9..e2cee62f 100644
--- a/table/rewrite_data_files.go
+++ b/table/rewrite_data_files.go
@@ -20,6 +20,7 @@ package table
 import (
        "context"
        "fmt"
+       "maps"
 
        "github.com/apache/iceberg-go"
 )
@@ -58,7 +59,8 @@ type RewriteResult struct {
 // import between table and table/compaction.
 //
 // Use [compaction.Config.PlanCompaction] to produce groups, then convert
-// [compaction.Group] → [CompactionTaskGroup] to call [Transaction.RewriteDataFiles].
+// [compaction.Group] → [CompactionTaskGroup] to call
+// [Transaction.RewriteDataFiles] or [ExecuteCompactionGroup].
 type CompactionTaskGroup struct {
        // PartitionKey is an opaque grouping key for display/logging.
        PartitionKey string
@@ -70,34 +72,120 @@ type CompactionTaskGroup struct {
        TotalSizeBytes int64
 }
 
+// CompactionGroupResult is the per-group output of a compaction
+// worker: the new files written, the old files being replaced, and
+// the position delete files safe to expunge in the rewrite snapshot.
+//
+// A distributed coordinator aggregates results from N workers and
+// applies them to a [RewriteFiles] builder via [RewriteFiles.Apply]
+// to commit a single atomic snapshot. Each field is plain data
+// ([]iceberg.DataFile values plus scalars) — callers serialize the
+// contained DataFiles across process boundaries themselves; the
+// typical pattern is to have the worker write a manifest containing
+// the new files and ship the manifest path to the coordinator, which
+// re-reads it.
+type CompactionGroupResult struct {
+       // PartitionKey mirrors [CompactionTaskGroup.PartitionKey] for
+       // display/logging on the coordinator.
+       PartitionKey string
+
+       // OldDataFiles are the data files this group replaces.
+       OldDataFiles []iceberg.DataFile
+
+       // NewDataFiles are the consolidated outputs the worker wrote.
+       NewDataFiles []iceberg.DataFile
+
+       // SafePosDeletes are position-delete files referenced by tasks in
+       // this group whose target data file is being rewritten, computed
+       // via [CollectSafePositionDeletes]. They are safe to expunge in
+       // the rewrite snapshot.
+       SafePosDeletes []iceberg.DataFile
+
+       // BytesBefore is [CompactionTaskGroup.TotalSizeBytes] passed
+       // through, recorded so the coordinator can roll up metrics
+       // without re-reading the plan.
+       BytesBefore int64
+
+       // BytesAfter is the sum of [iceberg.DataFile.FileSizeBytes] across
+       // NewDataFiles.
+       BytesAfter int64
+}
+
 // RewriteDataFilesOptions bundles the per-rewrite knobs for
-// Transaction.RewriteDataFiles.
+// [Transaction.RewriteDataFiles].
 type RewriteDataFilesOptions struct {
-       // PartialProgress, when true, stages each group via ReplaceFiles
-       // inside the loop so work survives a mid-loop write failure. When
-       // false (the default), all groups are committed in a single atomic
-       // snapshot.
+       // PartialProgress, when true, stages each group as its own
+       // rewrite snapshot inside the loop so a mid-loop write failure
+       // leaves the already-completed groups staged on this transaction
+       // (the in-memory transaction can be discarded by group rather
+       // than wholesale). When false (the default), every group lands in
+       // a single atomic rewrite snapshot.
        //
-       // In both modes the final catalog commit happens once at
-       // Transaction.Commit() time. True per-group durability (matching
-       // Java's behavior) requires committing separate transactions per
-       // group, which is left to the caller.
+       // In both modes the catalog commit happens once at
+       // [Transaction.Commit] time, so a process crash mid-loop loses
+       // every staged group regardless of this flag. Callers who need
+       // true per-group catalog durability (matching Java's behavior)
+       // should drive [Transaction.NewRewrite] themselves and commit a
+       // fresh transaction per group.
        PartialProgress bool
 
        // SnapshotProps are added to the rewrite snapshot's summary.
+       // In partial-progress mode the same properties land on every
+       // per-group snapshot rather than being summed or split.
        SnapshotProps iceberg.Properties
 
        // ExtraDeleteFilesToRemove are delete files (typically equality
        // deletes that are dead after the rewrite) that the caller wants
-       // expunged in the same snapshot as the rewrite. The executor
-       // passes them through to ReplaceFiles unchanged. Honored only
-       // when PartialProgress is false.
+       // expunged in the same snapshot as the rewrite. Honored only when
+       // PartialProgress is false.
        //
        // Use [compaction.CollectDeadEqualityDeletes] to compute this list
        // from the current snapshot. Position delete files that are fully
        // applied are removed automatically and do NOT need to be passed
        // in here.
        ExtraDeleteFilesToRemove []iceberg.DataFile
+
+       // GroupOptions are forwarded to every [ExecuteCompactionGroup]
+       // call to tune the per-group read+write pipeline (target file
+       // size, scan concurrency). See the With* helpers returning
+       // [CompactionGroupOption].
+       GroupOptions []CompactionGroupOption
+}
+
+// CompactionGroupOption configures a single [ExecuteCompactionGroup]
+// call. Use the With* helpers to construct values.
+type CompactionGroupOption func(*compactionGroupConfig)
+
+type compactionGroupConfig struct {
+       targetFileSize  int64
+       scanConcurrency int
+}
+
+// WithCompactionTargetFileSize sets the size target for output files
+// written by [ExecuteCompactionGroup]. Forwarded to [WriteRecords] as
+// [WithTargetFileSize]. A non-positive value (including the zero
+// default) means inherit the table's `write.target-file-size-bytes`
+// property.
+func WithCompactionTargetFileSize(size int64) CompactionGroupOption {
+       if size <= 0 {
+               return func(*compactionGroupConfig) {}
+       }
+
+       return func(c *compactionGroupConfig) {
+               c.targetFileSize = size
+       }
+}
+
+// WithCompactionScanConcurrency sets the scan concurrency used when
+// reading the group's tasks. Forwarded to [Table.Scan] as
+// [WitMaxConcurrency]. Zero (the default) means runtime.GOMAXPROCS.
+//
+// TODO: the [WitMaxConcurrency] link enshrines a pre-existing typo
+// (missing `h`). Update this reference when that symbol is renamed.
+func WithCompactionScanConcurrency(n int) CompactionGroupOption {
+       return func(c *compactionGroupConfig) {
+               c.scanConcurrency = n
+       }
 }
 
 // RewriteDataFiles compacts the given groups by reading data with
@@ -116,22 +204,20 @@ type RewriteDataFilesOptions struct {
 //
 // Use [compaction.Config.PlanCompaction] to produce the groups, then
 // convert [compaction.Group] → [CompactionTaskGroup] and pass them
-// here.
+// here. Distributed coordinators stage worker results via
+// [ExecuteCompactionGroup] and commit them via [Transaction.NewRewrite]
+// + [RewriteFiles.Apply] + [RewriteFiles.Commit] instead.
 func (t *Transaction) RewriteDataFiles(ctx context.Context, groups []CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) {
        if len(groups) == 0 {
                return &RewriteResult{}, nil
        }
 
-       // Use an unfiltered scan to read all surviving rows. Compaction must
-       // preserve every non-deleted row in the data files being rewritten.
-       scan := t.tbl.Scan()
-       result := &RewriteResult{}
+       if opts.PartialProgress {
+               return t.rewriteDataFilesPartial(ctx, groups, opts)
+       }
 
-       var (
-               allOldData    []iceberg.DataFile
-               allNewData    []iceberg.DataFile
-               allOldDeletes []iceberg.DataFile
-       )
+       result := &RewriteResult{}
+       rewrite := t.NewRewrite(opts.SnapshotProps)
 
        for _, group := range groups {
                if err := ctx.Err(); err != nil {
@@ -142,91 +228,168 @@ func (t *Transaction) RewriteDataFiles(ctx context.Context, groups []CompactionT
                        continue
                }
 
-               // Read with deletes applied.
-               arrowSchema, records, err := scan.ReadTasks(ctx, group.Tasks)
+               gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, opts.GroupOptions...)
                if err != nil {
-                       return result, fmt.Errorf("read tasks for compaction group %q: %w", group.PartitionKey, err)
+                       return result, err
                }
 
-               // Each compaction group is single-partition by construction, so the
-               // read stream is trivially clustered and we can use the clustered writer.
-               var newFiles []iceberg.DataFile
-               for df, err := range WriteRecords(ctx, t.tbl, arrowSchema, records, WithClusteredWrite()) {
-                       if err != nil {
-                               return result, fmt.Errorf("write compacted files for group %q: %w", group.PartitionKey, err)
-                       }
-                       newFiles = append(newFiles, df)
+               if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+                       continue
                }
 
-               // Collect old data files.
-               oldDataFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
-               for _, task := range group.Tasks {
-                       oldDataFiles = append(oldDataFiles, task.File)
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+               accumulateGroupMetrics(result, gr)
+       }
+
+       if result.RewrittenGroups == 0 {
+               return result, nil
+       }
+
+       for _, df := range opts.ExtraDeleteFilesToRemove {
+               rewrite.DeleteFile(df)
+               result.RemovedEqualityDeleteFiles++
+       }
+
+       if err := rewrite.Commit(ctx); err != nil {
+               return result, fmt.Errorf("commit compaction: %w", err)
+       }
+
+       return result, nil
+}
+
+// ExecuteCompactionGroup reads a compaction group's tasks (with
+// deletes applied), writes consolidated output files via
+// [WriteRecords], and computes the position-delete files safe to
+// expunge in the rewrite snapshot. It does not commit — the caller
+// hands the result to a coordinator that uses [Transaction.NewRewrite]
+// + [RewriteFiles.Apply] + [RewriteFiles.Commit] to stage the
+// atomic commit.
+//
+// Empty groups return a zero [CompactionGroupResult] without doing
+// any I/O.
+//
+// In-process callers should prefer [Transaction.RewriteDataFiles],
+// which drives this and the commit step in one call.
+//
+// Tunables are exposed via [CompactionGroupOption]. The clustered
+// write path is always used (a compaction group is single-partition
+// by construction so its read stream is trivially clustered).
+func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTaskGroup, opts ...CompactionGroupOption) (CompactionGroupResult, error) {
+       if len(group.Tasks) == 0 {
+               return CompactionGroupResult{PartitionKey: group.PartitionKey}, nil
+       }
+
+       cfg := compactionGroupConfig{}
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       var scanOpts []ScanOption
+       if cfg.scanConcurrency > 0 {
+               scanOpts = append(scanOpts, WitMaxConcurrency(cfg.scanConcurrency))
+       }
+
+       arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx, group.Tasks)
+       if err != nil {
+               return CompactionGroupResult{}, fmt.Errorf("read tasks for compaction group %q: %w", group.PartitionKey, err)
+       }
+
+       // Each compaction group is single-partition by construction, so the
+       // read stream is trivially clustered and we can use the clustered writer.
+       writeOpts := []WriteRecordOption{WithClusteredWrite()}
+       if cfg.targetFileSize > 0 {
+               writeOpts = append(writeOpts, WithTargetFileSize(cfg.targetFileSize))
+       }
+
+       var (
+               newFiles   []iceberg.DataFile
+               bytesAfter int64
+       )
+       for df, err := range WriteRecords(ctx, tbl, arrowSchema, records, writeOpts...) {
+               if err != nil {
+                       return CompactionGroupResult{}, fmt.Errorf("write compacted files for group %q: %w", group.PartitionKey, err)
                }
+               newFiles = append(newFiles, df)
+               bytesAfter += df.FileSizeBytes()
+       }
+
+       oldFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
+       for _, task := range group.Tasks {
+               oldFiles = append(oldFiles, task.File)
+       }
+
+       return CompactionGroupResult{
+               PartitionKey:   group.PartitionKey,
+               OldDataFiles:   oldFiles,
+               NewDataFiles:   newFiles,
+               SafePosDeletes: CollectSafePositionDeletes(group.Tasks),
+               BytesBefore:    group.TotalSizeBytes,
+               BytesAfter:     bytesAfter,
+       }, nil
+}
 
-               // Collect position delete files safe to remove.
-               safeDeletes := collectSafePositionDeletes(group.Tasks)
+// rewriteDataFilesPartial stages each group as its own rewrite
+// snapshot via [Transaction.ReplaceFiles] directly. Per-group staging
+// lets a mid-loop write failure leave already-staged groups on the
+// transaction; the catalog still receives them at
+// [Transaction.Commit] time.
+//
+// Validator registration is coalesced: a single [rewriteValidator]
+// covering every rewritten path across all groups is registered once,
+// after the loop, instead of one per group. The transaction's
+// validator list otherwise grows linearly with the group count, and
+// each entry independently walks the concurrent-snapshot set on
+// refresh-replay — the union walk subsumes them.
+func (t *Transaction) rewriteDataFilesPartial(ctx context.Context, groups []CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) {
+       result := &RewriteResult{}
+       props := maps.Clone(opts.SnapshotProps)
+       var allRewritten []string
 
-               // Update result metrics.
-               var bytesAfter int64
-               for _, df := range newFiles {
-                       bytesAfter += df.FileSizeBytes()
+       for _, group := range groups {
+               if err := ctx.Err(); err != nil {
+                       return result, err
                }
 
-               result.RewrittenGroups++
-               result.AddedDataFiles += len(newFiles)
-               result.RemovedDataFiles += len(oldDataFiles)
-               result.RemovedPositionDeleteFiles += len(safeDeletes)
-               result.BytesBefore += group.TotalSizeBytes
-               result.BytesAfter += bytesAfter
-
-               // Always accumulate across groups; partial-progress mode also
-               // stages each group via ReplaceFiles so work survives a
-               // mid-loop write failure, but the final catalog commit is
-               // always one atomic doCommit at Transaction.Commit() time.
-               allOldData = append(allOldData, oldDataFiles...)
-               allNewData = append(allNewData, newFiles...)
-               allOldDeletes = append(allOldDeletes, safeDeletes...)
-
-               if opts.PartialProgress {
-                       if err := t.ReplaceFiles(ctx, oldDataFiles, newFiles, safeDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
-                               return result, fmt.Errorf("commit compaction group %q: %w", group.PartitionKey, err)
-                       }
+               if len(group.Tasks) == 0 {
+                       continue
                }
-       }
 
-       // Register the rewrite-specific conflict validator covering every
-       // rewritten data file across every group. The validator list is
-       // drained at Transaction.Commit() → doCommit. Runs alongside the
-       // overwrite producer's suppressed validator (via
-       // withRewriteSemantics) so concurrent pos/eq-deletes targeting a
-       // rewritten file are caught pre-flight.
-       if len(allOldData) > 0 {
-               rewritten := make([]string, 0, len(allOldData))
-               for _, f := range allOldData {
-                       rewritten = append(rewritten, f.FilePath())
+               gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, opts.GroupOptions...)
+               if err != nil {
+                       return result, err
                }
-               t.validators = append(t.validators, rewriteValidator(rewritten))
-       }
 
-       if !opts.PartialProgress {
-               // Caller-supplied dead eq-deletes (typically from
-               // [compaction.CollectDeadEqualityDeletes]). The caller is
-               // responsible for computing these against the same snapshot
-               // this transaction is staged on.
-               if len(opts.ExtraDeleteFilesToRemove) > 0 {
-                       allOldDeletes = append(allOldDeletes, opts.ExtraDeleteFilesToRemove...)
-                       result.RemovedEqualityDeleteFiles += len(opts.ExtraDeleteFilesToRemove)
+               if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+                       continue
                }
 
-               if err := t.ReplaceFiles(ctx, allOldData, allNewData, allOldDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
-                       return result, fmt.Errorf("commit compaction: %w", err)
+               if err := t.ReplaceFiles(ctx, gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes,
+                       props, withRewriteSemantics()); err != nil {
+                       return result, fmt.Errorf("commit compaction group %q: %w", group.PartitionKey, err)
                }
+
+               for _, f := range gr.OldDataFiles {
+                       allRewritten = append(allRewritten, f.FilePath())
+               }
+               accumulateGroupMetrics(result, gr)
+       }
+
+       if len(allRewritten) > 0 {
+               t.addValidator(rewriteValidator(allRewritten))
        }
 
        return result, nil
 }
 
+func accumulateGroupMetrics(r *RewriteResult, gr CompactionGroupResult) {
+       r.RewrittenGroups++
+       r.AddedDataFiles += len(gr.NewDataFiles)
+       r.RemovedDataFiles += len(gr.OldDataFiles)
+       r.RemovedPositionDeleteFiles += len(gr.SafePosDeletes)
+       r.BytesBefore += gr.BytesBefore
+       r.BytesAfter += gr.BytesAfter
+}
+
 // rewriteValidator builds a conflictValidatorFunc that rejects the
 // commit if a concurrent snapshot added delete files pointing at any
 // of the rewritten data-file paths (or eq-deletes during the rewrite,
@@ -242,19 +405,38 @@ func rewriteValidator(rewrittenPaths []string) conflictValidatorFunc {
        }
 }
 
-// collectSafePositionDeletes returns position delete files from the given
-// tasks that are safe to remove during compaction.
+// CollectSafePositionDeletes returns position delete files from the
+// given tasks that are safe to remove during compaction.
 //
-// A position delete file is safe to remove when it was matched to a data
-// file (via scan planning) and that data file is being rewritten in this
-// compaction group. Since ReadTasks applies the deletes during reading,
-// the new output files will not contain the deleted rows.
+// A position delete file is safe to remove when it was matched to a
+// data file (via scan planning) and that data file is being rewritten
+// in this compaction group. Since ReadTasks applies the deletes during
+// reading, the new output files will not contain the deleted rows.
 //
 // Only position deletes (EntryContentPosDeletes) are considered.
 // Equality deletes are decided by [compaction.DecideDeadEqualityDeletes]
 // (which needs partition-wide visibility, not just the task scope).
 // Deletion vectors will be handled when DV read support lands.
-func collectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile {
+//
+// Caller contract: every data file referenced by a returned pos-delete
+// must be in the caller's rewrite set across the entire commit.
+// This function only sees one group's tasks, but a pos-delete file
+// can reference data files across multiple groups (the planner
+// bin-packs within a partition via [compaction.Config.PlanCompaction]
+// and skips files via MinInputFiles). If a pos-delete is reported safe
+// by one group but references a still-live data file in another group
+// — or a file the planner skipped — committing only this group's
+// rewrite would orphan the still-live data file's deletes. Coordinators
+// that aggregate multiple groups into one rewrite snapshot are
+// responsible for re-checking against the full set of rewritten paths,
+// or for moving this computation leader-side once worker outputs have
+// aggregated.
+//
+// [ExecuteCompactionGroup] calls this internally to populate
+// [CompactionGroupResult.SafePosDeletes]. It is kept exported for
+// custom workers that want the spec-shaped predicate without taking
+// the rest of [ExecuteCompactionGroup]'s read+write pipeline.
+func CollectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile {
        seen := make(map[string]bool)
        var safe []iceberg.DataFile
 
diff --git a/table/rewrite_data_files_test.go b/table/rewrite_data_files_test.go
index ce12a8fd..22d295df 100644
--- a/table/rewrite_data_files_test.go
+++ b/table/rewrite_data_files_test.go
@@ -243,6 +243,143 @@ func TestRewriteDataFiles_EmptyPlan(t *testing.T) {
        assert.Equal(t, int64(0), result.BytesBefore)
 }
 
+// TestExecuteCompactionGroup_TargetFileSizeForwarded verifies that
+// WithCompactionTargetFileSize reaches the underlying WriteRecords
+// call: a tiny target size on a multi-row group must force the
+// writer to emit more than one output file.
+func TestExecuteCompactionGroup_TargetFileSizeForwarded(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
+       require.NoError(t, err)
+
+       for i := range 5 {
+               dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       require.Len(t, tasks, 5)
+
+       // Build the group manually so option-forwarding is decoupled from
+       // the planner's bin-packing / MinInputFiles knobs.
+       scanTasks := make([]table.FileScanTask, len(tasks))
+       var totalSize int64
+       for i, st := range tasks {
+               scanTasks[i] = st
+               totalSize += st.File.FileSizeBytes()
+       }
+       group := table.CompactionTaskGroup{
+               PartitionKey:   "single",
+               Tasks:          scanTasks,
+               TotalSizeBytes: totalSize,
+       }
+
+       withTiny, err := table.ExecuteCompactionGroup(t.Context(), tbl, group,
+               table.WithCompactionTargetFileSize(1))
+       require.NoError(t, err)
+       assert.Greater(t, len(withTiny.NewDataFiles), 1,
+               "WithCompactionTargetFileSize(1) must force the writer to roll over per row")
+
+       withDefault, err := table.ExecuteCompactionGroup(t.Context(), tbl, group)
+       require.NoError(t, err)
+       assert.Len(t, withDefault.NewDataFiles, 1,
+               "without the option, the same group consolidates into a single file")
+}
+
+// TestExecuteCompactionGroup_ScanConcurrencyForwarded is a smoke test
+// confirming WithCompactionScanConcurrency is wired through without
+// breaking the read path. We can't easily observe scan parallelism
+// from the result, so the assertion is correctness equivalence with a
+// no-option baseline: same group → same files in / out.
+func TestExecuteCompactionGroup_ScanConcurrencyForwarded(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
+       require.NoError(t, err)
+
+       for i := range 3 {
+               dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       group := toTaskGroups(plan.Groups)[0]
+
+       withOption, err := table.ExecuteCompactionGroup(t.Context(), tbl, group,
+               table.WithCompactionScanConcurrency(1))
+       require.NoError(t, err)
+
+       baseline, err := table.ExecuteCompactionGroup(t.Context(), tbl, group)
+       require.NoError(t, err)
+
+       assert.Equal(t, len(baseline.OldDataFiles), len(withOption.OldDataFiles),
+               "setting scan concurrency must not change the set of old files read")
+       assert.Equal(t, len(baseline.NewDataFiles), len(withOption.NewDataFiles),
+               "setting scan concurrency must not change the set of consolidated outputs")
+}
+
+// TestRewriteDataFiles_GroupOptionsForwarded verifies that
+// RewriteDataFilesOptions.GroupOptions are piped through to every
+// ExecuteCompactionGroup call.
+func TestRewriteDataFiles_GroupOptionsForwarded(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
+       require.NoError(t, err)
+
+       for i := range 5 {
+               dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+
+       tx := tbl.NewTransaction()
+       result, err := tx.RewriteDataFiles(t.Context(), toTaskGroups(plan.Groups), table.RewriteDataFilesOptions{
+               GroupOptions: []table.CompactionGroupOption{
+                       table.WithCompactionTargetFileSize(1),
+               },
+       })
+       require.NoError(t, err)
+
+       assert.Greater(t, result.AddedDataFiles, 1,
+               "GroupOptions must reach ExecuteCompactionGroup; tiny target size should split output")
+
+       // Drive the commit through to catch regressions that break
+       // ReplaceFiles under tiny-target rewrites — the in-process counter
+       // above only proves the option reached the writer.
+       committed, err := tx.Commit(t.Context())
+       require.NoError(t, err)
+       assertRowCount(t, committed, 5)
+}
+
 func TestRewriteDataFiles_EmptyGroupSkipped(t *testing.T) {
        tbl := newRewriteTestTable(t)
 
diff --git a/table/rewrite_files.go b/table/rewrite_files.go
new file mode 100644
index 00000000..6ee2b805
--- /dev/null
+++ b/table/rewrite_files.go
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "maps"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. The pos-delete
+//     branch only fires when the concurrent writer populated the
+//     manifest's referenced_data_file column (field id 143). That
+//     column is V2-optional and V3-required for deletion-vector
+//     deletes; V2 pos-delete writers commonly leave it empty, in
+//     which case only the conservative eq-delete-during-rewrite rule
+//     fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// The builder follows the same fail-fast pattern as
+// [view.MetadataBuilder]: a method that hits an invalid input stages
+// the error and short-circuits all subsequent calls until
+// [RewriteFiles.Commit] drains it. The builder is single-use; once
+// Commit has been called, a second call returns an error regardless
+// of whether the first call succeeded.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at insertion time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+       txn                 *Transaction
+       dataFilesToDelete   []iceberg.DataFile
+       dataFilesToAdd      []iceberg.DataFile
+       deleteFilesToRemove []iceberg.DataFile
+       snapshotProps       iceberg.Properties
+       err                 error
+       committed           bool
+}
+
+// NewRewrite returns a [RewriteFiles] builder bound to this transaction.
+// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is
+// cloned and the clone is added to the rewrite snapshot's summary;
+// pass nil for none.
+//
+// Usage:
+//
+//     rewrite := tx.NewRewrite(nil)
+//     rewrite.DeleteFile(oldDataFile)
+//     rewrite.AddDataFile(newDataFile)
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+//     committed, err := tx.Commit(ctx)
+func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) 
*RewriteFiles {
+       return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)}
+}
+
+// DeleteFile marks a file for removal in this rewrite. Routes by
+// content type: data files are queued as data-file replacements;
+// pos/eq-delete files are queued for delete-file removal alongside
+// the data rewrite (typical when a delete is fully applied to data
+// files being rewritten and is therefore safe to expunge).
+//
+// Any other content type stages an error that is returned from the
+// next [RewriteFiles.Commit] call.
+func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles {
+       if r.err != nil {
+               return r
+       }
+       if df == nil {
+               r.err = fmt.Errorf("%w: DeleteFile got nil data file", 
ErrInvalidOperation)
+
+               return r
+       }
+
+       switch df.ContentType() {
+       case iceberg.EntryContentData:
+               r.dataFilesToDelete = append(r.dataFilesToDelete, df)
+       case iceberg.EntryContentPosDeletes, iceberg.EntryContentEqDeletes:
+               r.deleteFilesToRemove = append(r.deleteFilesToRemove, df)
+       default:
+               r.err = fmt.Errorf("%w: DeleteFile got unsupported content type 
%s (%s)",
+                       ErrInvalidOperation, df.ContentType(), df.FilePath())
+       }
+
+       return r
+}
+
+// AddDataFile queues a new data file. Adding delete files is not yet
+// supported by the underlying snapshot machinery; a pos/eq-delete here
+// stages an error that is returned from the next [RewriteFiles.Commit]
+// call. The error names the offending file path so callers driving the
+// builder via [RewriteFiles.Apply] can identify it without tracking
+// queue order.
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles {
+       if r.err != nil {
+               return r
+       }
+       if df == nil {
+               r.err = fmt.Errorf("%w: AddDataFile got nil data file", 
ErrInvalidOperation)
+
+               return r
+       }
+
+       if df.ContentType() != iceberg.EntryContentData {
+               r.err = fmt.Errorf("%w: AddDataFile only supports data files; 
got content type %s (%s)",
+                       ErrInvalidOperation, df.ContentType(), df.FilePath())
+
+               return r
+       }
+       r.dataFilesToAdd = append(r.dataFilesToAdd, df)
+
+       return r
+}
+
+// Apply is a bulk shortcut that routes three slices onto this builder:
+// every entry in deletes and safeDeletes is queued via
+// [RewriteFiles.DeleteFile] (which routes data vs. delete files by
+// content type), and every entry in adds via [RewriteFiles.AddDataFile].
+//
+// Distributed coordinators should prefer [RewriteFiles.ApplyResult],
+// which takes a [CompactionGroupResult] directly: the three positional
+// same-typed slices here transpose silently under refactor.
+func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) 
*RewriteFiles {
+       if r.err != nil {
+               return r
+       }
+
+       for _, df := range deletes {
+               r.DeleteFile(df)
+       }
+       for _, df := range adds {
+               r.AddDataFile(df)
+       }
+       for _, df := range safeDeletes {
+               r.DeleteFile(df)
+       }
+
+       return r
+}
+
+// ApplyResult is the typed coordinator entry point: it queues a worker's
+// [CompactionGroupResult] onto this builder by routing OldDataFiles
+// (via DeleteFile), NewDataFiles (via AddDataFile), and SafePosDeletes
+// (via DeleteFile) in one call. Prefer this over [RewriteFiles.Apply]
+// when feeding worker outputs — the field names line up with the
+// builder semantics, so a refactor of CompactionGroupResult cannot
+// silently transpose roles.
+//
+// Typical distributed-coordinator pattern:
+//
+//     rewrite := leaderTxn.NewRewrite(snapshotProps)
+//     for _, gr := range workerResults {
+//         rewrite.ApplyResult(gr)
+//     }
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+func (r *RewriteFiles) ApplyResult(gr CompactionGroupResult) *RewriteFiles {
+       return r.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+}
+
+// Commit stages the rewrite snapshot on the underlying transaction.
+// The catalog commit happens once, later, at [Transaction.Commit] time.
+//
+// Commit is single-shot: any second call returns an error regardless
+// of whether the first call succeeded, and neither re-stages the
+// rewrite nor re-registers the conflict validator. Returns an error
+// if any file passed to [RewriteFiles.AddDataFile] or
+// [RewriteFiles.DeleteFile] had an unsupported content type, if the
+// builder has no file changes, or if the underlying
+// [Transaction.ReplaceFiles] call fails.
+func (r *RewriteFiles) Commit(ctx context.Context) error {
+       if r.committed {
+               return fmt.Errorf("%w: RewriteFiles.Commit already called on 
this builder", ErrInvalidOperation)
+       }
+       r.committed = true
+
+       if r.err != nil {
+               return r.err
+       }
+       if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) == 0 && 
len(r.deleteFilesToRemove) == 0 {
+               return fmt.Errorf("%w: rewrite must have at least one file 
change", ErrInvalidOperation)
+       }
+       // Adds-without-deletes would route through ReplaceFiles →
+       // ReplaceDataFilesWithDataFiles → AddDataFiles, an OpAppend
+       // producer that never reads cfg.rewriteSemantics. The snapshot
+       // would be tagged append with no rewrite validator — silently
+       // wrong for a rewrite. A pure delete-file expunge (only
+       // deleteFilesToRemove non-empty) is still legitimate.
+       if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) > 0 {
+               return fmt.Errorf("%w: rewrite must delete at least one data 
file when adding data files", ErrInvalidOperation)
+       }
+
+       if err := r.txn.ReplaceFiles(ctx, r.dataFilesToDelete, 
r.dataFilesToAdd, r.deleteFilesToRemove, r.snapshotProps, 
withRewriteSemantics()); err != nil {
+               return err
+       }
+
+       if len(r.dataFilesToDelete) > 0 {
+               rewritten := make([]string, 0, len(r.dataFilesToDelete))
+               for _, df := range r.dataFilesToDelete {
+                       rewritten = append(rewritten, df.FilePath())
+               }
+               r.txn.addValidator(rewriteValidator(rewritten))
+       }
+
+       return nil
+}
diff --git a/table/rewrite_files_test.go b/table/rewrite_files_test.go
new file mode 100644
index 00000000..7647dd54
--- /dev/null
+++ b/table/rewrite_files_test.go
@@ -0,0 +1,636 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+// Black-box coverage for the [table.RewriteFiles] snapshot-operation
+// builder, including the supporting [table.ExecuteCompactionGroup]
+// worker function and the [table.CollectSafePositionDeletes]
+// predicate. Tests cover the in-process path (one transaction stages
+// and commits) and the distributed-coordinator path (workers produce
+// [table.CompactionGroupResult]s, a leader builds a single
+// [table.RewriteFiles] from them and commits).
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// concurrentTestCatalog enforces Requirement.Validate against its
+// current metadata on every CommitTable, so a leader transaction
+// whose AssertRefSnapshotID points at a stale snapshot fails its
+// first attempt and triggers refresh-and-replay. attempts counts
+// CommitTable invocations so tests can prove the retry boundary.
+type concurrentTestCatalog struct {
+       metadata table.Metadata
+       location string
+       fsF      table.FSysF
+       attempts atomic.Int32
+}
+
+func (c *concurrentTestCatalog) LoadTable(_ context.Context, ident 
table.Identifier) (*table.Table, error) {
+       return table.New(ident, c.metadata, c.location, c.fsF, c), nil
+}
+
+func (c *concurrentTestCatalog) CommitTable(_ context.Context, _ 
table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       c.attempts.Add(1)
+       for _, req := range reqs {
+               if err := req.Validate(c.metadata); err != nil {
+                       return nil, "", fmt.Errorf("%w: %w", 
table.ErrCommitFailed, err)
+               }
+       }
+       meta, err := table.UpdateTableMetadata(c.metadata, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = meta
+
+       // Returning the seed location is enough to keep subsequent
+       // NewTransaction calls from adding AssertCreate; the value is not
+       // re-read by the table machinery between commits in this stub.
+       return meta, c.location, nil
+}
+
+func newConcurrentRewriteTestTable(t *testing.T) (*table.Table, 
*concurrentTestCatalog) {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{
+                       table.PropertyFormatVersion:        "2",
+                       table.CommitNumRetriesKey:          "2",
+                       table.CommitMinRetryWaitMsKey:      "1",
+                       table.CommitMaxRetryWaitMsKey:      "2",
+                       table.CommitTotalRetryTimeoutMsKey: "1000",
+               })
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "concurrent_rewrite"}, meta, 
metaLoc, fsF, cat), cat
+}
+
+func newPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func newEqDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+       b = b.EqualityFieldIDs([]int{1})
+
+       return b.Build()
+}
+
+func newDataFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func TestCollectSafePositionDeletes_FiltersAndDedupes(t *testing.T) {
+       posA := newPosDeleteFile(t, "pos-a.parquet")
+       posB := newPosDeleteFile(t, "pos-b.parquet")
+       eq := newEqDeleteFile(t, "eq.parquet")
+
+       tasks := []table.FileScanTask{
+               {DeleteFiles: []iceberg.DataFile{posA, posB}, 
EqualityDeleteFiles: []iceberg.DataFile{eq}},
+               {DeleteFiles: []iceberg.DataFile{posA}}, // duplicate of posA
+       }
+
+       got := table.CollectSafePositionDeletes(tasks)
+       require.Len(t, got, 2, "duplicate pos-deletes must be deduped, 
eq-deletes must be filtered out")
+
+       paths := []string{got[0].FilePath(), got[1].FilePath()}
+       assert.ElementsMatch(t, []string{"pos-a.parquet", "pos-b.parquet"}, 
paths)
+}
+
+func TestCollectSafePositionDeletes_EmptyTasks(t *testing.T) {
+       assert.Empty(t, table.CollectSafePositionDeletes(nil))
+       assert.Empty(t, 
table.CollectSafePositionDeletes([]table.FileScanTask{{}}))
+}
+
+func TestExecuteCompactionGroup_EmptyGroup(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       got, err := table.ExecuteCompactionGroup(t.Context(), tbl,
+               table.CompactionTaskGroup{PartitionKey: "p"})
+       require.NoError(t, err)
+       assert.Equal(t, "p", got.PartitionKey)
+       assert.Empty(t, got.OldDataFiles)
+       assert.Empty(t, got.NewDataFiles)
+       assert.Empty(t, got.SafePosDeletes)
+}
+
+func TestRewriteFiles_EmptyCommit_Errors(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       err := tx.NewRewrite(nil).Commit(t.Context())
+       require.Error(t, err, "an empty rewrite has nothing to stage and must 
reject")
+       assert.ErrorIs(t, err, table.ErrInvalidOperation)
+       assert.Contains(t, err.Error(), "at least one file change")
+}
+
+func TestRewriteFiles_AddDataFile_RejectsNonDataFile(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       posDel := newPosDeleteFile(t, "spurious-pos-del.parquet")
+
+       err := tx.NewRewrite(nil).AddDataFile(posDel).Commit(t.Context())
+       require.Error(t, err)
+       assert.ErrorIs(t, err, table.ErrInvalidOperation)
+       assert.Contains(t, err.Error(), "AddDataFile only supports data files",
+               "adding a delete file via AddDataFile must be reported at 
commit time")
+       assert.Contains(t, err.Error(), "spurious-pos-del.parquet",
+               "error must name the offending file path")
+}
+
+func TestRewriteFiles_Commit_SingleShot(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+
+       for i := range 2 {
+               dataPath := tbl.Location() + 
fmt.Sprintf("/data/seed-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "seed"}]`, i+1))
+               seedTx := tbl.NewTransaction()
+               require.NoError(t, seedTx.AddFiles(t.Context(), 
[]string{dataPath}, nil, false))
+               tbl, err = seedTx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               results = append(results, gr)
+       }
+
+       tx := tbl.NewTransaction()
+       rewrite := tx.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()))
+
+       err = rewrite.Commit(t.Context())
+       require.Error(t, err, "double commit must reject so validators are not 
re-appended")
+       assert.ErrorIs(t, err, table.ErrInvalidOperation)
+       assert.Contains(t, err.Error(), "already called")
+}
+
+// TestRewriteFiles_Commit_SingleShot_AfterFailure pins down the
+// stricter contract: a builder is dead even when the first Commit
+// failed before reaching ReplaceFiles. Without this guard, an
+// empty-rewrite or stranger-file failure would leave the builder
+// reusable, and a retry would slip through and append the conflict
+// validator a second time.
+func TestRewriteFiles_Commit_SingleShot_AfterFailure(t *testing.T) {
+       t.Run("empty rewrite", func(t *testing.T) {
+               tbl := newRewriteTestTable(t)
+               tx := tbl.NewTransaction()
+
+               rewrite := tx.NewRewrite(nil)
+
+               err := rewrite.Commit(t.Context())
+               require.Error(t, err, "first Commit on an empty builder must 
fail")
+               assert.ErrorIs(t, err, table.ErrInvalidOperation)
+               assert.Contains(t, err.Error(), "at least one file change")
+
+               err = rewrite.Commit(t.Context())
+               require.Error(t, err, "second Commit must reject even though 
the first never staged anything")
+               assert.ErrorIs(t, err, table.ErrInvalidOperation)
+               assert.Contains(t, err.Error(), "already called")
+       })
+
+       t.Run("ReplaceFiles failure", func(t *testing.T) {
+               tbl := newRewriteTestTable(t)
+               tx := tbl.NewTransaction()
+
+               stranger := newDataFile(t, 
tbl.Location()+"/data/stranger.parquet")
+               replacement := newDataFile(t, 
tbl.Location()+"/data/replacement.parquet")
+
+               rewrite := tx.NewRewrite(nil).
+                       DeleteFile(stranger).
+                       AddDataFile(replacement)
+
+               err := rewrite.Commit(t.Context())
+               require.Error(t, err, "ReplaceFiles must reject a stranger data 
file")
+
+               err = rewrite.Commit(t.Context())
+               require.Error(t, err, "second Commit must reject so a retry 
can't re-stage ReplaceFiles or re-append the validator")
+               assert.ErrorIs(t, err, table.ErrInvalidOperation)
+               assert.Contains(t, err.Error(), "already called")
+       })
+}
+
+func TestRewriteFiles_DeleteFile_RoutesByContentType(t *testing.T) {
+       // Validates routing into the right slice via observable behavior:
+       // the data-slice and delete-file-slice membership checks in
+       // ReplaceFiles produce distinct error messages, so a mis-routed
+       // file would surface the wrong error class.
+       //
+       //   Builder A — DeleteFile(strangerData): if routed correctly, the
+       //   transaction has an empty deleteFilesToRemove slice and falls
+       //   through to ReplaceDataFilesWithDataFiles, which rejects with
+       //   "cannot delete files that do not belong to the table". A
+       //   mis-route into deleteFilesToRemove would error with "cannot
+       //   remove delete files that do not belong to the table" instead.
+       //
+       //   Builder B — DeleteFile(strangerPosDel) only: if routed
+       //   correctly, ReplaceFiles' main path rejects with "cannot remove
+       //   delete files that do not belong to the table". A mis-route
+       //   into dataFilesToDelete would surface "cannot delete files
+       //   that do not belong to the table" instead. (No AddDataFile
+       //   here: adds-without-data-deletes is rejected pre-flight by
+       //   Commit, so it would mask the routing error.)
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+       dataPath := tbl.Location() + "/data/seed.parquet"
+       writeParquetFile(t, dataPath, arrowSc, `[{"id": 1, "data": "seed"}]`)
+       seedTx := tbl.NewTransaction()
+       require.NoError(t, seedTx.AddFiles(t.Context(), []string{dataPath}, 
nil, false))
+       tbl, err = seedTx.Commit(t.Context())
+       require.NoError(t, err)
+
+       strangerData := newDataFile(t, tbl.Location()+"/data/stranger.parquet")
+       strangerPosDel := newPosDeleteFile(t, 
tbl.Location()+"/data/stranger-pos-del.parquet")
+       replacement := newDataFile(t, 
tbl.Location()+"/data/replacement.parquet")
+
+       dataSliceMiss := "cannot delete files that do not belong to the table"
+       deleteFileSliceMiss := "cannot remove delete files that do not belong 
to the table"
+
+       dataTx := tbl.NewTransaction()
+       err = dataTx.NewRewrite(nil).
+               DeleteFile(strangerData).
+               AddDataFile(replacement).
+               Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), dataSliceMiss,
+               "DeleteFile(data) must route into dataFilesToDelete; mis-routed 
it would surface the delete-file-slice error")
+       assert.NotContains(t, err.Error(), deleteFileSliceMiss)
+
+       delTx := tbl.NewTransaction()
+       err = delTx.NewRewrite(nil).
+               DeleteFile(strangerPosDel).
+               Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), deleteFileSliceMiss,
+               "DeleteFile(pos-delete) must route into deleteFilesToRemove; 
mis-routed it would surface the data-slice error")
+       assert.NotContains(t, err.Error(), dataSliceMiss)
+}
+
+// TestRewriteFiles_RejectsAddsWithoutDataDeletes proves that staging
+// new data files with no data files to delete is rejected up front —
+// otherwise the chain would slip through ReplaceFiles →
+// ReplaceDataFilesWithDataFiles → AddDataFiles (an OpAppend producer)
+// and silently tag the snapshot append instead of replace, with no
+// rewrite validator registered.
+func TestRewriteFiles_RejectsAddsWithoutDataDeletes(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       add := newDataFile(t, tbl.Location()+"/data/lonely-add.parquet")
+
+       err := tx.NewRewrite(nil).AddDataFile(add).Commit(t.Context())
+       require.Error(t, err)
+       assert.ErrorIs(t, err, table.ErrInvalidOperation)
+       assert.Contains(t, err.Error(), "must delete at least one data file 
when adding")
+}
+
+func TestRewriteFiles_RejectsNilDataFile(t *testing.T) {
+       t.Run("DeleteFile", func(t *testing.T) {
+               tbl := newRewriteTestTable(t)
+               tx := tbl.NewTransaction()
+
+               err := tx.NewRewrite(nil).DeleteFile(nil).Commit(t.Context())
+               require.Error(t, err)
+               assert.ErrorIs(t, err, table.ErrInvalidOperation)
+               assert.Contains(t, err.Error(), "DeleteFile got nil data file")
+       })
+
+       t.Run("AddDataFile", func(t *testing.T) {
+               tbl := newRewriteTestTable(t)
+               tx := tbl.NewTransaction()
+
+               err := tx.NewRewrite(nil).AddDataFile(nil).Commit(t.Context())
+               require.Error(t, err)
+               assert.ErrorIs(t, err, table.ErrInvalidOperation)
+               assert.Contains(t, err.Error(), "AddDataFile got nil data file")
+       })
+}
+
+// TestRewriteFiles_DistributedEquivalence proves the worker+coordinator
+// pipeline lands at the same end state as the bundled in-process
+// [Transaction.RewriteDataFiles]: workers run [table.ExecuteCompactionGroup]
+// per group (here inline; in distributed compaction this happens on
+// remote peers and the results travel over the wire), then a single
+// leader transaction stages [table.RewriteFiles.Apply] +
+// [table.RewriteFiles.Commit] for one atomic snapshot.
+func TestRewriteFiles_DistributedEquivalence(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+
+       for i := range 4 {
+               dataPath := tbl.Location() + 
fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, 
nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       require.Len(t, tasks, 4)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               require.NotEmpty(t, gr.NewDataFiles)
+               results = append(results, gr)
+       }
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()))
+
+       committed, err := leaderTxn.Commit(t.Context())
+       require.NoError(t, err)
+
+       assertRowCount(t, committed, 4)
+
+       snap := committed.CurrentSnapshot()
+       require.NotNil(t, snap)
+       assert.Equal(t, table.OpReplace, snap.Summary.Operation,
+               "rewrite snapshot must be tagged replace, not overwrite")
+
+       // Lock the cross-client summary contract under OpReplace. PyIceberg
+       // (and other readers) parse these keys; the OpOverwrite → OpReplace
+       // flip must not silently drop any of them.
+       props := snap.Summary.Properties
+       require.NotNil(t, props)
+       assert.Equal(t, "1", props["added-data-files"])
+       assert.Equal(t, "4", props["deleted-data-files"])
+       assert.Equal(t, "4", props["added-records"])
+       assert.Equal(t, "4", props["deleted-records"])
+       assert.Equal(t, "1", props["total-data-files"])
+       assert.Equal(t, "4", props["total-records"])
+       assert.NotEmpty(t, props["added-files-size"], "byte counter must be 
populated")
+       assert.NotEmpty(t, props["removed-files-size"], "byte counter must be 
populated")
+       assert.NotEmpty(t, props["total-files-size"], "byte counter must be 
populated")
+
+       postTasks, err := committed.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       assert.Len(t, postTasks, 1, "leader-side commit must produce one 
consolidated file")
+}
+
+// TestRewriteFiles_DropsSafePositionDeletes drives the pipeline with a
+// position-delete present at plan time and asserts the pos-delete is
+// expunged in the rewrite snapshot.
+func TestRewriteFiles_DropsSafePositionDeletes(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
+       require.NoError(t, err)
+
+       for i := range 3 {
+               dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc, fmt.Sprintf(
+                       `[{"id": %d, "data": "a"}, {"id": %d, "data": "b"}]`, i*2+1, i*2+2))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       firstDataPath := tbl.Location() + "/data/file-0.parquet"
+       posDelPath := tbl.Location() + "/data/pos-del-001.parquet"
+       writeParquetFile(t, posDelPath, table.PositionalDeleteArrowSchema,
+               fmt.Sprintf(`[{"file_path": "%s", "pos": 0}]`, firstDataPath))
+
+       posDelBuilder, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               posDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddDeletes(posDelBuilder.Build())
+       require.NoError(t, rd.Commit(t.Context()))
+       tbl, err = tx.Commit(t.Context())
+       require.NoError(t, err)
+       assertRowCount(t, tbl, 5)
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       cfg := defaultTestCompactionCfg
+       cfg.DeleteFileThreshold = 1
+       plan, err := cfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       totalSafe := 0
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               results = append(results, gr)
+               totalSafe += len(gr.SafePosDeletes)
+       }
+       require.Equal(t, 1, totalSafe, "the staged pos-delete must be reported safe by exactly one group")
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()))
+
+       committed, err := leaderTxn.Commit(t.Context())
+       require.NoError(t, err)
+
+       assertRowCount(t, committed, 5)
+
+       // Pos-delete removal must show up in the OpReplace summary —
+       // `removed-position-delete-files` (count of files) and
+       // `removed-position-deletes` (count of rows) are the keys other
+       // clients read.
+       snap := committed.CurrentSnapshot()
+       require.NotNil(t, snap)
+       props := snap.Summary.Properties
+       require.NotNil(t, props)
+       assert.Equal(t, "1", props["removed-position-delete-files"])
+       assert.Equal(t, "1", props["removed-position-deletes"])
+
+       postTasks, err := committed.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       for _, task := range postTasks {
+               assert.Empty(t, task.DeleteFiles,
+                       "safe pos-delete must be expunged in the rewrite snapshot")
+       }
+}
+
+// TestRewriteFiles_RejectsConcurrentEqDelete is the negative path: a
+// leader stages a rewrite, a concurrent peer commits an equality-delete
+// during the rewrite window, and the leader's [table.Transaction.Commit]
+// must fail with [table.ErrConflictingDeleteFiles] — proving the
+// rewrite-specific conflict validator that [table.RewriteFiles.Commit]
+// registers internally fires under refresh-and-replay.
+//
+// First Commit attempt fails on the stale AssertRefSnapshotID (the
+// catalog has advanced past the leader's base). The retry refreshes,
+// builds a fresh conflictContext walking S0 → S1, and the rewrite
+// validator's conservative rule rejects any concurrent equality-delete
+// during a rewrite — terminal exit before any further CommitTable
+// call.
+//
+// Equality deletes are used here rather than positional deletes because
+// the v2 manifest schema does not carry a pos-delete's referenced data
+// file path; the validator's pos-delete branch is only effective on v3
+// tables. The eq-delete branch is the conservative rule for v2.
+func TestRewriteFiles_RejectsConcurrentEqDelete(t *testing.T) {
+       tbl, cat := newConcurrentRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
+       require.NoError(t, err)
+
+       for i := range 3 {
+               dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               require.NotEmpty(t, gr.OldDataFiles)
+               results = append(results, gr)
+       }
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()),
+               "staging the rewrite must succeed; the conflict surfaces at Commit time")
+
+       eqDelPath := tbl.Location() + "/data/concurrent-eq-del.parquet"
+       eqDelBuilder, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               eqDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+       eqDelBuilder = eqDelBuilder.EqualityFieldIDs([]int{1})
+
+       peerTxn := tbl.NewTransaction()
+       rd := peerTxn.NewRowDelta(nil)
+       rd.AddDeletes(eqDelBuilder.Build())
+       require.NoError(t, rd.Commit(t.Context()))
+       _, err = peerTxn.Commit(t.Context())
+       require.NoError(t, err, "peer commit advances the catalog so the leader's first attempt fails")
+
+       beforeLeader := cat.attempts.Load()
+       _, err = leaderTxn.Commit(t.Context())
+       require.Error(t, err)
+       assert.ErrorIs(t, err, table.ErrConflictingDeleteFiles,
+               "refresh-and-replay must detect the concurrent equality-delete during a rewrite")
+       // The expected delta is exactly 1 CommitTable invocation, the
+       // first attempt that fails on the stale AssertRefSnapshotID. The
+       // retry refreshes the conflictContext, the rewrite validator
+       // rejects the concurrent eq-delete, and the leader exits before
+       // re-issuing CommitTable. A delta of 0 means the validator fired
+       // pre-flight; a delta of ≥2 means the retry reached the catalog.
+       assert.Equal(t, int32(1), cat.attempts.Load()-beforeLeader,
+               "only the stale-assertion attempt landed; the retry never reached CommitTable")
+}
diff --git a/table/row_delta.go b/table/row_delta.go
index 2f7de4f1..03c73613 100644
--- a/table/row_delta.go
+++ b/table/row_delta.go
@@ -174,7 +174,7 @@ func (rd *RowDelta) Commit(ctx context.Context) error {
        // fast-append producer's validator is a no-op; RowDelta semantics
        // (pos-delete references, eq-delete predicate) require a dedicated
        // check that snapshot_producers does not know about.
-       rd.txn.validators = append(rd.txn.validators, rd.validate)
+       rd.txn.addValidator(rd.validate)
 
        return rd.txn.apply(updates, reqs)
 }
diff --git a/table/transaction.go b/table/transaction.go
index 1ffa18de..2fdc9872 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -142,6 +142,18 @@ func (t *Transaction) apply(updates []Update, reqs []Requirement) error {
        return nil
 }
 
+// addValidator queues conflict validator v onto the transaction while
+// holding t.mx. RowDelta, RewriteFiles, and any other producer that
+// registers a validator outside doCommit must call this instead of
+// appending to t.validators directly: the RewriteFiles type doc allows
+// fanout builders over one transaction, and an unlocked append would
+// race with Transaction.Commit reading the validator list under t.mx.
+func (t *Transaction) addValidator(v conflictValidatorFunc) {
+       t.mx.Lock()
+       t.validators = append(t.validators, v)
+       t.mx.Unlock()
+}
+
 func (t *Transaction) appendSnapshotProducer(afs io.IO, props 
iceberg.Properties) *snapshotProducer {
        manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, 
ManifestMergeEnabledDefault)
        updateSnapshot := t.updateSnapshot(afs, props, OpAppend)
@@ -550,10 +562,13 @@ type dataFileCfg struct {
 // withRewriteSemantics marks an overwrite/replace operation as a
 // rewrite (compaction) rather than a user-facing overwrite. The
 // overwrite producer's default pre-commit conflict validator is
-// bypassed; the caller registers a rewrite-specific validator on the
-// transaction separately via validateNoNewDeletesForRewrittenFiles.
-// Unexported: only RewriteDataFiles passes this; there is no public
-// surface for user code to bypass overwrite isolation.
+// suppressed in favor of the rewrite-specific validator built by
+// [rewriteValidator] and queued onto the transaction's validator list.
+//
+// Unexported: the only safe way to flip this flag is via the
+// [RewriteFiles] builder ([Transaction.NewRewrite]), which always
+// pairs the suppression with the matching validator. Direct callers
+// of [Transaction.ReplaceFiles] cannot bypass overwrite isolation.
 func withRewriteSemantics() WriteOption {
        return func(cfg *dataFileCfg) {
                cfg.rewriteSemantics = true
@@ -765,8 +780,13 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo
                }
        }
 
+       op := OpOverwrite
+       if cfg.rewriteSemantics {
+               op = OpReplace
+       }
+
        commitUUID := uuid.New()
-       updater := t.updateSnapshot(fs, snapshotProps, 
OpOverwrite).mergeOverwrite(&commitUUID, nil)
+       updater := t.updateSnapshot(fs, snapshotProps, 
op).mergeOverwrite(&commitUUID, nil)
        if cfg.rewriteSemantics {
                // mergeOverwrite guarantees an *overwriteFiles producerImpl.
                updater.producerImpl.(*overwriteFiles).skipDefaultValidator = 
true
@@ -882,8 +902,13 @@ func (t *Transaction) ReplaceFiles(ctx context.Context, dataFilesToDelete, dataF
                }
        }
 
+       op := OpOverwrite
+       if cfg.rewriteSemantics {
+               op = OpReplace
+       }
+
        commitUUID := uuid.New()
-       updater := t.updateSnapshot(fs, snapshotProps, 
OpOverwrite).mergeOverwrite(&commitUUID, nil)
+       updater := t.updateSnapshot(fs, snapshotProps, 
op).mergeOverwrite(&commitUUID, nil)
        if cfg.rewriteSemantics {
                // mergeOverwrite guarantees an *overwriteFiles producerImpl.
                updater.producerImpl.(*overwriteFiles).skipDefaultValidator = 
true

Reply via email to