laskoviymishka commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3277166972
##########
metadata_columns.go:
##########
@@ -59,3 +59,12 @@ func LastUpdatedSequenceNumber() NestedField {
func IsMetadataColumn(fieldID int) bool {
return fieldID == RowIDFieldID || fieldID == LastUpdatedSequenceNumberFieldID
}
+
+// SchemaWithRowID returns a new schema that appends the _row_id metadata column
+// to the given schema's fields. Used when reading source files during a CoW rewrite
+// so that row identity is preserved in the output.
+func SchemaWithRowID(s *Schema) *Schema {
+ fields := append(s.Fields(), RowID())
Review Comment:
Two issues here that bite in practice.
First, if the input schema already has a field with ID 1 (e.g. a user column
named `id`), `NewSchemaWithIdentifiers` will reject the duplicate and we panic
at runtime. We can't assume user schemas avoid 1, 2, 3.
Second, `append(s.Fields(), ...)` aliases the original schema's backing
array when there's spare capacity. A later mutation of either schema can
corrupt the other. I'd `slices.Clone(s.Fields())` first, and either skip or
return an error when one of the row-lineage IDs collides instead of panicking.
wdyt?
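A minimal sketch of both fixes, assuming a simplified `Field` type in place of `iceberg.NestedField` and a hypothetical `appendRowID` helper (not the PR's `SchemaWithRowID`). A real version would build the schema from the returned fields and propagate the error to the caller:

```go
package main

import (
	"fmt"
	"slices"
)

// Field is a hypothetical stand-in for iceberg.NestedField; only the ID and
// name matter for this sketch.
type Field struct {
	ID   int
	Name string
}

// appendRowID (hypothetical) returns a copy of fields with rowID appended.
// It reports an ID or name collision as an error instead of panicking, and
// it clones the input so the append never writes into a backing array that
// is shared with the caller.
func appendRowID(fields []Field, rowID Field) ([]Field, error) {
	for _, f := range fields {
		if f.ID == rowID.ID {
			return nil, fmt.Errorf("field ID %d already used by %q", rowID.ID, f.Name)
		}
		if f.Name == rowID.Name {
			return nil, fmt.Errorf("column name %q already in schema", rowID.Name)
		}
	}
	out := slices.Clone(fields)
	return append(out, rowID), nil
}

func main() {
	// Illustrative ID only, mirroring the collision scenario described above.
	rowID := Field{ID: 1, Name: "_row_id"}

	// Spare capacity: a bare append(fields, ...) would write into this array.
	fields := make([]Field, 2, 4)
	fields[0] = Field{ID: 10, Name: "id"}
	fields[1] = Field{ID: 11, Name: "data"}

	out, err := appendRowID(fields, rowID)
	fmt.Println(len(out), err) // 3 <nil>

	// The result has its own backing array, so writes do not leak back.
	out[0].Name = "changed"
	fmt.Println(fields[0].Name) // id

	// A user column `id` with field ID 1 collides with the lineage ID.
	_, err = appendRowID([]Field{{ID: 1, Name: "id"}}, rowID)
	fmt.Println(err) // field ID 1 already used by "id"
}
```

Returning an error rather than silently skipping keeps the failure visible at the call site; skipping would instead require the caller to detect that `_row_id` was not added.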
##########
table/rewrite_data_files.go:
##########
@@ -289,6 +289,11 @@ func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTas
scanOpts = append(scanOpts,
WitMaxConcurrency(cfg.scanConcurrency))
}
+ preserveLineage := tbl.metadata.Version() >= 3 && hasRowLineage(group.Tasks)
Review Comment:
Mixing row-lineage and non-row-lineage source files in a single rewrite
group is going to produce a file where some rows have `_row_id` /
`_last_updated_sequence_number` and others don't, which violates the per-file
invariant. `hasRowLineage` returning true if *any* task has lineage means the
lineage path runs for the whole group regardless of the mix.
I'd partition the input files by file-level `has_row_lineage` and rewrite
each partition separately, or skip non-lineage files when the table has row
lineage enabled. Silently merging them feels like a correctness bug waiting to
surface.
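A minimal sketch of the partitioning option, assuming a hypothetical `Task` type in which a nil `FirstRowID` marks a file without row lineage (an assumption about which per-file signal to use); `partitionByLineage` is an illustrative name, not an existing function:

```go
package main

import "fmt"

// Task is a hypothetical stand-in for a compaction input file. FirstRowID is
// nil for files written before the table carried row lineage.
type Task struct {
	Path       string
	FirstRowID *int64
}

// partitionByLineage splits a compaction group so that each rewrite reads
// inputs that either all carry row lineage or all lack it.
func partitionByLineage(tasks []Task) (withLineage, without []Task) {
	for _, t := range tasks {
		if t.FirstRowID != nil {
			withLineage = append(withLineage, t)
		} else {
			without = append(without, t)
		}
	}
	return withLineage, without
}

func main() {
	first := int64(0)
	group := []Task{
		{Path: "a.parquet", FirstRowID: &first},
		{Path: "legacy.parquet"},
	}
	lineage, legacy := partitionByLineage(group)
	// Each partition would then be rewritten into its own output file(s).
	fmt.Println(len(lineage), len(legacy)) // 1 1
}
```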
##########
table/transaction.go:
##########
@@ -1532,6 +1532,26 @@ func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalF
Length: originalFile.FileSizeBytes(),
}
+ // Preserve row lineage for v3 tables: include _row_id in the scan
+ // projection so it is synthesized (or read from existing explicit values)
+ // and written into the output file. _last_updated_sequence_number is left
+ // out — it will be null in the output and inherited from the new file's
+ // data_sequence_number at read time, which satisfies the spec requirement
Review Comment:
The spec defines `_last_updated_sequence_number` as the sequence number of
the snapshot in which the row was last updated. Leaving it null on rewrite and
inheriting from the new file's `data_sequence_number` conflates "physically
rewritten" with "logically updated" and breaks downstream readers that use
this for CDC / incremental scans.
For a pure compaction (no row changes), we should preserve the original
per-row `_last_updated_sequence_number` from the source files. Otherwise every
compaction looks like a full table update to consumers.
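A sketch of what preserving the value could mean during compaction, assuming the inheritance rule described above (a null `_last_updated_sequence_number` resolves to the data sequence number of the file it is read from). The rewrite resolves nulls against the *source* file and writes the result explicitly. `resolveLastUpdated` is a hypothetical helper, and nil pointers stand in for null Arrow values:

```go
package main

import "fmt"

// resolveLastUpdated (hypothetical) turns the per-row values read from one
// source file into explicit values for the compacted output. A nil entry is
// a null in the source file and inherits that file's data sequence number;
// explicit values are copied unchanged.
func resolveLastUpdated(values []*int64, sourceDataSeq int64) []int64 {
	out := make([]int64, len(values))
	for i, v := range values {
		if v != nil {
			out[i] = *v // explicit value, e.g. from an earlier compaction
		} else {
			out[i] = sourceDataSeq // inherited from the source file, not the new one
		}
	}
	return out
}

func main() {
	carried := int64(4) // row carried over explicitly by an earlier rewrite
	rows := []*int64{nil, &carried, nil}
	fmt.Println(resolveLastUpdated(rows, 7)) // [7 4 7]
}
```

Writing the resolved values explicitly means the output file's own (newer) data sequence number is never consulted for these rows, so a pure compaction does not look like an update.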
##########
table/scanner.go:
##########
@@ -261,26 +260,22 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
}
}
+ var schema *iceberg.Schema
if slices.Contains(scan.selectedFields, "*") {
- return curSchema, nil
- }
-
- selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, caseSensitive)
- schemaMeta := metaFieldsFromSchema(curSchema)
- synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
- if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
-
- // synthesis path
- removedMetaSlice, missingMetaFields := removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
- sch, err := curSchema.Select(scan.caseSensitive, removedMetaSlice...)
+ schema = curSchema
+ } else {
+ var err error
+ schema, err = curSchema.Select(scan.caseSensitive, scan.selectedFields...)
Review Comment:
This loses the metadata-column escape hatch that the old synthesis path
supported. `Select("_row_id")` (or `Select("_last_updated_sequence_number")`)
now goes straight to `curSchema.Select` and fails because the user schema
doesn't contain those names — the only way to get those columns is
`WithRowLineage()`, which is a separate API.
If that's intentional, worth a CHANGELOG note. If not, we should intercept
the known metadata column names here before the schema lookup, the way the
removed `synthesizeMeta`/`removeMetadataFromSelectedFields` did.
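A sketch of intercepting metadata names before the schema lookup, assuming the two lineage columns named in this comment are the full set and that case-insensitive matching lowercases names. `splitSelected` is hypothetical and only approximates what the removed helpers did:

```go
package main

import (
	"fmt"
	"strings"
)

// metadataColumns is an assumed set of lineage column names that bypass the
// user-schema lookup.
var metadataColumns = map[string]bool{
	"_row_id":                       true,
	"_last_updated_sequence_number": true,
}

// splitSelected (hypothetical) separates metadata column names from user
// column names so that only the latter are passed to Schema.Select.
func splitSelected(selected []string, caseSensitive bool) (user, meta []string) {
	for _, name := range selected {
		key := name
		if !caseSensitive {
			key = strings.ToLower(name)
		}
		if metadataColumns[key] {
			meta = append(meta, key)
		} else {
			user = append(user, name)
		}
	}
	return user, meta
}

func main() {
	user, meta := splitSelected([]string{"id", "_ROW_ID"}, false)
	fmt.Println(user, meta) // [id] [_row_id]
}
```

The user names would then go to `curSchema.Select`, and the metadata names would be appended to the resulting projection, subject to the same format-version check as `WithRowLineage()`.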
##########
table/write_records.go:
##########
@@ -94,6 +95,16 @@ func WithClusteredWrite() WriteRecordOption {
}
}
+// WithPreserveRowLineage sets the output file schema to include _row_id so that
+// row identity is preserved through rewrites and compactions on v3 tables. The
+// input records must already contain the _row_id column (e.g. from a scan that
+// projected lineage columns).
+func WithPreserveRowLineage(schema *iceberg.Schema) WriteRecordOption {
Review Comment:
`WithPreserveRowLineage` is accepted on any table — there's no check that
the table actually has v3 row lineage enabled, or that the input records
contain `_row_id`. A caller who sets it on a v2 table or feeds records without
the lineage column gets silently-wrong output rather than an error.
I'd validate at option-application time (or inside `WriteRecords` once we
have the `tbl`): if the table format is < 3, error. If the input Arrow schema
is missing `_row_id`, error.
##########
table/table.go:
##########
@@ -816,6 +816,14 @@ func WithOptions(opts iceberg.Properties) ScanOption {
}
}
+// WithRowLineage adds _row_id to the scan projection so that row identity is
Review Comment:
The doc says this is "ignored" on non-v3 tables, but the implementation sets
`scan.includeRowLineage = true` unconditionally and `SchemaWithRowID` then runs
in `Projection()` regardless of format version. On a v1/v2 table that ends up
projecting a metadata column the table doesn't support.
Either the doc is wrong or the guard is missing. I'd add an explicit
format-version check (either error or true no-op) and make the doc match the
behavior.
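A sketch of the "true no-op" variant, assuming the projection can be modeled as a list of column names; `projectedColumns` is hypothetical. The flag adds `_row_id` only when the format version is 3 or later, so a v1/v2 scan sees exactly its user columns:

```go
package main

import "fmt"

// projectedColumns (hypothetical) applies the lineage flag only on tables
// whose format version supports row lineage.
func projectedColumns(formatVersion int, includeRowLineage bool, columns []string) []string {
	out := append([]string(nil), columns...) // copy; never alias the caller's slice
	if includeRowLineage && formatVersion >= 3 {
		out = append(out, "_row_id")
	}
	return out
}

func main() {
	cols := []string{"id", "data"}
	fmt.Println(projectedColumns(2, true, cols)) // [id data]
	fmt.Println(projectedColumns(3, true, cols)) // [id data _row_id]
}
```

The erroring alternative would return an error from the same branch instead of skipping; either way, the doc comment should state which behavior applies.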
##########
table/transaction.go:
##########
@@ -1788,3 +1843,40 @@ func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
+
+// filterRecordBatch applies an Iceberg filter to a record batch, returning
+// only matching rows. The filter is provided unbound and is bound against the
+// given schema internally. Used for post-synthesis row filtering in CoW rewrites
+// where row lineage must be synthesized before filtering.
+func filterRecordBatch(ctx context.Context, rec arrow.RecordBatch, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (arrow.RecordBatch, error) {
Review Comment:
Two things in `filterRecordBatch`:
This is called once per batch from the iterator loop, but `BindExpr` and
`substrait.ConvertExpr` depend only on the (schema, filter) pair, not the
batch. We're paying that cost N times for no reason — hoist the bind + convert
out and pass them in, or memoize on first call.
Also, the `AlwaysFalse` branch returns `array.NewRecordBatch(emptySchema,
nil, 0)` with no columns at all — downstream code that does `rec.Column(i)`
will panic. The empty record should have the right schema *and* empty arrays
for each field.
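A sketch of both fixes, assuming a simplified `Batch` type with named `int64` columns in place of `arrow.RecordBatch`, and an equality predicate in place of the bound Substrait expression. The hypothetical `compileEquals` stands in for the bind + convert step and runs once; the closure returned by `newEqualsFilter` runs per batch and always returns every column, empty when no rows match:

```go
package main

import "fmt"

// Batch is a hypothetical stand-in for arrow.RecordBatch: named int64 columns.
type Batch struct {
	Names   []string
	Columns [][]int64
}

// compileCalls counts how often the schema-dependent preparation runs.
var compileCalls int

// compileEquals stands in for BindExpr + substrait.ConvertExpr: its result
// depends only on the schema and the filter, never on a particular batch.
func compileEquals(names []string, column string, value int64) (int, func(int64) bool, error) {
	compileCalls++
	for i, n := range names {
		if n == column {
			return i, func(v int64) bool { return v == value }, nil
		}
	}
	return -1, nil, fmt.Errorf("column %q not in schema", column)
}

// newEqualsFilter prepares the filter once and returns a per-batch function.
func newEqualsFilter(names []string, column string, value int64) (func(Batch) Batch, error) {
	idx, match, err := compileEquals(names, column, value)
	if err != nil {
		return nil, err
	}
	return func(b Batch) Batch {
		out := Batch{Names: b.Names, Columns: make([][]int64, len(b.Columns))}
		for c := range out.Columns {
			out.Columns[c] = []int64{} // no rows yet, but the column exists
		}
		for r, v := range b.Columns[idx] {
			if match(v) {
				for c := range b.Columns {
					out.Columns[c] = append(out.Columns[c], b.Columns[c][r])
				}
			}
		}
		return out
	}, nil
}

func main() {
	names := []string{"id", "_row_id"}
	filter, err := newEqualsFilter(names, "id", 2)
	if err != nil {
		panic(err)
	}
	batches := []Batch{
		{Names: names, Columns: [][]int64{{1, 2, 3}, {0, 1, 2}}},
		{Names: names, Columns: [][]int64{{4, 5}, {3, 4}}},
	}
	for _, b := range batches {
		fmt.Println(filter(b).Columns) // [[2] [1]], then [[] []]
	}
	fmt.Println("compile calls:", compileCalls) // compile calls: 1
}
```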
##########
table/rewrite_data_files.go:
##########
@@ -328,6 +336,18 @@ func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTas
}, nil
}
+// hasRowLineage returns true if any task in the group has a non-nil FirstRowID,
Review Comment:
The comment says "indicating the table has v3 row lineage data to preserve",
but the body checks per-task `FirstRowID` — a per-file property, not a table
property. A v3 table can legitimately have a mix of tasks where some have
`FirstRowID` and some don't (legacy files), so "any task has it" doesn't really
mean "the table has lineage".
I'd either rename to `anyTaskHasRowLineage` (and fix the comment to match),
or — better — gate on `tbl.metadata.Version() >= 3` at the call site and
rewrite this to mean "all tasks have lineage" so we don't trip the
mixed-lineage bug above.
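A sketch of the stricter predicate plus the version gate at the call site, assuming a hypothetical `Task` type with a nullable `FirstRowID`; both function names are illustrative:

```go
package main

import "fmt"

// Task is a hypothetical stand-in for a compaction input; FirstRowID is nil
// for files written before the table carried row lineage.
type Task struct {
	FirstRowID *int64
}

// allTasksHaveRowLineage reports whether every task carries lineage. An empty
// group returns false so the lineage path never runs on nothing.
func allTasksHaveRowLineage(tasks []Task) bool {
	if len(tasks) == 0 {
		return false
	}
	for _, t := range tasks {
		if t.FirstRowID == nil {
			return false
		}
	}
	return true
}

// preserveLineage mirrors the suggested call site: a table-level version
// gate combined with the per-file check.
func preserveLineage(formatVersion int, tasks []Task) bool {
	return formatVersion >= 3 && allTasksHaveRowLineage(tasks)
}

func main() {
	id := int64(0)
	mixed := []Task{{FirstRowID: &id}, {}}
	fmt.Println(preserveLineage(3, mixed))                     // false
	fmt.Println(preserveLineage(3, []Task{{FirstRowID: &id}})) // true
}
```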
##########
table/table.go:
##########
@@ -816,6 +816,14 @@ func WithOptions(opts iceberg.Properties) ScanOption {
}
}
+// WithRowLineage adds _row_id to the scan projection so that row identity is
+// preserved through rewrites. Only meaningful for v3 tables; ignored otherwise.
+func WithRowLineage() ScanOption {
Review Comment:
The name `WithRowLineage` suggests it controls the whole row-lineage
feature, but it only adds `_row_id` to the projection —
`_last_updated_sequence_number` is governed elsewhere. That's confusing for
callers who'd expect one switch to mean one thing.
Could we either rename to scope it (e.g. `WithRowID`) or extend it to also
project `_last_updated_sequence_number`? wdyt?
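A sketch of the two options using Go's functional-options pattern, assuming a hypothetical `scanConfig` with one flag per lineage column. These definitions are illustrative, not the PR's code:

```go
package main

import "fmt"

// scanConfig is hypothetical: one flag per lineage column.
type scanConfig struct {
	includeRowID          bool
	includeLastUpdatedSeq bool
}

// ScanOption mutates a scanConfig (illustrative only).
type ScanOption func(*scanConfig)

// WithRowID projects only _row_id: today's behavior under a scoped name.
func WithRowID() ScanOption {
	return func(c *scanConfig) { c.includeRowID = true }
}

// WithRowLineage projects both lineage columns, matching what the name implies.
func WithRowLineage() ScanOption {
	return func(c *scanConfig) {
		c.includeRowID = true
		c.includeLastUpdatedSeq = true
	}
}

func newScanConfig(opts ...ScanOption) scanConfig {
	var c scanConfig
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newScanConfig(WithRowID()))
	fmt.Printf("%+v\n", newScanConfig(WithRowLineage()))
}
```

Keeping both options would also let `WithRowLineage` be expressed as `WithRowID` plus a second flag, so each name maps to exactly one projection.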
##########
table/transaction.go:
##########
@@ -1542,11 +1562,19 @@ func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalF
return nil, fmt.Errorf("failed to build metadata: %w", err)
}
+ // When preserving row lineage, omit the row filter from the scanner so that
+ // _row_id is synthesized using the true file-level row positions. The filter
+ // is applied below in the record iterator.
+ scanFilter := iceberg.BooleanExpression(iceberg.AlwaysTrue{})
Review Comment:
Passing `AlwaysTrue` here disables both manifest-level and file-level
pushdown for the rewrite scan when row lineage is on. For a selective delete on
a wide partition that's a lot of unnecessary IO — we read every file in scope
just to filter post-synthesis.
This may be unavoidable, since `_row_id` synthesis needs the original
positions, but the comment treats it as obviously fine. At minimum I'd leave a
TODO calling out the cost; ideally we'd still apply file-level (not row-level)
skipping based on `boundFilter`'s residuals.
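A sketch of file-level skipping that still leaves row-level filtering to the post-synthesis step, assuming per-file min/max bounds on `id` (similar to the column bounds manifests record) and an equality delete filter. `DataFile` and `filesToRewrite` are hypothetical:

```go
package main

import "fmt"

// DataFile is a hypothetical stand-in carrying per-file bounds for `id`.
type DataFile struct {
	Path  string
	MinID int64
	MaxID int64
}

// filesToRewrite keeps only files whose [MinID, MaxID] range can contain
// id == target. Files that survive are still read in full, with no row-level
// filter, so _row_id can be synthesized from true file positions; row
// filtering happens afterwards.
func filesToRewrite(files []DataFile, target int64) []DataFile {
	var out []DataFile
	for _, f := range files {
		if target >= f.MinID && target <= f.MaxID {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	files := []DataFile{
		{Path: "a.parquet", MinID: 1, MaxID: 100},
		{Path: "b.parquet", MinID: 101, MaxID: 200},
	}
	for _, f := range filesToRewrite(files, 150) {
		fmt.Println(f.Path) // b.parquet
	}
}
```

Files ruled out by their bounds cannot contain a matching row, so a copy-on-write delete has nothing to rewrite in them and skipping them does not disturb any `_row_id` positions.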
##########
table/row_lineage_rewrite_test.go:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+ t.Helper()
+
+ location := filepath.ToSlash(t.TempDir())
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+ )
+ meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, location,
+ iceberg.Properties{table.PropertyFormatVersion: "3"})
+ require.NoError(t, err)
+
+ metaLoc := location + "/metadata/v1.metadata.json"
+ fsF := func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }
+ cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: fsF}
+
+ return table.New(table.Identifier{"db", "row_lineage_test"}, meta, metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id values in the rewritten file. The
+// _last_updated_sequence_number should be null (inherited from data_sequence_number).
+func TestCoWRewritePreservesRowID(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ // Append 3 rows: id=1,2,3
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3, "data": "c"}]`,
+ })
+ require.NoError(t, err)
+ defer initialData.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+ require.NoError(t, err)
+
+ // Verify the append created a valid v3 snapshot with row lineage.
+ snap := tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+ require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+ require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+ assert.Equal(t, int64(0), *snap.FirstRowID)
+ assert.Equal(t, int64(3), *snap.AddedRows)
+
+ // Scan with row lineage to see synthesized _row_id values before the rewrite.
+ lineageScan := tbl.Scan(table.WithRowLineage())
+ schema, itr, err := lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ rowIDIdx := -1
+ for i, f := range schema.Fields() {
+ if f.Name == iceberg.RowIDColumnName {
+ rowIDIdx = i
+
+ break
+ }
+ }
+ require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan projection")
+
+ var originalRowIDs []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ col := rec.Column(rowIDIdx).(*array.Int64)
+ for i := 0; i < col.Len(); i++ {
+ originalRowIDs = append(originalRowIDs, col.Value(i))
+ }
+ rec.Release()
+ }
+ require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id should be 0,1,2")
+
+ // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ snap = tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+
+ // Scan the result with row lineage. The surviving rows should preserve their
+ // original _row_id values: 0 and 2.
+ lineageScan = tbl.Scan(table.WithRowLineage())
+ _, itr, err = lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ var afterRowIDs []int64
+ var afterIDs []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ idIdx := rec.Schema().FieldIndices("id")
+ require.NotEmpty(t, idIdx)
+ rowIDIndices := rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+ require.NotEmpty(t, rowIDIndices)
+
+ idCol := rec.Column(idIdx[0]).(*array.Int64)
+ rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+ for i := 0; i < int(rec.NumRows()); i++ {
+ afterIDs = append(afterIDs, idCol.Value(i))
+ afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+ }
+ rec.Release()
+ }
+
+ assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be id=1,3")
+ assert.Equal(t, []int64{0, 2}, afterRowIDs,
+ "_row_id must be preserved through CoW rewrite: row with id=1 keeps _row_id=0, row with id=3 keeps _row_id=2")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional and
+// matches Java's ManifestListWriter.V4Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ data, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30, "data": "z"}]`,
+ })
+ require.NoError(t, err)
+ defer data.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+ require.NoError(t, err)
+
+ // next-row-id should be 3 after appending 3 rows.
+ assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+ // Delete one row via CoW.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ // next-row-id advances by the rewritten manifest's total row count (2 surviving
+ // rows), even though those rows preserve old IDs. This "wastes" ID space but
+ // doesn't violate uniqueness — actual row IDs come from the explicit Parquet
+ // column, not the global counter.
+ nextRowID := tbl.Metadata().NextRowID()
+ assert.GreaterOrEqual(t, nextRowID, int64(5),
Review Comment:
`GreaterOrEqual(..., int64(5))` makes the assertion vacuous in the loose
direction: it passes even if next-row-id leapt to some huge value because of a
bug elsewhere. And the comment above describes the expected accounting as
original + rewritten rows (3 + 2 = 5), so we actually know the expected value.
I'd assert `Equal(t, int64(5), nextRowID)` (or whatever the exact
post-rewrite expectation is). As written, this test would still pass if
next-row-id accounting drifted upward by any amount.
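A small sketch of the arithmetic behind the exact expectation, assuming the accounting described in the test's comments (next-row-id advances by every row in the new manifests, survivors included). `expectedNextRowID` is hypothetical; in the test itself the check would be the `assert.Equal` suggested above:

```go
package main

import "fmt"

// expectedNextRowID (hypothetical) encodes the accounting from the test's
// comments: next-row-id advances by every row in the new manifests, even
// survivors that keep their old _row_id values.
func expectedNextRowID(before, rowsInNewManifests int64) int64 {
	return before + rowsInNewManifests
}

func main() {
	// 3 rows appended, then a CoW delete rewrites the file with 2 survivors.
	got := expectedNextRowID(3, 2)
	if got != 5 {
		panic(fmt.Sprintf("next-row-id = %d, want exactly 5", got))
	}
	fmt.Println(got) // 5
}
```

An exact equality fails on drift in either direction, which is the property the `GreaterOrEqual` form lacks.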
##########
table/snapshot_producers.go:
##########
@@ -917,6 +917,11 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Require
return nil, nil, err
}
if writer.NextRowID() != nil {
+ // addedRows counts ALL rows in new manifests (existing + added), even
+ // for rewrites where survivors preserve old _row_id values. This
+ // "wastes" ID space but doesn't violate uniqueness: actual row IDs come
+ // from the explicit Parquet column, not the global counter. Java's
+ // ManifestListWriter.V4Writer uses the same accounting.
Review Comment:
`ManifestListWriter.V4Writer` looks like a typo — Java's manifest-list
writer for row lineage is `V3Writer` (the v3 format-version writer); there is
no V4 manifest. Unless there's a v4 draft I'm missing, the comment should read
`V3Writer`.
Misleading provenance comments are extra costly because the next reader
trying to verify parity will hunt for a class that doesn't exist.
--
This is an automated message from the Apache Git Service.