laskoviymishka commented on code in PR #1033:
URL: https://github.com/apache/iceberg-go/pull/1033#discussion_r3201805574


##########
table/rewrite_files.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. Pos-delete
+//     conflict detection requires v3 manifests (which carry the
+//     referenced-data-file column); on v2, only the conservative
+//     eq-delete-during-rewrite rule fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at commit time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+       txn                 *Transaction
+       dataFilesToDelete   []iceberg.DataFile
+       dataFilesToAdd      []iceberg.DataFile
+       deleteFilesToRemove []iceberg.DataFile
+       snapshotProps       iceberg.Properties
+}
+
+// NewRewrite returns a [RewriteFiles] builder bound to this transaction.
+// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is
+// added to the rewrite snapshot's summary; pass nil for none.
+//
+// Usage:
+//
+//     rewrite := tx.NewRewrite(nil)
+//     rewrite.DeleteFile(oldDataFile)
+//     rewrite.AddDataFile(newDataFile)
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+//     committed, err := tx.Commit(ctx)
+func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) 
*RewriteFiles {
+       return &RewriteFiles{txn: t, snapshotProps: snapshotProps}
+}
+
+// DeleteFile marks a file for removal in this rewrite. Routes by
+// content type: data files are queued as data-file replacements;
+// pos/eq-delete files are queued for delete-file removal alongside
+// the data rewrite (typical when a delete is fully applied to data
+// files being rewritten and is therefore safe to expunge).
+func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles {
+       if df.ContentType() == iceberg.EntryContentData {
+               r.dataFilesToDelete = append(r.dataFilesToDelete, df)
+       } else {
+               r.deleteFilesToRemove = append(r.deleteFilesToRemove, df)
+       }
+
+       return r
+}
+
+// AddDataFile queues a new data file. Adding delete files is not yet
+// supported by the underlying snapshot machinery; passing a
+// pos/eq-delete here is reported at [RewriteFiles.Commit].
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles {
+       r.dataFilesToAdd = append(r.dataFilesToAdd, df)
+
+       return r
+}
+
+// Apply is a bulk shortcut that routes a worker's outputs onto this
+// builder: every entry in deletes and safeDeletes is queued via
+// [RewriteFiles.DeleteFile] (which routes data vs. delete files by
+// content type), and every entry in adds via [RewriteFiles.AddDataFile].
+//
+// safeDeletes is the position-delete files referenced by tasks in
+// the rewrite group whose target data file is being rewritten — they
+// are safe to expunge in the rewrite snapshot. [CollectSafePositionDeletes]
+// computes this set; [ExecuteCompactionGroup] populates
+// [CompactionGroupResult.SafePosDeletes] from it.
+//
+// The typical distributed-coordinator pattern is one [RewriteFiles]
+// builder + one Apply call per worker result + one Commit:
+//
+//     rewrite := leaderTxn.NewRewrite(snapshotProps)
+//     for _, gr := range workerResults {
+//         rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+//     }
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) 
*RewriteFiles {
+       for _, df := range deletes {
+               r.DeleteFile(df)
+       }
+       for _, df := range adds {
+               r.AddDataFile(df)
+       }
+       for _, df := range safeDeletes {
+               r.DeleteFile(df)
+       }
+
+       return r
+}
+
+// Commit stages the rewrite snapshot on the underlying transaction.
+// The catalog commit happens once, later, at [Transaction.Commit] time.
+//
+// Returns an error if the builder has no file changes, if any
+// [RewriteFiles.AddDataFile] input is not a data file, or if the underlying
+// [Transaction.ReplaceFiles] call fails.
+func (r *RewriteFiles) Commit(ctx context.Context) error {
+       if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) == 0 && 
len(r.deleteFilesToRemove) == 0 {
+               return errors.New("rewrite must have at least one file change")

Review Comment:
   I'd wrap this with a sentinel rather than `errors.New` so callers can branch 
on it. `ErrInvalidOperation` already exists in this package and is what 
`ReplaceFiles` returns for the equivalent shape (`cannot replace files in a 
table without an existing snapshot`). A bare string here breaks `errors.Is` for 
anyone driving the builder programmatically.
   
   ```go
   return fmt.Errorf("%w: rewrite must have at least one file change", 
ErrInvalidOperation)
   ```



##########
table/rewrite_files.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. Pos-delete
+//     conflict detection requires v3 manifests (which carry the
+//     referenced-data-file column); on v2, only the conservative
+//     eq-delete-during-rewrite rule fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at commit time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+       txn                 *Transaction
+       dataFilesToDelete   []iceberg.DataFile
+       dataFilesToAdd      []iceberg.DataFile
+       deleteFilesToRemove []iceberg.DataFile
+       snapshotProps       iceberg.Properties
+}
+
+// NewRewrite returns a [RewriteFiles] builder bound to this transaction.
+// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is
+// added to the rewrite snapshot's summary; pass nil for none.
+//
+// Usage:
+//
+//     rewrite := tx.NewRewrite(nil)
+//     rewrite.DeleteFile(oldDataFile)
+//     rewrite.AddDataFile(newDataFile)
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+//     committed, err := tx.Commit(ctx)
+func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) 
*RewriteFiles {
+       return &RewriteFiles{txn: t, snapshotProps: snapshotProps}
+}
+
+// DeleteFile marks a file for removal in this rewrite. Routes by
+// content type: data files are queued as data-file replacements;
+// pos/eq-delete files are queued for delete-file removal alongside
+// the data rewrite (typical when a delete is fully applied to data
+// files being rewritten and is therefore safe to expunge).
+func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles {
+       if df.ContentType() == iceberg.EntryContentData {
+               r.dataFilesToDelete = append(r.dataFilesToDelete, df)
+       } else {
+               r.deleteFilesToRemove = append(r.deleteFilesToRemove, df)
+       }
+
+       return r
+}
+
+// AddDataFile queues a new data file. Adding delete files is not yet
+// supported by the underlying snapshot machinery; passing a
+// pos/eq-delete here is reported at [RewriteFiles.Commit].
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles {
+       r.dataFilesToAdd = append(r.dataFilesToAdd, df)
+
+       return r
+}
+
+// Apply is a bulk shortcut that routes a worker's outputs onto this
+// builder: every entry in deletes and safeDeletes is queued via
+// [RewriteFiles.DeleteFile] (which routes data vs. delete files by
+// content type), and every entry in adds via [RewriteFiles.AddDataFile].
+//
+// safeDeletes is the position-delete files referenced by tasks in
+// the rewrite group whose target data file is being rewritten — they
+// are safe to expunge in the rewrite snapshot. [CollectSafePositionDeletes]
+// computes this set; [ExecuteCompactionGroup] populates
+// [CompactionGroupResult.SafePosDeletes] from it.
+//
+// The typical distributed-coordinator pattern is one [RewriteFiles]
+// builder + one Apply call per worker result + one Commit:
+//
+//     rewrite := leaderTxn.NewRewrite(snapshotProps)
+//     for _, gr := range workerResults {
+//         rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+//     }
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) 
*RewriteFiles {
+       for _, df := range deletes {
+               r.DeleteFile(df)
+       }
+       for _, df := range adds {
+               r.AddDataFile(df)
+       }
+       for _, df := range safeDeletes {
+               r.DeleteFile(df)
+       }
+
+       return r
+}
+
+// Commit stages the rewrite snapshot on the underlying transaction.
+// The catalog commit happens once, later, at [Transaction.Commit] time.
+//
+// Returns an error if the builder has no file changes, if any
+// [RewriteFiles.AddDataFile] input is not a data file, or if the underlying
+// [Transaction.ReplaceFiles] call fails.
+func (r *RewriteFiles) Commit(ctx context.Context) error {

Review Comment:
   I think `Commit` should be single-shot. As written, calling it twice 
re-appends a `rewriteValidator` to `r.txn.validators` and re-stages a 
`ReplaceFiles` against the (now mutated) transaction — Java's 
`BaseRewriteFiles` rejects this. A small `committed bool` guard plus an 
explicit error keeps the failure mode clean. wdyt?



##########
table/rewrite_files.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. Pos-delete
+//     conflict detection requires v3 manifests (which carry the
+//     referenced-data-file column); on v2, only the conservative
+//     eq-delete-during-rewrite rule fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at commit time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+       txn                 *Transaction
+       dataFilesToDelete   []iceberg.DataFile
+       dataFilesToAdd      []iceberg.DataFile
+       deleteFilesToRemove []iceberg.DataFile
+       snapshotProps       iceberg.Properties
+}
+
+// NewRewrite returns a [RewriteFiles] builder bound to this transaction.
+// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is
+// added to the rewrite snapshot's summary; pass nil for none.
+//
+// Usage:
+//
+//     rewrite := tx.NewRewrite(nil)
+//     rewrite.DeleteFile(oldDataFile)
+//     rewrite.AddDataFile(newDataFile)
+//     if err := rewrite.Commit(ctx); err != nil { ... }
+//     committed, err := tx.Commit(ctx)
+func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) 
*RewriteFiles {
+       return &RewriteFiles{txn: t, snapshotProps: snapshotProps}
+}
+
+// DeleteFile marks a file for removal in this rewrite. Routes by
+// content type: data files are queued as data-file replacements;
+// pos/eq-delete files are queued for delete-file removal alongside
+// the data rewrite (typical when a delete is fully applied to data
+// files being rewritten and is therefore safe to expunge).
+func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles {
+       if df.ContentType() == iceberg.EntryContentData {
+               r.dataFilesToDelete = append(r.dataFilesToDelete, df)
+       } else {
+               r.deleteFilesToRemove = append(r.deleteFilesToRemove, df)
+       }
+
+       return r
+}
+
+// AddDataFile queues a new data file. Adding delete files is not yet
+// supported by the underlying snapshot machinery; passing a
+// pos/eq-delete here is reported at [RewriteFiles.Commit].
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles {

Review Comment:
   The validation deferral here interacts awkwardly with `Apply`: a worker that 
mistakenly puts a delete file into `adds` gets it queued silently, and Commit 
reports `index N` — but N is opaque to a caller that fed the builder via 
`Apply` (they never saw the queue order). I'd either reject at insertion time 
(return `*RewriteFiles` with a stashed error, drained at Commit), or at minimum 
log the file path prominently. The current message has the path but the index 
framing implies the caller chose it.
   
   While we're here — content-type routing in `DeleteFile` is permissive 
(anything non-`EntryContentData` lands in `deleteFilesToRemove`). For symmetry 
I'd at least assert it's pos/eq deletes there, otherwise a future content type 
silently slips through.



##########
table/rewrite_files.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. Pos-delete
+//     conflict detection requires v3 manifests (which carry the

Review Comment:
   Minor accuracy on the V3 claim. `referenced_data_file` (field id 143) was 
added in V2 and is *optional* there; it became *required* in V3 only for 
deletion-vector deletes. So the pos-delete validator branch fires whenever the 
writer populated the column, regardless of format version — it just happens 
that V2 pos-delete writers commonly leave it empty, which is the real reason 
the conservative V2 fallback exists. I'd rephrase as "requires that the 
concurrent pos-delete writer populated the `referenced_data_file` column 
(always set under V3 deletion vectors; often unset in V2 pos-deletes)" so a 
reader doesn't conclude V2 + populated `referenced_data_file` is unsupported.



##########
table/rewrite_files.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+//   - The overwrite producer's default isolation validator is suppressed
+//     (concurrent appends into rewritten partitions are allowed; this
+//     is the defining behavior of a rewrite).
+//   - A rewrite-specific conflict validator is registered so concurrent
+//     pos/eq-delete files targeting any rewritten data file are
+//     rejected pre-flight at [Transaction.Commit] time. Pos-delete
+//     conflict detection requires v3 manifests (which carry the
+//     referenced-data-file column); on v2, only the conservative
+//     eq-delete-during-rewrite rule fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at commit time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+       txn                 *Transaction
+       dataFilesToDelete   []iceberg.DataFile
+       dataFilesToAdd      []iceberg.DataFile
+       deleteFilesToRemove []iceberg.DataFile
+       snapshotProps       iceberg.Properties

Review Comment:
   `snapshotProps` is captured by reference. If a caller mutates the map 
between `NewRewrite` and `Commit` (or across multiple builders sharing the same 
map, as in `rewriteDataFilesPartial`), the staged snapshot summary changes. A 
`maps.Clone` here, or a doc note that the map must not be mutated after 
`NewRewrite` returns, would close the foot-gun.



##########
table/transaction.go:
##########
@@ -765,8 +768,13 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx 
context.Context, filesTo
                }
        }
 
