tanmayrauth commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3291870842
##########
table/rewrite_data_files.go:
##########
@@ -289,6 +289,20 @@ func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTas
scanOpts = append(scanOpts,
WitMaxConcurrency(cfg.scanConcurrency))
}
+ // Preserve row lineage only when every source file in the group carries
+ // it. A mixed group (some files with FirstRowID, some without — e.g.
+ // legacy files on a v3 table) would otherwise produce one output where
+ // post-lineage rows have explicit _row_id values and pre-lineage rows
+ // have nulls, which violates the per-file uniqueness/coverage
+ // invariant the v3 spec requires. Splitting mixed groups into separate
+ // outputs is a larger refactor and is left as a follow-up; for now we
+ // degrade gracefully (the rewrite still succeeds, but lineage is not
+ // preserved for the surviving rows).
+ preserveLineage := tbl.metadata.Version() >= 3 && allTasksHaveRowLineage(group.Tasks)
Review Comment:
Added a slog.Warn in ExecuteCompactionGroup, matching the existing v3
pos-delete pattern, that fires when a v3 table sees a mixed group containing at
least one lineage file. I'm holding off on adding a RowLineagePreserved bool to
CompactionGroupResult for now: the log already surfaces the silent loss, and
adding a field to the result struct would lock in API surface for what should be
a transitional migration concern. Happy to add it as a follow-up if the need
shows up in practice.
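A minimal sketch of the decision described above, assuming a hypothetical fileTask type in which a nil FirstRowID marks a pre-lineage file. lineageDecision is illustrative only and is not the PR's allTasksHaveRowLineage; it uses nothing beyond the standard library's log/slog (Go 1.21+).

```go
package main

import (
	"log/slog"
)

// fileTask is a hypothetical stand-in for a scan task; only the field this
// sketch needs is modeled. A nil FirstRowID means the file predates row lineage.
type fileTask struct {
	FilePath   string
	FirstRowID *int64
}

// lineageDecision reports whether row lineage can be preserved for a
// compaction group (v3+ and every file carries lineage), and whether the
// group mixes lineage and non-lineage files (the case worth warning about).
func lineageDecision(formatVersion int, tasks []fileTask) (preserve, mixed bool) {
	if formatVersion < 3 {
		return false, false
	}
	withLineage := 0
	for _, t := range tasks {
		if t.FirstRowID != nil {
			withLineage++
		}
	}
	// An empty group has nothing to preserve.
	preserve = len(tasks) > 0 && withLineage == len(tasks)
	// Mixed: at least one lineage file, but not all of them.
	mixed = withLineage > 0 && withLineage < len(tasks)
	return preserve, mixed
}

func main() {
	id := int64(100)
	tasks := []fileTask{
		{FilePath: "a.parquet", FirstRowID: &id},
		{FilePath: "b.parquet"}, // legacy file, no lineage
	}
	preserve, mixed := lineageDecision(3, tasks)
	if mixed {
		// Surface the silent loss of lineage instead of failing the rewrite.
		slog.Warn("compaction group mixes lineage and non-lineage files; row lineage will not be preserved",
			"files", len(tasks))
	}
	_ = preserve
}
```

Keying the warning on mixed rather than on !preserve avoids logging for groups made entirely of legacy files, where no lineage existed and nothing is lost.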
##########
table/scanner_internal_test.go:
##########
@@ -238,98 +236,71 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) {
assert.ErrorContains(t, err, "id 999")
}
-// TestProjectionV3PreLineageFile verifies that Projection() succeeds and returns
-// _row_id and _last_updated_sequence_number as nullable (all-null-capable) fields when
-// the table is v3 with next-row-id set but the data file predates row lineage (those
-// columns are absent from the schema).
-func TestProjectionV3PreLineageFile(t *testing.T) {
- schema := iceberg.NewSchema(
- 1,
- iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
- iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
- )
+// TestSynthesizeRowLineageColumns verifies that _row_id and _last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {
Review Comment:
Added TestSynthesizeRowLineageColumnsPreservesExplicit with a 3-row mixed
fixture (explicit / null / explicit-different) covering both _row_id and
_last_updated_sequence_number. It asserts that the non-null rows survive
untouched and that the null row picks up firstRowID + position for _row_id and
task.DataSequenceNumber for _last_updated_sequence_number.
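The fill rule the new test asserts can be sketched as follows. This is a minimal illustration, assuming nullable columns are modeled as []*int64 (nil means null) rather than the Arrow arrays the real code operates on; synthesizeLineage and ptr are hypothetical helpers, not iceberg-go functions.

```go
package main

import "fmt"

// synthesizeLineage fills null entries of the _row_id and
// _last_updated_sequence_number columns for one data file.
// Columns are modeled as []*int64 where nil means null (a hypothetical
// representation). A null _row_id becomes firstRowID + the row's position
// in the file; a null sequence number becomes the file's data sequence
// number. Explicit (non-null) values are copied through unchanged.
func synthesizeLineage(rowIDs, seqNums []*int64, firstRowID, dataSeqNum int64) ([]int64, []int64) {
	outIDs := make([]int64, len(rowIDs))
	for pos, v := range rowIDs {
		if v != nil {
			outIDs[pos] = *v
		} else {
			outIDs[pos] = firstRowID + int64(pos)
		}
	}
	outSeq := make([]int64, len(seqNums))
	for pos, v := range seqNums {
		if v != nil {
			outSeq[pos] = *v
		} else {
			outSeq[pos] = dataSeqNum
		}
	}
	return outIDs, outSeq
}

// ptr returns a pointer to a copy of v, for building nullable test columns.
func ptr(v int64) *int64 { return &v }

func main() {
	// Mirrors the 3-row fixture: explicit / null / explicit-different.
	ids, seqs := synthesizeLineage(
		[]*int64{ptr(7), nil, ptr(42)},
		[]*int64{ptr(3), nil, ptr(5)},
		1000, // firstRowID
		9,    // data sequence number
	)
	fmt.Println(ids, seqs) // [7 1001 42] [3 9 5]
}
```

The explicit values (7 and 42) deliberately differ from what synthesis would produce (1000 and 1002), so a regression that overwrote them would be caught.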
##########
table/transaction.go:
##########
@@ -1488,13 +1501,23 @@ func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, fs
localDelete = append(localDelete, df)
} else {
localRewrite = append(localRewrite, df)
+ // Capture the file's sequence number from the
+ // manifest entry so the rewrite path can synthesize
+ // _last_updated_sequence_number for source rows
+ // that have a null value (or no column) in the file.
+ if fseq := entry.FileSequenceNum(); fseq != nil {
Review Comment:
Good catch. Switched both sites to entry.SequenceNum() and added a guard
requiring the value to be >= 0, since SequenceNum() returns -1 when unset. I
left FileScanTask.DataSequenceNumber named as-is because the field name already
matches what we're now putting in it (the data sequence number); the bug was in
the source, not the destination.
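A sketch of the guard described above, assuming (as the comment states) that SequenceNum() returns -1 when unset. The sequenceNumberer interface, fakeEntry, scanTask, and withDataSequenceNumber are hypothetical stand-ins for the manifest entry and FileScanTask; modeling DataSequenceNumber as a pointer is also an assumption made for illustration.

```go
package main

import "fmt"

// sequenceNumberer is a hypothetical stand-in for a manifest entry;
// per the review comment, SequenceNum() returns -1 when unset.
type sequenceNumberer interface {
	SequenceNum() int64
}

// fakeEntry is a test double that reports a fixed sequence number.
type fakeEntry struct{ seq int64 }

func (e fakeEntry) SequenceNum() int64 { return e.seq }

// scanTask models only the field being populated (hypothetically a pointer,
// so "unknown" can be represented as nil).
type scanTask struct {
	DataSequenceNumber *int64
}

// withDataSequenceNumber copies the entry's data sequence number into the
// task, leaving it nil when the entry reports the -1 "unset" sentinel.
// Zero is a valid sequence number, hence >= 0 rather than > 0.
func withDataSequenceNumber(task scanTask, entry sequenceNumberer) scanTask {
	if seq := entry.SequenceNum(); seq >= 0 {
		task.DataSequenceNumber = &seq
	}
	return task
}

func main() {
	set := withDataSequenceNumber(scanTask{}, fakeEntry{seq: 12})
	unset := withDataSequenceNumber(scanTask{}, fakeEntry{seq: -1})
	fmt.Println(*set.DataSequenceNumber, unset.DataSequenceNumber == nil) // 12 true
}
```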
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]