This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c38e16bc test(table): add v3 OCC row lineage regression tests (#998)
(#1043)
c38e16bc is described below
commit c38e16bcf93042c3e18253dda927ed7fe81ef401
Author: Tanmay Rauth <[email protected]>
AuthorDate: Sat May 9 01:13:50 2026 -0700
test(table): add v3 OCC row lineage regression tests (#998) (#1043)
After investigating the codebase, I found that the write-side row
lineage is already fully implemented on main:
- Manifest-list writer assigns first_row_id to data manifests
(manifest.go:1508)
- OCC retry re-derives first_row_id from fresh metadata
(snapshot_producers.go:1029)
- validateAndUpdateRowLineage advances next-row-id at commit time
(metadata.go:507)
- Scanner synthesizes _row_id at read time (arrow_scanner.go:425)
This change only adds two tests, which prove that the existing code works
correctly under these specific scenarios:
1. Fast-append + OCC retry → first_row_id rebases correctly
2. Merge-append + OCC retry → merged manifests get correct first_row_id
from list writer
Closes: #998
---
table/occ_scenario_test.go | 249 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 249 insertions(+)
diff --git a/table/occ_scenario_test.go b/table/occ_scenario_test.go
index 64ad9d91..a91613d7 100644
--- a/table/occ_scenario_test.go
+++ b/table/occ_scenario_test.go
@@ -27,6 +27,7 @@ import (
"context"
"fmt"
"path/filepath"
+ "strings"
"sync/atomic"
"testing"
@@ -255,3 +256,251 @@ func (s *OCCScenarioTestSuite)
TestManifestListInheritedAfterConflict() {
"manifest[%d] should have 1 data file, got %d", i,
count)
}
}
+
+// TestV3RowLineageCorrectAfterOCCRetry verifies that row lineage (first-row-id,
+// added-rows, next-row-id) is correctly recomputed on OCC retry for v3 tables.
+//
+// Scenario:
+//   - Writer B commits 1 row to an empty v3 table (next-row-id advances 0→1).
+//   - Writer A starts from the empty base (stale next-row-id=0) and appends 1 row.
+//   - Writer A's first commit attempt fails (409 conflict).
+//   - On retry, rebuildManifestList re-derives first-row-id from fresh metadata
+//     (next-row-id=1 after B's commit) so that Writer A's first-row-id = 1.
+//   - The final metadata next-row-id = 2 (gap-free: B claimed [0,1), A claimed [1,2)).
+//
+// This guards against the stale-ID bug the reviewer identified: if the retry
+// reused the first-row-id computed at attempt 0, validateAndUpdateRowLineage
+// would reject the commit because first-row-id (0) < next-row-id (1).
+func (s *OCCScenarioTestSuite) TestV3RowLineageCorrectAfterOCCRetry() {
+	// Zero retry waits keep the test fast; two retries are enough to absorb
+	// the single injected conflict.
+	props := iceberg.Properties{
+		table.CommitMinRetryWaitMsKey:      "0",
+		table.CommitMaxRetryWaitMsKey:      "0",
+		table.CommitTotalRetryTimeoutMsKey: "60000",
+		table.CommitNumRetriesKey:          "2",
+	}
+
+	// Step 1: Writer B commits 1 row to an empty v3 table.
+	emptyTbl, catB := s.makeV3Table(0, props)
+	rowB := s.makeArrowTable()
+	defer rowB.Release()
+
+	txB := emptyTbl.NewTransaction()
+	s.Require().NoError(txB.AppendTable(s.ctx, rowB, rowB.NumRows(), nil))
+
+	_, err := txB.Commit(s.ctx)
+	s.Require().NoError(err, "Writer B must commit successfully")
+
+	metaAfterB := catB.current
+	s.Equal(int64(1), metaAfterB.NextRowID(),
+		"after Writer B: next-row-id must be 1 (one row committed)")
+
+	snapB := metaAfterB.CurrentSnapshot()
+	s.Require().NotNil(snapB)
+	s.Require().NotNil(snapB.FirstRowID)
+	s.Equal(int64(0), *snapB.FirstRowID, "Writer B first-row-id starts at 0")
+
+	// Step 2: Writer A's catalog starts with B's committed state but returns
+	// one conflict before accepting.
+	catA := &occScenarioCatalog{
+		current:       metaAfterB,
+		conflictsLeft: 1,
+		location:      s.location,
+	}
+
+	// Writer A's table starts from the EMPTY base — stale next-row-id=0.
+	writerATable := table.New(
+		emptyTbl.Identifier(),
+		emptyTbl.Metadata(),
+		s.location+"/metadata/v1.metadata.json",
+		func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil },
+		catA,
+	)
+
+	rowA := s.makeArrowTable()
+	defer rowA.Release()
+
+	txA := writerATable.NewTransaction()
+	s.Require().NoError(txA.AppendTable(s.ctx, rowA, rowA.NumRows(), nil))
+
+	_, err = txA.Commit(s.ctx)
+	s.Require().NoError(err, "Writer A must succeed after one conflict retry")
+
+	s.Equal(int32(2), catA.commitTableCalls.Load(),
+		"expected 2 commit attempts: 1 conflict + 1 success")
+
+	// Step 3: Validate row lineage in final metadata.
+	finalMeta := catA.current
+	s.Equal(int64(2), finalMeta.NextRowID(),
+		"final next-row-id must be 2: B claimed [0,1), A claimed [1,2)")
+
+	finalSnap := finalMeta.CurrentSnapshot()
+	s.Require().NotNil(finalSnap)
+	s.Require().NotNil(finalSnap.FirstRowID)
+	s.Require().NotNil(finalSnap.AddedRows)
+	s.Equal(int64(1), *finalSnap.FirstRowID,
+		"Writer A's first-row-id must be 1 (rebased against B's next-row-id)")
+	s.Equal(int64(1), *finalSnap.AddedRows,
+		"Writer A added exactly 1 row")
+
+	// Step 4: Validate manifest-level first_row_id is persisted correctly.
+	fio := iceio.LocalFS{}
+	manifests, err := finalSnap.Manifests(fio)
+	s.Require().NoError(err)
+	s.Require().Len(manifests, 2,
+		"expected 2 manifests (Writer A + inherited Writer B)")
+
+	// The new manifest (from Writer A's snapshot) should have first_row_id = 1.
+	// It is identified by carrying the final snapshot's ID.
+	var writerAManifest iceberg.ManifestFile
+	for _, mf := range manifests {
+		if mf.SnapshotID() == finalSnap.SnapshotID {
+			writerAManifest = mf
+		}
+	}
+	s.Require().NotNil(writerAManifest, "must find Writer A's manifest")
+	s.Require().NotNil(writerAManifest.FirstRowID(),
+		"v3 data manifest must have first_row_id set")
+	s.Equal(int64(1), *writerAManifest.FirstRowID(),
+		"Writer A manifest first_row_id must equal snapshot first-row-id (rebased)")
+}
+
+// TestV3MergeAppendRowLineageAfterOCCRetry verifies that merge-append
+// correctly maintains row lineage under OCC retry. The reviewer's concern was
+// that manifest merging could lose the manifest-level first_row_id. Since the
+// manifest-list writer (not the manifest itself) assigns first_row_id, merged
+// manifests correctly receive their value at list-write time from fresh
metadata.
+//
+// Scenario:
+// - Writer B commits 3 rows to an empty v3 table with merge enabled.
+// - Writer A (merge-append) starts from the empty base and appends 2 rows.
+// - First attempt fails (409). Retry re-derives first-row-id from fresh
+// metadata (next-row-id=3 after B).
+// - Final metadata: next-row-id = 3 + (rows assigned in merged manifest).
+func (s *OCCScenarioTestSuite) TestV3MergeAppendRowLineageAfterOCCRetry() {
+ props := iceberg.Properties{
+ table.CommitMinRetryWaitMsKey: "0",
+ table.CommitMaxRetryWaitMsKey: "0",
+ table.CommitTotalRetryTimeoutMsKey: "60000",
+ table.CommitNumRetriesKey: "2",
+ table.ManifestMergeEnabledKey: "true",
+ table.ManifestMinMergeCountKey: "2",
+ }
+
+ // Step 1: Writer B commits 3 rows.
+ emptyTbl, catB := s.makeV3Table(0, props)
+ rowB := s.makeArrowTableN(3)
+ defer rowB.Release()
+
+ txB := emptyTbl.NewTransaction()
+ s.Require().NoError(txB.AppendTable(s.ctx, rowB, rowB.NumRows(), nil))
+ _, err := txB.Commit(s.ctx)
+ s.Require().NoError(err, "Writer B must commit")
+
+ metaAfterB := catB.current
+ s.Equal(int64(3), metaAfterB.NextRowID())
+
+ // Step 2: Writer A with merge-append, starting from stale empty base.
+ catA := &occScenarioCatalog{
+ current: metaAfterB,
+ conflictsLeft: 1,
+ location: s.location,
+ }
+
+ writerATable := table.New(
+ emptyTbl.Identifier(),
+ emptyTbl.Metadata(),
+ s.location+"/metadata/v1.metadata.json",
+ func(_ context.Context) (iceio.IO, error) { return
iceio.LocalFS{}, nil },
+ catA,
+ )
+
+ rowA := s.makeArrowTableN(2)
+ defer rowA.Release()
+
+ txA := writerATable.NewTransaction()
+ s.Require().NoError(txA.AppendTable(s.ctx, rowA, rowA.NumRows(), nil))
+ _, err = txA.Commit(s.ctx)
+ s.Require().NoError(err, "Writer A (merge-append) must succeed after
retry")
+
+ s.Equal(int32(2), catA.commitTableCalls.Load())
+
+ // Step 3: Validate row lineage.
+ finalMeta := catA.current
+ finalSnap := finalMeta.CurrentSnapshot()
+ s.Require().NotNil(finalSnap)
+ s.Require().NotNil(finalSnap.FirstRowID)
+ s.Require().NotNil(finalSnap.AddedRows)
+
+ s.Equal(int64(3), *finalSnap.FirstRowID,
+ "merge-append first-row-id must be rebased to fresh
next-row-id=3")
+ s.Greater(*finalSnap.AddedRows, int64(0),
+ "merge-append must report positive added-rows")
+
+ expectedNextRowID := *finalSnap.FirstRowID + *finalSnap.AddedRows
+ s.Equal(expectedNextRowID, finalMeta.NextRowID(),
+ "final next-row-id = first-row-id + added-rows (gap-free)")
+
+ // Step 4: All data manifests in the final snapshot must have
first_row_id set.
+ fio := iceio.LocalFS{}
+ manifests, err := finalSnap.Manifests(fio)
+ s.Require().NoError(err)
+ for i, mf := range manifests {
+ if mf.ManifestContent() == iceberg.ManifestContentData {
+ s.NotNil(mf.FirstRowID(),
+ "v3 data manifest[%d] must have first_row_id
assigned by list writer", i)
+ }
+ }
+}
+
+// makeArrowTableN builds an Arrow table of n rows with two nullable columns,
+// "id" (int64) and "val" (string). Rows are numbered from 1: row k holds
+// id=k and val="row-k". The table is decoded from a JSON array, and any
+// decode error fails the test immediately. Callers in this suite release
+// the returned table when they are done with it.
+func (s *OCCScenarioTestSuite) makeArrowTableN(n int) arrow.Table {
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+		{Name: "val", Type: arrow.BinaryTypes.String, Nullable: true},
+	}
+	schema := arrow.NewSchema(fields, nil)
+
+	// One JSON object per row, joined into a single JSON array document.
+	records := make([]string, n)
+	for idx := range records {
+		seq := idx + 1
+		records[idx] = fmt.Sprintf(`{"id": %d, "val": "row-%d"}`, seq, seq)
+	}
+	payload := "[" + strings.Join(records, ",") + "]"
+
+	result, err := array.TableFromJSON(memory.DefaultAllocator, schema, []string{payload})
+	s.Require().NoError(err)
+
+	return result
+}
+
+// makeV3Table creates fresh table metadata with the format-version property
+// set to "3" for an unpartitioned, unsorted table with two columns ("id"
+// int64, "val" string) rooted at s.location. It returns a table handle backed
+// by a new occScenarioCatalog together with that catalog.
+//
+// conflicts seeds the catalog's conflictsLeft counter. extraProps are merged
+// over the default properties, so an entry there with the format-version key
+// replaces the default "3". Metadata construction errors fail the test
+// immediately.
+func (s *OCCScenarioTestSuite) makeV3Table(
+	conflicts int,
+	extraProps iceberg.Properties,
+) (*table.Table, *occScenarioCatalog) {
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64},
+		iceberg.NestedField{ID: 2, Name: "val", Type: iceberg.PrimitiveTypes.String},
+	)
+
+	// Caller-supplied properties are applied last so they take precedence.
+	props := iceberg.Properties{table.PropertyFormatVersion: "3"}
+	for k, v := range extraProps {
+		props[k] = v
+	}
+
+	meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+		table.UnsortedSortOrder, s.location, props)
+	s.Require().NoError(err)
+
+	cat := &occScenarioCatalog{
+		current:       meta,
+		conflictsLeft: conflicts,
+		location:      s.location,
+	}
+
+	tbl := table.New(
+		[]string{"default", "occ_scenario_test"},
+		meta,
+		s.location+"/metadata/v1.metadata.json",
+		func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil },
+		cat,
+	)
+
+	return tbl, cat
+}