+       op := OpOverwrite

Review Comment:
   Tagging the snapshot `replace` is the right move for parity with Java's 
`BaseRewriteFiles.operation()`, but I'd want to check that the overwrite 
producer's summary writer still emits the right keys under this tag. Java's 
`RewriteFiles` snapshots carry `added-data-files`, `deleted-data-files`, 
`removed-position-delete-files` (and the byte counterparts) — if our 
`mergeOverwrite` path keys any summary entry on `OpOverwrite` specifically, the 
flip to `OpReplace` will silently drop it.
   
   Worth adding an assertion in one of the rewrite tests against 
`snap.Summary.Properties` — at minimum that `added-data-files`, 
`deleted-data-files`, `total-records`, `total-data-files` are populated. 
PyIceberg readers parse these; a missing field on `OpReplace` would be a 
cross-client interop regression.



##########
table/rewrite_data_files.go:
##########
@@ -70,16 +71,55 @@ type CompactionTaskGroup struct {
        TotalSizeBytes int64
 }
 
+// CompactionGroupResult is the per-group output of a compaction
+// worker: the new files written, the old files being replaced, and
+// the position delete files safe to expunge in the rewrite snapshot.
+//
+// A distributed coordinator aggregates results from N workers and
+// applies them to a [RewriteFiles] builder via [RewriteFiles.Apply]
+// to commit a single atomic snapshot. Each field is plain data
+// ([]iceberg.DataFile values plus scalars) — callers serialize the
+// contained DataFiles across process boundaries themselves; the
+// typical pattern is to have the worker write a manifest containing
+// the new files and ship the manifest path to the coordinator, which
+// re-reads it.
+type CompactionGroupResult struct {
+       // PartitionKey mirrors [CompactionTaskGroup.PartitionKey] for
+       // display/logging on the coordinator.
+       PartitionKey string
+
+       // OldDataFiles are the data files this group replaces.
+       OldDataFiles []iceberg.DataFile
+
+       // NewDataFiles are the consolidated outputs the worker wrote.
+       NewDataFiles []iceberg.DataFile
+
+       // SafePosDeletes are position-delete files referenced by tasks in
+       // this group whose target data file is being rewritten, computed
+       // via [CollectSafePositionDeletes]. They are safe to expunge in
+       // the rewrite snapshot.
+       SafePosDeletes []iceberg.DataFile
+
+       // BytesBefore is [CompactionTaskGroup.TotalSizeBytes] passed
+       // through, recorded so the coordinator can roll up metrics
+       // without re-reading the plan.
+       BytesBefore int64
+
+       // BytesAfter is the sum of [iceberg.DataFile.FileSizeBytes] across
+       // NewDataFiles.
+       BytesAfter int64
+}
+
 // RewriteDataFilesOptions bundles the per-rewrite knobs for
