laskoviymishka commented on code in PR #1120: URL: https://github.com/apache/iceberg-go/pull/1120#discussion_r3302296013
########## table/position_delta_writer.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// PositionDeltaWriter writes data files for the position-delta MoR pattern, +// distinguishing reinserted rows (survivors of a position-delta rewrite that +// preserve their original _row_id) from fresh inserts (rows that get a new +// _row_id synthesized at read time). +// +// Scope: this writer produces *data files only*. The position-delete entries +// that pair with reinserts (and that turn an UPDATE into delete-old + reinsert) +// are not emitted here — the engine driver is responsible for composing this +// writer with a position-delete writer and committing both via a RowDelta-style +// snapshot. This mirrors the data-file half of Java's +// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split. 
+// +// _last_updated_sequence_number is intentionally not written: the row-id-only +// file schema lets the reader synthesize it from the manifest entry's +// data_sequence_number, which after the rewrite is the new snapshot's sequence +// number — the value the spec requires. +// +// Usage: +// +// w, err := table.NewPositionDeltaWriter(tbl) +// w.Reinsert(survivorBatch) // batch must include _row_id column with non-null values +// w.Insert(freshBatch) // batch without _row_id (writer appends nulls) +// dataFiles, err := w.Close(ctx) +type PositionDeltaWriter struct { + tbl *Table + writeUUID uuid.UUID + + // reinsertBatches hold records with explicit _row_id values (preserving lineage). + reinsertBatches []arrow.RecordBatch + // insertBatches hold fresh records without lineage. + insertBatches []arrow.RecordBatch + + closed bool +} + +// NewPositionDeltaWriter creates a writer for the position-delta MoR update +// pattern on the given table. The table must be format version 3 or higher. +func NewPositionDeltaWriter(tbl *Table) (*PositionDeltaWriter, error) { + if tbl.metadata.Version() < 3 { + return nil, fmt.Errorf("%w: PositionDeltaWriter requires format version >= 3, got %d", + iceberg.ErrInvalidArgument, tbl.metadata.Version()) + } + + return &PositionDeltaWriter{ + tbl: tbl, + writeUUID: uuid.New(), + }, nil +} + +// Reinsert adds survivor rows that preserve their original _row_id. The batch +// MUST contain a _row_id column (field name "_row_id") with non-null int64 +// values representing the preserved row identities. +// +// Per the Iceberg spec, only position-delete rewrites and CoW can preserve +// lineage. Equality deletes cannot preserve lineage because the engine writes +// without reading old identity. 
+func (w *PositionDeltaWriter) Reinsert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName) + if len(indices) == 0 { + return fmt.Errorf("%w: Reinsert batch must contain %s column", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + col := batch.Column(indices[0]) + if col.NullN() > 0 { + return fmt.Errorf("%w: Reinsert batch %s column must not contain null values", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + batch.Retain() + w.reinsertBatches = append(w.reinsertBatches, batch) + + return nil +} + +// Insert adds fresh rows that get a new _row_id at read time. The batch should +// NOT contain a _row_id column; if it does, all values must be null. These rows +// represent genuinely new data (not survivors of a rewrite). +func (w *PositionDeltaWriter) Insert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 { + if batch.Column(indices[0]).NullN() != int(batch.NumRows()) { + return fmt.Errorf("%w: Insert batch %s column must be all null (use Reinsert for preserved IDs)", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + } + + batch.Retain() + w.insertBatches = append(w.insertBatches, batch) + + return nil +} + +// Close finalizes the writer and returns the data files produced. The returned +// files contain both reinserted and fresh rows, with the _row_id column written +// explicitly for reinserted rows (non-null) and left null for fresh inserts. +// +// The caller is responsible for adding these files to a snapshot (typically via +// a Transaction's snapshot producer) along with any position-delete entries +// that pair with the reinserts. 
+func (w *PositionDeltaWriter) Close(ctx context.Context) ([]iceberg.DataFile, error) { + if w.closed { + return nil, fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + w.closed = true + + defer func() { + for _, b := range w.reinsertBatches { + b.Release() + } + for _, b := range w.insertBatches { + b.Release() + } + }() + + if len(w.reinsertBatches) == 0 && len(w.insertBatches) == 0 { + return nil, nil + } + + fileSchema := iceberg.SchemaWithRowID(w.tbl.Schema()) + arrowSc, err := SchemaToArrowSchema(fileSchema, nil, true, false) + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: build arrow schema: %w", err) + } + + writeOpts := []WriteRecordOption{ + WithPreserveRowLineage(fileSchema), + WithWriteUUID(w.writeUUID), + } + + records := w.buildUnifiedIterator() + + var result []iceberg.DataFile + for df, err := range WriteRecords(ctx, w.tbl, arrowSc, records, writeOpts...) { + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: write records: %w", err) + } + result = append(result, df) + } + + return result, nil +} + +// buildUnifiedIterator merges reinsert and insert batches into a single +// iterator. Reinsert batches already have _row_id; insert batches get a null +// _row_id column appended. +func (w *PositionDeltaWriter) buildUnifiedIterator() iter.Seq2[arrow.RecordBatch, error] { + return func(yield func(arrow.RecordBatch, error) bool) { + alloc := memory.NewGoAllocator() + + for _, batch := range w.reinsertBatches { + batch.Retain() + if !yield(batch, nil) { + batch.Release() Review Comment: I think there's a double-release on the early-stop path. `buildUnifiedIterator` calls `batch.Retain()` before yielding and `batch.Release()` in the `!yield` branch here (and the analogous `enriched.Release()` at line 206). But `WriteRecords`'s `releasing` wrapper (write_records.go:191) *also* calls `rec.Release()` on its own `!yield` branch — same pointer, released twice.
This branch only runs when the consumer stops early (e.g. an IO error mid-write), so happy-path tests never exercise it. When it does run, the extra release drives the refcount below zero, where arrow-go's behavior is undefined, and the allocator can be silently corrupted. I'd drop the `batch.Release()` (and `enriched.Release()`) from the `!yield` branches here and let `releasing` own it — the contract on `WriteRecords` ("releases each RecordBatch it consumes") already covers that. A regression test that aborts the iterator mid-stream and asserts `CurrentAlloc() == 0` would lock it in. ########## table/position_delta_writer.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// PositionDeltaWriter writes data files for the position-delta MoR pattern, +// distinguishing reinserted rows (survivors of a position-delta rewrite that +// preserve their original _row_id) from fresh inserts (rows that get a new +// _row_id synthesized at read time). +// +// Scope: this writer produces *data files only*.
The position-delete entries +// that pair with reinserts (and that turn an UPDATE into delete-old + reinsert) +// are not emitted here — the engine driver is responsible for composing this +// writer with a position-delete writer and committing both via a RowDelta-style +// snapshot. This mirrors the data-file half of Java's +// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split. +// +// _last_updated_sequence_number is intentionally not written: the row-id-only +// file schema lets the reader synthesize it from the manifest entry's +// data_sequence_number, which after the rewrite is the new snapshot's sequence +// number — the value the spec requires. +// +// Usage: +// +// w, err := table.NewPositionDeltaWriter(tbl) +// w.Reinsert(survivorBatch) // batch must include _row_id column with non-null values +// w.Insert(freshBatch) // batch without _row_id (writer appends nulls) +// dataFiles, err := w.Close(ctx) +type PositionDeltaWriter struct { + tbl *Table + writeUUID uuid.UUID + + // reinsertBatches hold records with explicit _row_id values (preserving lineage). + reinsertBatches []arrow.RecordBatch + // insertBatches hold fresh records without lineage. + insertBatches []arrow.RecordBatch + + closed bool +} + +// NewPositionDeltaWriter creates a writer for the position-delta MoR update +// pattern on the given table. The table must be format version 3 or higher. +func NewPositionDeltaWriter(tbl *Table) (*PositionDeltaWriter, error) { + if tbl.metadata.Version() < 3 { + return nil, fmt.Errorf("%w: PositionDeltaWriter requires format version >= 3, got %d", + iceberg.ErrInvalidArgument, tbl.metadata.Version()) + } + + return &PositionDeltaWriter{ + tbl: tbl, + writeUUID: uuid.New(), + }, nil +} + +// Reinsert adds survivor rows that preserve their original _row_id. The batch +// MUST contain a _row_id column (field name "_row_id") with non-null int64 +// values representing the preserved row identities. 
+// +// Per the Iceberg spec, only position-delete rewrites and CoW can preserve +// lineage. Equality deletes cannot preserve lineage because the engine writes +// without reading old identity. +func (w *PositionDeltaWriter) Reinsert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName) + if len(indices) == 0 { + return fmt.Errorf("%w: Reinsert batch must contain %s column", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + col := batch.Column(indices[0]) + if col.NullN() > 0 { + return fmt.Errorf("%w: Reinsert batch %s column must not contain null values", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + batch.Retain() + w.reinsertBatches = append(w.reinsertBatches, batch) + + return nil +} + +// Insert adds fresh rows that get a new _row_id at read time. The batch should +// NOT contain a _row_id column; if it does, all values must be null. These rows +// represent genuinely new data (not survivors of a rewrite). +func (w *PositionDeltaWriter) Insert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 { + if batch.Column(indices[0]).NullN() != int(batch.NumRows()) { + return fmt.Errorf("%w: Insert batch %s column must be all null (use Reinsert for preserved IDs)", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + } + + batch.Retain() + w.insertBatches = append(w.insertBatches, batch) + + return nil +} + +// Close finalizes the writer and returns the data files produced. The returned +// files contain both reinserted and fresh rows, with the _row_id column written +// explicitly for reinserted rows (non-null) and left null for fresh inserts. 
+// +// The caller is responsible for adding these files to a snapshot (typically via +// a Transaction's snapshot producer) along with any position-delete entries +// that pair with the reinserts. +func (w *PositionDeltaWriter) Close(ctx context.Context) ([]iceberg.DataFile, error) { + if w.closed { + return nil, fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + w.closed = true + + defer func() { + for _, b := range w.reinsertBatches { + b.Release() + } + for _, b := range w.insertBatches { + b.Release() + } + }() + + if len(w.reinsertBatches) == 0 && len(w.insertBatches) == 0 { + return nil, nil + } + + fileSchema := iceberg.SchemaWithRowID(w.tbl.Schema()) + arrowSc, err := SchemaToArrowSchema(fileSchema, nil, true, false) + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: build arrow schema: %w", err) + } + + writeOpts := []WriteRecordOption{ + WithPreserveRowLineage(fileSchema), + WithWriteUUID(w.writeUUID), + } + + records := w.buildUnifiedIterator() + + var result []iceberg.DataFile + for df, err := range WriteRecords(ctx, w.tbl, arrowSc, records, writeOpts...) { + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: write records: %w", err) + } + result = append(result, df) + } + + return result, nil +} + +// buildUnifiedIterator merges reinsert and insert batches into a single +// iterator. Reinsert batches already have _row_id; insert batches get a null +// _row_id column appended. 
+func (w *PositionDeltaWriter) buildUnifiedIterator() iter.Seq2[arrow.RecordBatch, error] { + return func(yield func(arrow.RecordBatch, error) bool) { + alloc := memory.NewGoAllocator() + + for _, batch := range w.reinsertBatches { + batch.Retain() + if !yield(batch, nil) { + batch.Release() + + return + } + } + + for _, batch := range w.insertBatches { + enriched, err := appendNullRowIDColumn(alloc, batch) + if err != nil { + yield(nil, err) + + return + } + if !yield(enriched, nil) { + enriched.Release() + + return + } + } + } +} + +// appendNullRowIDColumn appends a null-filled _row_id column to a batch that +// doesn't have one, signaling that these are fresh inserts. If the batch +// already has _row_id (validated to be all-null by Insert), it is returned +// retained as-is. +func appendNullRowIDColumn(alloc memory.Allocator, batch arrow.RecordBatch) (arrow.RecordBatch, error) { + if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 { + batch.Retain() + + return batch, nil + } + + nrows := batch.NumRows() + bldr := array.NewInt64Builder(alloc) + defer bldr.Release() + bldr.AppendNulls(int(nrows)) + nullCol := bldr.NewArray() + defer nullCol.Release() + + fields := append(batch.Schema().Fields(), arrow.Field{ + Name: iceberg.RowIDColumnName, + Type: arrow.PrimitiveTypes.Int64, + Nullable: true, + }) + cols := append(batch.Columns(), nullCol) Review Comment: `batch.Columns()` returns the record's internal `arrs` slice directly. If `len(arrs) < cap(arrs)` (which can happen depending on how the batch was constructed upstream), this `append` writes `nullCol` into `batch.arrs[len]` in place — silently mutating the source batch's column array. Same risk one line up with `batch.Schema().Fields()` if the Arrow version doesn't defensively copy. Three-index slice it: `cols := append(batch.Columns()[:batch.NumCols():batch.NumCols()], nullCol)`. Or just `make` + `copy` explicitly. 
########## table/position_delta_writer_test.go: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +import ( + "context" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPositionDeltaWriter_ReinsertPreservesRowID(t *testing.T) { + ctx := context.Background() + mem := memory.DefaultAllocator + + tbl := newV3RowLineageTestTable(t) + + // Append initial data: id=1, id=2 → _row_id 0, 1 in the initial file. 
+ dataSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + initial, err := array.TableFromJSON(mem, dataSchema, []string{ + `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]`, + }) + require.NoError(t, err) + defer initial.Release() + + tbl, err = tbl.Append(ctx, array.NewTableReader(initial, -1), nil) + require.NoError(t, err) + + // Capture the initial data file so we can replace it via NewRewrite. + initialFiles, err := planFilesAll(ctx, tbl) + require.NoError(t, err) + require.Len(t, initialFiles, 1) + initialFile := initialFiles[0] + + // Create a PositionDeltaWriter and stage: + // - Reinsert(id=1, _row_id=0) — survivor with updated value. + // - Reinsert(id=2, _row_id=1) — unchanged survivor. + // - Insert(id=99) — fresh row, gets synthesized _row_id at read time. + w, err := table.NewPositionDeltaWriter(tbl) + require.NoError(t, err) + + reinsertBatch := buildReinsertBatch(mem, + []int64{1, 2}, + []string{"a_updated", "b"}, + []int64{0, 1}, + ) + defer reinsertBatch.Release() + require.NoError(t, w.Reinsert(reinsertBatch)) + + insertBatch := buildInsertBatch(mem, []int64{99}, []string{"new"}) + defer insertBatch.Release() + require.NoError(t, w.Insert(insertBatch)) + + dataFiles, err := w.Close(ctx) + require.NoError(t, err) + require.NotEmpty(t, dataFiles) + + // Atomically replace the initial file with the writer's output. This is + // a library-internal rewrite (NewRewrite skips the explicit first_row_id + // requirement that AddDataFiles enforces for externally-written files). 
+ tx := tbl.NewTransaction() + rw := tx.NewRewrite(nil) + rw.DeleteFile(initialFile) + for _, df := range dataFiles { + rw.AddDataFile(df) + } + require.NoError(t, rw.Commit(ctx)) + tbl, err = tx.Commit(ctx) + require.NoError(t, err) + + // Scan with lineage — survivors must keep their original _row_ids; the + // fresh row gets a new synthesized _row_id from the new file's first_row_id. + scan := tbl.Scan(table.WithRowLineage()) + _, itr, err := scan.ToArrowRecords(ctx) + require.NoError(t, err) + + rowsByID := map[int64]int64{} + for rec, err := range itr { + require.NoError(t, err) + idCol := rec.Column(rec.Schema().FieldIndices("id")[0]).(*array.Int64) + rowIDCol := rec.Column(rec.Schema().FieldIndices(iceberg.RowIDColumnName)[0]).(*array.Int64) + for i := 0; i < int(rec.NumRows()); i++ { + rowsByID[idCol.Value(i)] = rowIDCol.Value(i) + } + rec.Release() + } + + require.Contains(t, rowsByID, int64(1)) + require.Contains(t, rowsByID, int64(2)) + require.Contains(t, rowsByID, int64(99)) + + assert.Equal(t, int64(0), rowsByID[1], "reinserted id=1 must preserve original _row_id=0") + assert.Equal(t, int64(1), rowsByID[2], "reinserted id=2 must preserve original _row_id=1") + // The fresh row's _row_id must collide with no preserved survivor's id. + assert.NotEqual(t, int64(0), rowsByID[99]) + assert.NotEqual(t, int64(1), rowsByID[99]) Review Comment: This test doesn't actually distinguish "we wrote `_row_id` explicitly" from "the reader synthesized it and happened to land on the same values." If the writer silently dropped `_row_id` (hypothetical bug), the new file's `first_row_id` could easily be 0, and `synthesizeRowLineageColumns` would produce row_ids 0, 1, 2 for the three rows — exactly what the test asserts. The `NotEqual(rowsByID[99], 0/1)` guard only catches it if the file's `first_row_id` happens to overlap reinsert IDs. Two ways to strengthen, wdyt: - Pick reinsert `_row_id` values that can't collide with any plausible synthesized range, e.g. 
`[]int64{1 << 40, (1<<40) + 1}`. Then `Equal` checks become real proof. - Or read the data file directly and assert the `_row_id` column physically exists with non-null values matching what we passed. The first is cheaper. ########## table/position_delta_writer.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// PositionDeltaWriter writes data files for the position-delta MoR pattern, +// distinguishing reinserted rows (survivors of a position-delta rewrite that +// preserve their original _row_id) from fresh inserts (rows that get a new +// _row_id synthesized at read time). +// +// Scope: this writer produces *data files only*. The position-delete entries +// that pair with reinserts (and that turn an UPDATE into delete-old + reinsert) +// are not emitted here — the engine driver is responsible for composing this +// writer with a position-delete writer and committing both via a RowDelta-style +// snapshot. 
This mirrors the data-file half of Java's +// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split. +// +// _last_updated_sequence_number is intentionally not written: the row-id-only +// file schema lets the reader synthesize it from the manifest entry's +// data_sequence_number, which after the rewrite is the new snapshot's sequence +// number — the value the spec requires. +// +// Usage: +// +// w, err := table.NewPositionDeltaWriter(tbl) +// w.Reinsert(survivorBatch) // batch must include _row_id column with non-null values +// w.Insert(freshBatch) // batch without _row_id (writer appends nulls) +// dataFiles, err := w.Close(ctx) +type PositionDeltaWriter struct { + tbl *Table + writeUUID uuid.UUID + + // reinsertBatches hold records with explicit _row_id values (preserving lineage). + reinsertBatches []arrow.RecordBatch + // insertBatches hold fresh records without lineage. + insertBatches []arrow.RecordBatch + + closed bool +} + +// NewPositionDeltaWriter creates a writer for the position-delta MoR update +// pattern on the given table. The table must be format version 3 or higher. +func NewPositionDeltaWriter(tbl *Table) (*PositionDeltaWriter, error) { + if tbl.metadata.Version() < 3 { + return nil, fmt.Errorf("%w: PositionDeltaWriter requires format version >= 3, got %d", + iceberg.ErrInvalidArgument, tbl.metadata.Version()) + } + + return &PositionDeltaWriter{ + tbl: tbl, + writeUUID: uuid.New(), + }, nil +} + +// Reinsert adds survivor rows that preserve their original _row_id. The batch +// MUST contain a _row_id column (field name "_row_id") with non-null int64 +// values representing the preserved row identities. +// +// Per the Iceberg spec, only position-delete rewrites and CoW can preserve +// lineage. Equality deletes cannot preserve lineage because the engine writes +// without reading old identity. 
+func (w *PositionDeltaWriter) Reinsert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName) + if len(indices) == 0 { + return fmt.Errorf("%w: Reinsert batch must contain %s column", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + col := batch.Column(indices[0]) + if col.NullN() > 0 { + return fmt.Errorf("%w: Reinsert batch %s column must not contain null values", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + + batch.Retain() + w.reinsertBatches = append(w.reinsertBatches, batch) + + return nil +} + +// Insert adds fresh rows that get a new _row_id at read time. The batch should +// NOT contain a _row_id column; if it does, all values must be null. These rows +// represent genuinely new data (not survivors of a rewrite). +func (w *PositionDeltaWriter) Insert(batch arrow.RecordBatch) error { + if w.closed { + return fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + + if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 { + if batch.Column(indices[0]).NullN() != int(batch.NumRows()) { + return fmt.Errorf("%w: Insert batch %s column must be all null (use Reinsert for preserved IDs)", + iceberg.ErrInvalidArgument, iceberg.RowIDColumnName) + } + } + + batch.Retain() + w.insertBatches = append(w.insertBatches, batch) + + return nil +} + +// Close finalizes the writer and returns the data files produced. The returned +// files contain both reinserted and fresh rows, with the _row_id column written +// explicitly for reinserted rows (non-null) and left null for fresh inserts. +// +// The caller is responsible for adding these files to a snapshot (typically via +// a Transaction's snapshot producer) along with any position-delete entries +// that pair with the reinserts. 
+func (w *PositionDeltaWriter) Close(ctx context.Context) ([]iceberg.DataFile, error) { + if w.closed { + return nil, fmt.Errorf("%w: writer is already closed", ErrInvalidOperation) + } + w.closed = true + + defer func() { + for _, b := range w.reinsertBatches { + b.Release() + } + for _, b := range w.insertBatches { + b.Release() + } + }() + + if len(w.reinsertBatches) == 0 && len(w.insertBatches) == 0 { + return nil, nil + } + + fileSchema := iceberg.SchemaWithRowID(w.tbl.Schema()) + arrowSc, err := SchemaToArrowSchema(fileSchema, nil, true, false) + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: build arrow schema: %w", err) + } + + writeOpts := []WriteRecordOption{ + WithPreserveRowLineage(fileSchema), + WithWriteUUID(w.writeUUID), + } + + records := w.buildUnifiedIterator() + + var result []iceberg.DataFile + for df, err := range WriteRecords(ctx, w.tbl, arrowSc, records, writeOpts...) { + if err != nil { + return nil, fmt.Errorf("PositionDeltaWriter: write records: %w", err) + } + result = append(result, df) + } + + return result, nil +} + +// buildUnifiedIterator merges reinsert and insert batches into a single +// iterator. Reinsert batches already have _row_id; insert batches get a null +// _row_id column appended. +func (w *PositionDeltaWriter) buildUnifiedIterator() iter.Seq2[arrow.RecordBatch, error] { + return func(yield func(arrow.RecordBatch, error) bool) { + alloc := memory.NewGoAllocator() + + for _, batch := range w.reinsertBatches { + batch.Retain() + if !yield(batch, nil) { + batch.Release() + + return + } + } + + for _, batch := range w.insertBatches { + enriched, err := appendNullRowIDColumn(alloc, batch) + if err != nil { + yield(nil, err) + + return + } + if !yield(enriched, nil) { + enriched.Release() + + return + } + } + } +} + +// appendNullRowIDColumn appends a null-filled _row_id column to a batch that +// doesn't have one, signaling that these are fresh inserts. 
If the batch +// already has _row_id (validated to be all-null by Insert), it is returned +// retained as-is. +func appendNullRowIDColumn(alloc memory.Allocator, batch arrow.RecordBatch) (arrow.RecordBatch, error) { + if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 { + batch.Retain() + + return batch, nil + } + + nrows := batch.NumRows() + bldr := array.NewInt64Builder(alloc) + defer bldr.Release() + bldr.AppendNulls(int(nrows)) + nullCol := bldr.NewArray() + defer nullCol.Release() + + fields := append(batch.Schema().Fields(), arrow.Field{ Review Comment: The appended Arrow field has no `PARQUET:field_id` metadata. In practice the static `arrowSc` we pass to `WriteRecords` (derived from `SchemaToArrowSchema(SchemaWithRowID(...), ...)`) carries the field ID, so the Parquet writer tags the column correctly downstream — but per-batch the field is unannotated, which is brittle: any future code path that reads field IDs from the per-batch schema rather than the static one would silently misroute the column. Worth attaching `ArrowParquetFieldIDKey → strconv.Itoa(iceberg.RowIDFieldID)` here so the two schemas agree. ########## table/position_delta_writer.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// PositionDeltaWriter writes data files for the position-delta MoR pattern, +// distinguishing reinserted rows (survivors of a position-delta rewrite that +// preserve their original _row_id) from fresh inserts (rows that get a new +// _row_id synthesized at read time). +// +// Scope: this writer produces *data files only*. The position-delete entries +// that pair with reinserts (and that turn an UPDATE into delete-old + reinsert) +// are not emitted here — the engine driver is responsible for composing this +// writer with a position-delete writer and committing both via a RowDelta-style +// snapshot. This mirrors the data-file half of Java's +// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split. +// +// _last_updated_sequence_number is intentionally not written: the row-id-only +// file schema lets the reader synthesize it from the manifest entry's +// data_sequence_number, which after the rewrite is the new snapshot's sequence +// number — the value the spec requires. +// +// Usage: +// +// w, err := table.NewPositionDeltaWriter(tbl) +// w.Reinsert(survivorBatch) // batch must include _row_id column with non-null values +// w.Insert(freshBatch) // batch without _row_id (writer appends nulls) +// dataFiles, err := w.Close(ctx) +type PositionDeltaWriter struct { + tbl *Table + writeUUID uuid.UUID + + // reinsertBatches hold records with explicit _row_id values (preserving lineage). + reinsertBatches []arrow.RecordBatch + // insertBatches hold fresh records without lineage. 
+ insertBatches []arrow.RecordBatch + + closed bool +} + +// NewPositionDeltaWriter creates a writer for the position-delta MoR update +// pattern on the given table. The table must be format version 3 or higher. +func NewPositionDeltaWriter(tbl *Table) (*PositionDeltaWriter, error) { Review Comment: No way for callers to thread `WithTargetFileSize`, `WithMaxWriteWorkers`, or `WithClusteredWrite` through. A large rewrite silently uses table defaults, which may produce a different file count/shape than the caller expects. Could we take `opts ...WriteRecordOption` and merge them at the `WriteRecords` call site? `WithWriteUUID` and `WithPreserveRowLineage` stay hardcoded; everything else flows from the caller. Thoughts? ########## table/position_delta_writer.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "context" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// PositionDeltaWriter writes data files for the position-delta MoR pattern, +// distinguishing reinserted rows (survivors of a position-delta rewrite that +// preserve their original _row_id) from fresh inserts (rows that get a new +// _row_id synthesized at read time). +// +// Scope: this writer produces *data files only*. The position-delete entries +// that pair with reinserts (and that turn an UPDATE into delete-old + reinsert) +// are not emitted here — the engine driver is responsible for composing this +// writer with a position-delete writer and committing both via a RowDelta-style +// snapshot. This mirrors the data-file half of Java's +// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split. +// +// _last_updated_sequence_number is intentionally not written: the row-id-only +// file schema lets the reader synthesize it from the manifest entry's +// data_sequence_number, which after the rewrite is the new snapshot's sequence +// number — the value the spec requires. +// +// Usage: +// +// w, err := table.NewPositionDeltaWriter(tbl) +// w.Reinsert(survivorBatch) // batch must include _row_id column with non-null values +// w.Insert(freshBatch) // batch without _row_id (writer appends nulls) +// dataFiles, err := w.Close(ctx) +type PositionDeltaWriter struct { + tbl *Table + writeUUID uuid.UUID + + // reinsertBatches hold records with explicit _row_id values (preserving lineage). + reinsertBatches []arrow.RecordBatch + // insertBatches hold fresh records without lineage. + insertBatches []arrow.RecordBatch Review Comment: Both slices grow unbounded until `Close()` flushes, so a 10GB partition rewrite materializes 10GB of Arrow before a byte hits disk. 
For the engine-driver use cases this writer targets, that might be fine; for compaction it's not. Either document the memory model on the type doc explicitly so callers don't get surprised, or take the iterator at construction and stream straight through to `WriteRecords` rather than accumulating. I'd be fine with documenting it for now and revisiting in the position-delete writer PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