-// Transaction.RewriteDataFiles.
+// [Transaction.RewriteDataFiles].
 type RewriteDataFilesOptions struct {
-       // PartialProgress, when true, stages each group via ReplaceFiles
-       // inside the loop so work survives a mid-loop write failure. When
-       // false (the default), all groups are committed in a single atomic
-       // snapshot.
+       // PartialProgress, when true, stages each group via its own
+       // rewrite snapshot inside the loop so work survives a mid-loop

Review Comment:
   "so work survives a mid-loop write failure" reads as if there's catalog 
durability per group, but the catalog commit is still single, at 
`Transaction.Commit`. A process crash mid-loop loses everything just like the 
non-partial path. The disclaimer four lines down ("True per-group durability 
... requires committing separate transactions per group") effectively 
contradicts the lead.
   
   With the new `RewriteFiles` builder, the per-group-transaction pattern is 
now expressible by callers — I'd update the doc to say partial mode "stages 
each group as its own snapshot in the transaction so the in-memory transaction 
can be discarded by group rather than wholesale", and then point at the 
`NewRewrite` builder for callers who want true per-group catalog durability.



##########
table/rewrite_data_files.go:
##########
@@ -142,91 +213,151 @@ func (t *Transaction) RewriteDataFiles(ctx 
context.Context, groups []CompactionT
                        continue
                }
 
-               // Read with deletes applied.
-               arrowSchema, records, err := scan.ReadTasks(ctx, group.Tasks)
+               gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, 
opts.GroupOptions...)
                if err != nil {
-                       return result, fmt.Errorf("read tasks for compaction 
group %q: %w", group.PartitionKey, err)
+                       return result, err
                }
 
-               // Each compaction group is single-partition by construction, 
so the
-               // read stream is trivially clustered and we can use the 
clustered writer.
-               var newFiles []iceberg.DataFile
-               for df, err := range WriteRecords(ctx, t.tbl, arrowSchema, 
records, WithClusteredWrite()) {
-                       if err != nil {
-                               return result, fmt.Errorf("write compacted 
files for group %q: %w", group.PartitionKey, err)
-                       }
-                       newFiles = append(newFiles, df)
+               if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+                       continue
                }
 
-               // Collect old data files.
-               oldDataFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
-               for _, task := range group.Tasks {
-                       oldDataFiles = append(oldDataFiles, task.File)
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+               accumulateGroupMetrics(result, gr)
+       }
+
+       if result.RewrittenGroups == 0 {
+               return result, nil
+       }
+
+       for _, df := range opts.ExtraDeleteFilesToRemove {
+               rewrite.DeleteFile(df)
+               result.RemovedEqualityDeleteFiles++
+       }
+
+       if err := rewrite.Commit(ctx); err != nil {
+               return result, fmt.Errorf("commit compaction: %w", err)
+       }
+
+       return result, nil
+}
+
+// ExecuteCompactionGroup reads a compaction group's tasks (with
+// deletes applied), writes consolidated output files via
+// [WriteRecords], and computes the position-delete files safe to
+// expunge in the rewrite snapshot. It does not commit — the caller
+// hands the result to a coordinator that uses [Transaction.NewRewrite]
+// + [RewriteFiles.Apply] + [RewriteFiles.Commit] to stage the
+// atomic commit.
+//
+// Empty groups return a zero [CompactionGroupResult] without doing
+// any I/O.
+//
+// In-process callers should prefer [Transaction.RewriteDataFiles],
+// which drives this and the commit step in one call.
+//
+// Tunables are exposed via [CompactionGroupOption]. The clustered
+// write path is always used (a compaction group is single-partition
+// by construction so its read stream is trivially clustered).
+func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group 
CompactionTaskGroup, opts ...CompactionGroupOption) (CompactionGroupResult, 
error) {
+       if len(group.Tasks) == 0 {
+               return CompactionGroupResult{PartitionKey: group.PartitionKey}, 
nil
+       }
+
+       cfg := compactionGroupConfig{}
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       var scanOpts []ScanOption
+       if cfg.scanConcurrency > 0 {
+               scanOpts = append(scanOpts, 
WitMaxConcurrency(cfg.scanConcurrency))
+       }
+
+       arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx, 
group.Tasks)
+       if err != nil {
+               return CompactionGroupResult{}, fmt.Errorf("read tasks for 
compaction group %q: %w", group.PartitionKey, err)
+       }
+
+       // Each compaction group is single-partition by construction, so the
+       // read stream is trivially clustered and we can use the clustered 
writer.
+       writeOpts := []WriteRecordOption{WithClusteredWrite()}
+       if cfg.targetFileSize > 0 {
+               writeOpts = append(writeOpts, 
WithTargetFileSize(cfg.targetFileSize))
+       }
+
+       var (
+               newFiles   []iceberg.DataFile
+               bytesAfter int64
+       )
+       for df, err := range WriteRecords(ctx, tbl, arrowSchema, records, 
writeOpts...) {
+               if err != nil {
+                       return CompactionGroupResult{}, fmt.Errorf("write 
compacted files for group %q: %w", group.PartitionKey, err)
                }
+               newFiles = append(newFiles, df)
+               bytesAfter += df.FileSizeBytes()
+       }
+
+       oldFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
+       for _, task := range group.Tasks {
+               oldFiles = append(oldFiles, task.File)
+       }
 
-               // Collect position delete files safe to remove.
-               safeDeletes := collectSafePositionDeletes(group.Tasks)
+       return CompactionGroupResult{
+               PartitionKey:   group.PartitionKey,
+               OldDataFiles:   oldFiles,
+               NewDataFiles:   newFiles,
+               SafePosDeletes: CollectSafePositionDeletes(group.Tasks),
+               BytesBefore:    group.TotalSizeBytes,
+               BytesAfter:     bytesAfter,
+       }, nil
+}
 
-               // Update result metrics.
-               var bytesAfter int64
-               for _, df := range newFiles {
-                       bytesAfter += df.FileSizeBytes()
+// rewriteDataFilesPartial stages each group as its own rewrite
+// snapshot via a fresh [RewriteFiles] builder. Each builder commits
+// independently inside the loop, so a mid-loop write failure leaves
+// already-staged groups in the transaction and the catalog still
+// receives them at [Transaction.Commit] time.
+func (t *Transaction) rewriteDataFilesPartial(ctx context.Context, groups 
[]CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) {
+       result := &RewriteResult{}
+
+       for _, group := range groups {
+               if err := ctx.Err(); err != nil {
+                       return result, err
                }
 
-               result.RewrittenGroups++
-               result.AddedDataFiles += len(newFiles)
-               result.RemovedDataFiles += len(oldDataFiles)
-               result.RemovedPositionDeleteFiles += len(safeDeletes)
-               result.BytesBefore += group.TotalSizeBytes
-               result.BytesAfter += bytesAfter
-
-               // Always accumulate across groups; partial-progress mode also
-               // stages each group via ReplaceFiles so work survives a
-               // mid-loop write failure, but the final catalog commit is
-               // always one atomic doCommit at Transaction.Commit() time.
-               allOldData = append(allOldData, oldDataFiles...)
-               allNewData = append(allNewData, newFiles...)
-               allOldDeletes = append(allOldDeletes, safeDeletes...)
-
-               if opts.PartialProgress {
-                       if err := t.ReplaceFiles(ctx, oldDataFiles, newFiles, 
safeDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
-                               return result, fmt.Errorf("commit compaction 
group %q: %w", group.PartitionKey, err)
-                       }
+               if len(group.Tasks) == 0 {
+                       continue
                }
-       }
 
-       // Register the rewrite-specific conflict validator covering every
-       // rewritten data file across every group. The validator list is
-       // drained at Transaction.Commit() → doCommit. Runs alongside the
-       // overwrite producer's suppressed validator (via
-       // withRewriteSemantics) so concurrent pos/eq-deletes targeting a
-       // rewritten file are caught pre-flight.
-       if len(allOldData) > 0 {
-               rewritten := make([]string, 0, len(allOldData))
-               for _, f := range allOldData {
-                       rewritten = append(rewritten, f.FilePath())
+               gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, 
opts.GroupOptions...)
+               if err != nil {
+                       return result, err
                }
-               t.validators = append(t.validators, rewriteValidator(rewritten))
-       }
 
-       if !opts.PartialProgress {
-               // Caller-supplied dead eq-deletes (typically from
-               // [compaction.CollectDeadEqualityDeletes]). The caller is
-               // responsible for computing these against the same snapshot
-               // this transaction is staged on.
-               if len(opts.ExtraDeleteFilesToRemove) > 0 {
-                       allOldDeletes = append(allOldDeletes, 
opts.ExtraDeleteFilesToRemove...)
-                       result.RemovedEqualityDeleteFiles += 
len(opts.ExtraDeleteFilesToRemove)
+               if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+                       continue
                }
 
-               if err := t.ReplaceFiles(ctx, allOldData, allNewData, 
allOldDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
-                       return result, fmt.Errorf("commit compaction: %w", err)
+               if err := 
t.NewRewrite(opts.SnapshotProps).Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes).Commit(ctx); err != nil {

Review Comment:
   Each loop iteration constructs `t.NewRewrite(opts.SnapshotProps)` with the 
same `opts.SnapshotProps` map. So whatever caller-supplied properties are in 
there land on every per-group snapshot summary. Most use cases (free-form 
properties like an actor or dag-run id) are fine with this, but if the caller 
pre-populates anything that's meant to be unique per snapshot they'll get 
duplicates. Worth a one-line doc note on 
`RewriteDataFilesOptions.SnapshotProps` that in partial mode the same map is 
applied to every per-group snapshot rather than once for the whole rewrite.



##########
table/rewrite_files_test.go:
##########
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+// Black-box coverage for the [table.RewriteFiles] snapshot-operation
+// builder, including the supporting [table.ExecuteCompactionGroup]
+// worker function and the [table.CollectSafePositionDeletes]
+// predicate. Tests cover the in-process path (one transaction stages
+// and commits) and the distributed-coordinator path (workers produce
+// [table.CompactionGroupResult]s, a leader builds a single
+// [table.RewriteFiles] from them and commits).
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strings"
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// concurrentTestCatalog enforces Requirement.Validate against its
+// current metadata on every CommitTable, so a leader transaction
+// whose AssertRefSnapshotID points at a stale snapshot fails its
+// first attempt and triggers refresh-and-replay. attempts counts
+// CommitTable invocations so tests can prove the retry boundary.
+type concurrentTestCatalog struct {
+       metadata table.Metadata
+       location string
+       fsF      table.FSysF
+       attempts atomic.Int32
+}
+
+func (c *concurrentTestCatalog) LoadTable(_ context.Context, ident 
table.Identifier) (*table.Table, error) {
+       return table.New(ident, c.metadata, c.location, c.fsF, c), nil
+}
+
+func (c *concurrentTestCatalog) CommitTable(_ context.Context, _ 
table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       c.attempts.Add(1)
+       for _, req := range reqs {
+               if err := req.Validate(c.metadata); err != nil {
+                       return nil, "", fmt.Errorf("%w: %w", 
table.ErrCommitFailed, err)
+               }
+       }
+       meta, err := table.UpdateTableMetadata(c.metadata, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = meta
+
+       // Returning the seed location is enough to keep subsequent
+       // NewTransaction calls from adding AssertCreate; the value is not
+       // re-read by the table machinery between commits in this stub.
+       return meta, c.location, nil
+}
+
+func newConcurrentRewriteTestTable(t *testing.T) (*table.Table, 
*concurrentTestCatalog) {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{
+                       table.PropertyFormatVersion:        "2",
+                       table.CommitNumRetriesKey:          "2",
+                       table.CommitMinRetryWaitMsKey:      "1",
+                       table.CommitMaxRetryWaitMsKey:      "2",
+                       table.CommitTotalRetryTimeoutMsKey: "1000",
+               })
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "concurrent_rewrite"}, meta, 
metaLoc, fsF, cat), cat
+}
+
+func newPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func newEqDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+       b = b.EqualityFieldIDs([]int{1})
+
+       return b.Build()
+}
+
+func newDataFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func TestCollectSafePositionDeletes_FiltersAndDedupes(t *testing.T) {
+       posA := newPosDeleteFile(t, "pos-a.parquet")
+       posB := newPosDeleteFile(t, "pos-b.parquet")
+       eq := newEqDeleteFile(t, "eq.parquet")
+
+       tasks := []table.FileScanTask{
+               {DeleteFiles: []iceberg.DataFile{posA, posB}, 
EqualityDeleteFiles: []iceberg.DataFile{eq}},
+               {DeleteFiles: []iceberg.DataFile{posA}}, // duplicate of posA
+       }
+
+       got := table.CollectSafePositionDeletes(tasks)
+       require.Len(t, got, 2, "duplicate pos-deletes must be deduped, 
eq-deletes must be filtered out")
+
+       paths := []string{got[0].FilePath(), got[1].FilePath()}
+       assert.ElementsMatch(t, []string{"pos-a.parquet", "pos-b.parquet"}, 
paths)
+}
+
+func TestCollectSafePositionDeletes_EmptyTasks(t *testing.T) {
+       assert.Empty(t, table.CollectSafePositionDeletes(nil))
+       assert.Empty(t, 
table.CollectSafePositionDeletes([]table.FileScanTask{{}}))
+}
+
+func TestExecuteCompactionGroup_EmptyGroup(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       got, err := table.ExecuteCompactionGroup(t.Context(), tbl,
+               table.CompactionTaskGroup{PartitionKey: "p"})
+       require.NoError(t, err)
+       assert.Equal(t, "p", got.PartitionKey)
+       assert.Empty(t, got.OldDataFiles)
+       assert.Empty(t, got.NewDataFiles)
+       assert.Empty(t, got.SafePosDeletes)
+}
+
+func TestRewriteFiles_EmptyCommit_Errors(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       err := tx.NewRewrite(nil).Commit(t.Context())
+       require.Error(t, err, "an empty rewrite has nothing to stage and must 
reject")
+       assert.Contains(t, err.Error(), "at least one file change")
+}
+
+func TestRewriteFiles_AddDataFile_RejectsNonDataFile(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       posDel := newPosDeleteFile(t, "spurious-pos-del.parquet")
+
+       err := tx.NewRewrite(nil).AddDataFile(posDel).Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "AddDataFile only supports data files",
+               "adding a delete file via AddDataFile must be reported at 
commit time")
+}
+
+func TestRewriteFiles_DeleteFile_RoutesByContentType(t *testing.T) {
+       // Validates the routing via observable behavior: a builder that
+       // is given only a pos-delete to delete (no data files) must reach
+       // ReplaceFiles, where the underlying check rejects removing a
+       // delete file that is not in the table. That distinct error proves
+       // DeleteFile routed it to deleteFilesToRemove (not dataFilesToDelete).
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       dummyData := newDataFile(t, tbl.Location()+"/data/dummy.parquet")
+       posDel := newPosDeleteFile(t, 
tbl.Location()+"/data/spurious-pos-del.parquet")
+
+       err := tx.NewRewrite(nil).
+               DeleteFile(dummyData).
+               AddDataFile(newDataFile(t, 
tbl.Location()+"/data/replacement.parquet")).
+               DeleteFile(posDel).
+               Commit(t.Context())
+       require.Error(t, err, "ReplaceFiles fails because the dummy files are 
not in the empty table")
+       // Any of these messages confirms we got past the builder's own
+       // validation and into ReplaceFiles' membership checks — i.e., the
+       // pos-delete was routed to deleteFilesToRemove and the data file
+       // to dataFilesToDelete.
+       msg := err.Error()
+       assert.True(t,
+               strings.Contains(msg, "do not belong to the table") ||
+                       strings.Contains(msg, "without an existing snapshot"),
+               "got %q — expected the underlying ReplaceFiles 
membership/snapshot error", msg)

Review Comment:
   This assertion would still pass if `DeleteFile` mis-routed both files to 
`dataFilesToDelete` (the resulting `ReplaceFiles` would also error with `do not 
belong to the table`). The substring check proves we got into `ReplaceFiles`, 
not that the pos-delete went into the right slice.
   
   A tighter shape: stage two builders, one with the pos-delete and one with 
the data file, each with a benign add. Each should produce a different error 
class — `do not belong to the table` (data slice, membership check) vs. the 
delete-files-not-in-table message. What do you think?



##########
table/rewrite_files_test.go:
##########
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+// Black-box coverage for the [table.RewriteFiles] snapshot-operation
+// builder, including the supporting [table.ExecuteCompactionGroup]
+// worker function and the [table.CollectSafePositionDeletes]
+// predicate. Tests cover the in-process path (one transaction stages
+// and commits) and the distributed-coordinator path (workers produce
+// [table.CompactionGroupResult]s, a leader builds a single
+// [table.RewriteFiles] from them and commits).
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strings"
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// concurrentTestCatalog enforces Requirement.Validate against its
+// current metadata on every CommitTable, so a leader transaction
+// whose AssertRefSnapshotID points at a stale snapshot fails its
+// first attempt and triggers refresh-and-replay. attempts counts
+// CommitTable invocations so tests can prove the retry boundary.
+type concurrentTestCatalog struct {
+       metadata table.Metadata
+       location string
+       fsF      table.FSysF
+       attempts atomic.Int32
+}
+
+func (c *concurrentTestCatalog) LoadTable(_ context.Context, ident 
table.Identifier) (*table.Table, error) {
+       return table.New(ident, c.metadata, c.location, c.fsF, c), nil
+}
+
+func (c *concurrentTestCatalog) CommitTable(_ context.Context, _ 
table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       c.attempts.Add(1)
+       for _, req := range reqs {
+               if err := req.Validate(c.metadata); err != nil {
+                       return nil, "", fmt.Errorf("%w: %w", 
table.ErrCommitFailed, err)
+               }
+       }
+       meta, err := table.UpdateTableMetadata(c.metadata, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = meta
+
+       // Returning the seed location is enough to keep subsequent
+       // NewTransaction calls from adding AssertCreate; the value is not
+       // re-read by the table machinery between commits in this stub.
+       return meta, c.location, nil
+}
+
+func newConcurrentRewriteTestTable(t *testing.T) (*table.Table, 
*concurrentTestCatalog) {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{
+                       table.PropertyFormatVersion:        "2",
+                       table.CommitNumRetriesKey:          "2",
+                       table.CommitMinRetryWaitMsKey:      "1",
+                       table.CommitMaxRetryWaitMsKey:      "2",
+                       table.CommitTotalRetryTimeoutMsKey: "1000",
+               })
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "concurrent_rewrite"}, meta, 
metaLoc, fsF, cat), cat
+}
+
+func newPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func newEqDeleteFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+       b = b.EqualityFieldIDs([]int{1})
+
+       return b.Build()
+}
+
+func newDataFile(t *testing.T, path string) iceberg.DataFile {
+       t.Helper()
+
+       b, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+               path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       return b.Build()
+}
+
+func TestCollectSafePositionDeletes_FiltersAndDedupes(t *testing.T) {
+       posA := newPosDeleteFile(t, "pos-a.parquet")
+       posB := newPosDeleteFile(t, "pos-b.parquet")
+       eq := newEqDeleteFile(t, "eq.parquet")
+
+       tasks := []table.FileScanTask{
+               {DeleteFiles: []iceberg.DataFile{posA, posB}, 
EqualityDeleteFiles: []iceberg.DataFile{eq}},
+               {DeleteFiles: []iceberg.DataFile{posA}}, // duplicate of posA
+       }
+
+       got := table.CollectSafePositionDeletes(tasks)
+       require.Len(t, got, 2, "duplicate pos-deletes must be deduped, 
eq-deletes must be filtered out")
+
+       paths := []string{got[0].FilePath(), got[1].FilePath()}
+       assert.ElementsMatch(t, []string{"pos-a.parquet", "pos-b.parquet"}, 
paths)
+}
+
+func TestCollectSafePositionDeletes_EmptyTasks(t *testing.T) {
+       assert.Empty(t, table.CollectSafePositionDeletes(nil))
+       assert.Empty(t, 
table.CollectSafePositionDeletes([]table.FileScanTask{{}}))
+}
+
+func TestExecuteCompactionGroup_EmptyGroup(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       got, err := table.ExecuteCompactionGroup(t.Context(), tbl,
+               table.CompactionTaskGroup{PartitionKey: "p"})
+       require.NoError(t, err)
+       assert.Equal(t, "p", got.PartitionKey)
+       assert.Empty(t, got.OldDataFiles)
+       assert.Empty(t, got.NewDataFiles)
+       assert.Empty(t, got.SafePosDeletes)
+}
+
+func TestRewriteFiles_EmptyCommit_Errors(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       err := tx.NewRewrite(nil).Commit(t.Context())
+       require.Error(t, err, "an empty rewrite has nothing to stage and must 
reject")
+       assert.Contains(t, err.Error(), "at least one file change")
+}
+
+func TestRewriteFiles_AddDataFile_RejectsNonDataFile(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       posDel := newPosDeleteFile(t, "spurious-pos-del.parquet")
+
+       err := tx.NewRewrite(nil).AddDataFile(posDel).Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "AddDataFile only supports data files",
+               "adding a delete file via AddDataFile must be reported at 
commit time")
+}
+
+func TestRewriteFiles_DeleteFile_RoutesByContentType(t *testing.T) {
+       // Validates the routing via observable behavior: a builder that
+       // is given only a pos-delete to delete (no data files) must reach
+       // ReplaceFiles, where the underlying check rejects removing a
+       // delete file that is not in the table. That distinct error proves
+       // DeleteFile routed it to deleteFilesToRemove (not dataFilesToDelete).
+       tbl := newRewriteTestTable(t)
+       tx := tbl.NewTransaction()
+
+       dummyData := newDataFile(t, tbl.Location()+"/data/dummy.parquet")
+       posDel := newPosDeleteFile(t, 
tbl.Location()+"/data/spurious-pos-del.parquet")
+
+       err := tx.NewRewrite(nil).
+               DeleteFile(dummyData).
+               AddDataFile(newDataFile(t, 
tbl.Location()+"/data/replacement.parquet")).
+               DeleteFile(posDel).
+               Commit(t.Context())
+       require.Error(t, err, "ReplaceFiles fails because the dummy files are 
not in the empty table")
+       // Any of these messages confirms we got past the builder's own
+       // validation and into ReplaceFiles' membership checks — i.e., the
+       // pos-delete was routed to deleteFilesToRemove and the data file
+       // to dataFilesToDelete.
+       msg := err.Error()
+       assert.True(t,
+               strings.Contains(msg, "do not belong to the table") ||
+                       strings.Contains(msg, "without an existing snapshot"),
+               "got %q — expected the underlying ReplaceFiles 
membership/snapshot error", msg)
+}
+
+// TestRewriteFiles_DistributedEquivalence proves the worker+coordinator
+// pipeline lands at the same end state as the bundled in-process
+// [Transaction.RewriteDataFiles]: workers run [table.ExecuteCompactionGroup]
+// per group (here inline; in distributed compaction this happens on
+// remote peers and the results travel over the wire), then a single
+// leader transaction stages [table.RewriteFiles.Apply] +
+// [table.RewriteFiles.Commit] for one atomic snapshot.
+func TestRewriteFiles_DistributedEquivalence(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+
+       for i := range 4 {
+               dataPath := tbl.Location() + 
fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, 
nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       require.Len(t, tasks, 4)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               require.NotEmpty(t, gr.NewDataFiles)
+               results = append(results, gr)
+       }
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()))
+
+       committed, err := leaderTxn.Commit(t.Context())
+       require.NoError(t, err)
+
+       assertRowCount(t, committed, 4)
+
+       snap := committed.CurrentSnapshot()
+       require.NotNil(t, snap)
+       assert.Equal(t, table.OpReplace, snap.Summary.Operation,
+               "rewrite snapshot must be tagged replace, not overwrite")
+
+       postTasks, err := committed.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       assert.Len(t, postTasks, 1, "leader-side commit must produce one 
consolidated file")
+}
+
+// TestRewriteFiles_DropsSafePositionDeletes drives the pipeline with a
+// position-delete present at plan time and asserts the pos-delete is
+// expunged in the rewrite snapshot.
+func TestRewriteFiles_DropsSafePositionDeletes(t *testing.T) {
+       tbl := newRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+
+       for i := range 3 {
+               dataPath := tbl.Location() + 
fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc, fmt.Sprintf(
+                       `[{"id": %d, "data": "a"}, {"id": %d, "data": "b"}]`, 
i*2+1, i*2+2))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, 
nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       firstDataPath := tbl.Location() + "/data/file-0.parquet"
+       posDelPath := tbl.Location() + "/data/pos-del-001.parquet"
+       writeParquetFile(t, posDelPath, table.PositionalDeleteArrowSchema,
+               fmt.Sprintf(`[{"file_path": "%s", "pos": 0}]`, firstDataPath))
+
+       posDelBuilder, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               posDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+
+       tx := tbl.NewTransaction()
+       rd := tx.NewRowDelta(nil)
+       rd.AddDeletes(posDelBuilder.Build())
+       require.NoError(t, rd.Commit(t.Context()))
+       tbl, err = tx.Commit(t.Context())
+       require.NoError(t, err)
+       assertRowCount(t, tbl, 5)
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       cfg := defaultTestCompactionCfg
+       cfg.DeleteFileThreshold = 1
+       plan, err := cfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       totalSafe := 0
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               results = append(results, gr)
+               totalSafe += len(gr.SafePosDeletes)
+       }
+       require.Equal(t, 1, totalSafe, "the staged pos-delete must be reported 
safe by exactly one group")
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()))
+
+       committed, err := leaderTxn.Commit(t.Context())
+       require.NoError(t, err)
+
+       assertRowCount(t, committed, 5)
+
+       postTasks, err := committed.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+       for _, task := range postTasks {
+               assert.Empty(t, task.DeleteFiles,
+                       "safe pos-delete must be expunged in the rewrite 
snapshot")
+       }
+}
+
+// TestRewriteFiles_RejectsConcurrentEqDelete is the negative path: a
+// leader stages a rewrite, a concurrent peer commits an equality-delete
+// during the rewrite window, and the leader's [table.Transaction.Commit]
+// must fail with [table.ErrConflictingDeleteFiles] — proving the
+// rewrite-specific conflict validator that [table.RewriteFiles.Commit]
+// registers internally fires under refresh-and-replay.
+//
+// First Commit attempt fails on the stale AssertRefSnapshotID (the
+// catalog has advanced past the leader's base). The retry refreshes,
+// builds a fresh conflictContext walking S0 → S1, and the rewrite
+// validator's conservative rule rejects any concurrent equality-delete
+// during a rewrite — terminal exit before any further CommitTable
+// call.
+//
+// Equality deletes are used here rather than positional deletes because
+// the v2 manifest schema does not carry a pos-delete's referenced data
+// file path; the validator's pos-delete branch is only effective on v3
+// tables. The eq-delete branch is the conservative rule for v2.
+func TestRewriteFiles_RejectsConcurrentEqDelete(t *testing.T) {
+       tbl, cat := newConcurrentRewriteTestTable(t)
+
+       arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, 
false)
+       require.NoError(t, err)
+
+       for i := range 3 {
+               dataPath := tbl.Location() + 
fmt.Sprintf("/data/file-%d.parquet", i)
+               writeParquetFile(t, dataPath, arrowSc,
+                       fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+               tx := tbl.NewTransaction()
+               require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, 
nil, false))
+               tbl, err = tx.Commit(t.Context())
+               require.NoError(t, err)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(t.Context())
+       require.NoError(t, err)
+
+       plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+       require.NoError(t, err)
+       require.NotEmpty(t, plan.Groups)
+
+       groups := toTaskGroups(plan.Groups)
+
+       results := make([]table.CompactionGroupResult, 0, len(groups))
+       for _, g := range groups {
+               gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+               require.NoError(t, err)
+               require.NotEmpty(t, gr.OldDataFiles)
+               results = append(results, gr)
+       }
+
+       leaderTxn := tbl.NewTransaction()
+       rewrite := leaderTxn.NewRewrite(nil)
+       for _, gr := range results {
+               rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, 
gr.SafePosDeletes)
+       }
+       require.NoError(t, rewrite.Commit(t.Context()),
+               "staging the rewrite must succeed; the conflict surfaces at 
Commit time")
+
+       eqDelPath := tbl.Location() + "/data/concurrent-eq-del.parquet"
+       eqDelBuilder, err := iceberg.NewDataFileBuilder(
+               *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+               eqDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+       require.NoError(t, err)
+       eqDelBuilder = eqDelBuilder.EqualityFieldIDs([]int{1})
+
+       peerTxn := tbl.NewTransaction()
+       rd := peerTxn.NewRowDelta(nil)
+       rd.AddDeletes(eqDelBuilder.Build())
+       require.NoError(t, rd.Commit(t.Context()))
+       _, err = peerTxn.Commit(t.Context())
+       require.NoError(t, err, "peer commit advances the catalog so the 
leader's first attempt fails")
+
+       beforeLeader := cat.attempts.Load()
+       _, err = leaderTxn.Commit(t.Context())
+       require.Error(t, err)
+       assert.ErrorIs(t, err, table.ErrConflictingDeleteFiles,
+               "refresh-and-replay must detect the concurrent equality-delete 
during a rewrite")
+       assert.Equal(t, int32(1), cat.attempts.Load()-beforeLeader,

Review Comment:
   The `attempts.Load() - beforeLeader == 1` math is load-bearing for the 
assertion's claim ("validator rejects on retry before re-issuing CommitTable"), 
but it takes a second read to follow: the count includes the leader's first 
attempt, which fails on the stale `AssertRefSnapshotID`; the validator then 
runs on the refresh path and short-circuits before a second `CommitTable`. A 
one-line comment naming both attempts (or describing the delta as "only the 
stale-assertion attempt landed; the retry never reached CommitTable") would 
help the next reader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to