This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a446112 feat(table): project reserved row-lineage fields as null when the file lacks them (#1045)
1a446112 is described below

commit 1a446112ccb7a3453853ab5c5f10c7aa24cfb945
Author: David Zhao <[email protected]>
AuthorDate: Mon May 18 23:36:57 2026 +0200

    feat(table): project reserved row-lineage fields as null when the file lacks them (#1045)
    
    fixes #1010.
    
    ---------
    
    Signed-off-by: happydave1 <[email protected]>
---
 schema_test.go                 |  36 ++++
 table/scanner.go               |  95 +++++++++++
 table/scanner_internal_test.go | 369 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 500 insertions(+)

diff --git a/schema_test.go b/schema_test.go
index 0ae676b3..b0919ede 100644
--- a/schema_test.go
+++ b/schema_test.go
@@ -1035,3 +1035,39 @@ func TestSanitizeColumnNamesEmptyFieldName(t *testing.T) {
        assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
        assert.ErrorContains(t, err, "field name cannot be empty")
 }
+
// TestSchemaSelectCaseSensitiveSuccess checks that a case-sensitive Select
// whose names match the schema exactly yields the expected two-field schema.
func TestSchemaSelectCaseSensitiveSuccess(t *testing.T) {
	want := iceberg.NewSchemaWithIdentifiers(1, []int{2},
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String},
		iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
	)

	got, selectErr := tableSchemaSimple.Select(true, "foo", "bar")
	require.NoError(t, selectErr)

	assert.Truef(t, got.Equals(want), "expected: %s\ngot: %s", want, got)
}
+
// TestSchemaSelectCaseSensitiveNameMismatch checks that a case-sensitive Select
// rejects "FOO" even though the schema has a column named "foo".
func TestSchemaSelectCaseSensitiveNameMismatch(t *testing.T) {
	_, selectErr := tableSchemaSimple.Select(true, "FOO")

	require.Error(t, selectErr)
	assert.ErrorIs(t, selectErr, iceberg.ErrInvalidSchema)
	assert.ErrorContains(t, selectErr, "could not find column FOO")
}
+
// TestSchemaSelectCaseInsensitiveSuccess checks that a case-insensitive Select
// resolves differently-cased names ("FOO", "BaR") to the schema's own fields.
func TestSchemaSelectCaseInsensitiveSuccess(t *testing.T) {
	want := iceberg.NewSchemaWithIdentifiers(1, []int{2},
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String},
		iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
	)

	got, selectErr := tableSchemaSimple.Select(false, "FOO", "BaR")
	require.NoError(t, selectErr)

	assert.Truef(t, got.Equals(want), "expected: %s\ngot: %s", want, got)
}
+
// TestSchemaSelectCaseInsensitiveMissingColumn checks that case-insensitive
// matching still reports an ErrInvalidSchema for a column that does not exist.
func TestSchemaSelectCaseInsensitiveMissingColumn(t *testing.T) {
	_, selectErr := tableSchemaSimple.Select(false, "missing_col")

	require.Error(t, selectErr)
	assert.ErrorIs(t, selectErr, iceberg.ErrInvalidSchema)
	assert.ErrorContains(t, selectErr, "could not find column missing_col")
}
diff --git a/table/scanner.go b/table/scanner.go
index 7263c028..b5a9b15d 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -24,6 +24,7 @@ import (
        "iter"
        "math"
        "slices"
+       "strings"
        "sync"
 
        "github.com/apache/arrow-go/v18/arrow"
@@ -241,6 +242,8 @@ func (scan *Scan) Snapshot() *Snapshot {
 
 func (scan *Scan) Projection() (*iceberg.Schema, error) {
        curSchema := scan.metadata.CurrentSchema()
+       curVersion := scan.metadata.Version()
+       caseSensitive := scan.caseSensitive
        if scan.snapshotID != nil {
                snap := scan.metadata.SnapshotByID(*scan.snapshotID)
                if snap == nil {
@@ -262,6 +265,21 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                return curSchema, nil
        }
 
+       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
+       schemaMeta := metaFieldsFromSchema(curSchema)
+       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
+       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+
+               // synthesis path
+               removedMetaSlice, missingMetaFields := 
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
+               sch, err := curSchema.Select(scan.caseSensitive, 
removedMetaSlice...)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewSchemaWithIdentifiers(sch.ID, 
sch.IdentifierFieldIDs, append(sch.Fields(), missingMetaFields...)...), nil
+       }
+
        return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
 }
 
@@ -674,3 +692,80 @@ func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) {
 
        return array.NewTableFromRecords(schema, records), nil
 }
+
// removeMetadataFromSelectedFields splits selectedFields into the names that
// should still be resolved against the table schema and the reserved
// row-lineage fields that must be synthesized instead.
//
// A selected name counts as metadata when its lower-cased form appears in
// metaFields, so matching is case-insensitive and metaFields is expected to
// hold lower-cased reserved column names. Every matching name is dropped from
// the returned name list, but each reserved field is returned at most once.
// Selecting the same column twice (for example "_row_id" and "_ROW_ID" in a
// case-insensitive scan) therefore cannot yield duplicate field IDs in the
// projected schema.
//
// Returns the remaining selected names in their original order, and the
// reserved fields in order of first appearance. Both slices are non-nil.
func removeMetadataFromSelectedFields(selectedFields []string, metaFields []string) ([]string, []iceberg.NestedField) {
	filteredFields := []string{}
	meta := []iceberg.NestedField{}
	emitted := make(map[string]struct{}, len(metaFields))

	for _, field := range selectedFields {
		lower := strings.ToLower(field)
		if !slices.Contains(metaFields, lower) {
			filteredFields = append(filteredFields, field)

			continue
		}

		// The reserved field was already emitted for an earlier spelling of the
		// same column; drop this name without adding a second copy.
		if _, dup := emitted[lower]; dup {
			continue
		}
		emitted[lower] = struct{}{}

		switch lower {
		case iceberg.LastUpdatedSequenceNumberColumnName:
			meta = append(meta, iceberg.LastUpdatedSequenceNumber())
		case iceberg.RowIDColumnName:
			meta = append(meta, iceberg.RowID())
		}
	}

	return filteredFields, meta
}
+
// metaFieldsFromSelectedFields returns the selected names that refer to the
// reserved row-lineage columns (_row_id and _last_updated_sequence_number),
// each lower-cased, in selection order. When caseSensitive is true only exact
// spellings match; otherwise names are compared with strings.EqualFold. The
// result is never nil.
func metaFieldsFromSelectedFields(selectedFields []string, caseSensitive bool) []string {
	sameName := func(candidate, reserved string) bool {
		if caseSensitive {
			return candidate == reserved
		}

		return strings.EqualFold(candidate, reserved)
	}

	found := make([]string, 0)
	for _, name := range selectedFields {
		isReserved := sameName(name, iceberg.RowIDColumnName) ||
			sameName(name, iceberg.LastUpdatedSequenceNumberColumnName)
		if !isReserved {
			continue
		}
		found = append(found, strings.ToLower(name))
	}

	return found
}
+
+// Takes in a *iceberg.Schema and returns a []string representing the row 
lineage metadata present
+// in the schema.
+func metaFieldsFromSchema(sch *iceberg.Schema) []string {
+       meta := []string{}
+       _, hasRowIDMeta := sch.FindFieldByName(iceberg.RowIDColumnName)
+       _, hasSeqMeta := 
sch.FindFieldByName(iceberg.LastUpdatedSequenceNumberColumnName)
+
+       if hasRowIDMeta {
+               meta = append(meta, iceberg.RowIDColumnName)
+       }
+       if hasSeqMeta {
+               meta = append(meta, iceberg.LastUpdatedSequenceNumberColumnName)
+       }
+
+       return meta
+}
+
// synthesizeMeta returns the entries of selectedFieldsMeta that do not appear in
// schemaMeta, keeping their order and multiplicity. These are the requested
// row-lineage columns the schema does not declare, which must therefore be
// synthesized. The result is never nil.
func synthesizeMeta(selectedFieldsMeta []string, schemaMeta []string) []string {
	missing := make([]string, 0, len(selectedFieldsMeta))
	for _, name := range selectedFieldsMeta {
		if slices.Index(schemaMeta, name) == -1 {
			missing = append(missing, name)
		}
	}

	return missing
}
diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go
index d5808728..789d9906 100644
--- a/table/scanner_internal_test.go
+++ b/table/scanner_internal_test.go
@@ -19,6 +19,8 @@ package table
 
 import (
        "runtime"
+       "slices"
+       "strconv"
        "sync"
        "sync/atomic"
        "testing"
@@ -236,6 +238,330 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) {
        assert.ErrorContains(t, err, "id 999")
 }
 
+// TestProjectionV3PreLineageFile verifies that Projection() succeeds and 
returns
+// _row_id and _last_updated_sequence_number as nullable (all-null-capable) 
fields when
+// the table is v3 with next-row-id set but the data file predates row lineage 
(those
+// columns are absent from the schema).
+func TestProjectionV3PreLineageFile(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       // Request the two user columns plus both row-lineage metadata columns.
+       // These metadata columns do NOT exist in the physical schema of a 
pre-lineage file.
+       scan := &Scan{
+               metadata:       metadata,
+               selectedFields: []string{"id", "payload", 
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+               caseSensitive:  true,
+       }
+
+       proj, err := scan.Projection()
+       require.NoError(t, err, "Projection must not error for pre-lineage 
metadata columns")
+       require.NotNil(t, proj)
+
+       fields := proj.Fields()
+       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+
+       fieldByName := make(map[string]iceberg.NestedField, len(fields))
+       for _, f := range fields {
+               fieldByName[f.Name] = f
+       }
+
+       // Regular columns must survive unchanged.
+       idField, ok := fieldByName["id"]
+       require.True(t, ok, "id must be in projection")
+       assert.Equal(t, 1, idField.ID)
+
+       payloadField, ok := fieldByName["payload"]
+       require.True(t, ok, "payload must be in projection")
+       assert.Equal(t, 2, payloadField.ID)
+
+       // Row lineage columns must be present as optional (nullable) fields — 
the scanner
+       // will return all-nulls for any data file that was written before row 
lineage existed.
+       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+       require.True(t, ok, "_row_id must be in projection")
+       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+
+       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+       require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
+       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
+       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
+}
+
// TestProjectionV3PreLineageFileCaseSensitive verifies that a case-sensitive
// scan does not treat a differently-cased name such as "_Row_Id" as the
// reserved _row_id column. No null column is synthesized, and Projection fails
// because the schema has no column by that name.
func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
	schema := iceberg.NewSchema(
		1,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
	)

	metadata, err := NewMetadata(
		schema,
		iceberg.UnpartitionedSpec,
		UnsortedSortOrder,
		"s3://test-bucket/test_table",
		iceberg.Properties{PropertyFormatVersion: "3"},
	)
	require.NoError(t, err)
	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")

	scan := &Scan{
		metadata:       metadata,
		selectedFields: []string{"id", "payload", "_Row_Id"},
		caseSensitive:  true,
	}

	_, err = scan.Projection()
	require.Error(t, err)
	// The failure comes from ordinary schema resolution, the same error a
	// case-sensitive Select reports for any unknown column.
	assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
	assert.ErrorContains(t, err, "could not find column _Row_Id")
}
+
+func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       scan := &Scan{
+               metadata:       metadata,
+               selectedFields: []string{"id", "payload", "_Row_Id", 
"_Last_Updated_SEQUENCE_number"},
+               caseSensitive:  false,
+       }
+
+       proj, err := scan.Projection()
+       require.NoError(t, err)
+       require.NotNil(t, proj)
+
+       fields := proj.Fields()
+       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+
+       fieldByName := make(map[string]iceberg.NestedField, len(fields))
+       for _, f := range fields {
+               fieldByName[f.Name] = f
+       }
+
+       idField, ok := fieldByName["id"]
+       require.True(t, ok, "id must be in projection")
+       assert.Equal(t, 1, idField.ID)
+
+       payloadField, ok := fieldByName["payload"]
+       require.True(t, ok, "payload must be in projection")
+       assert.Equal(t, 2, payloadField.ID)
+
+       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+       require.True(t, ok, "_row_id must be in projection")
+       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+
+       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+       require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
+       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
+       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
+}
+
+// TestProjectionV2RowLineage asserts that requesting row-lineage metadata 
columns on a v1 or v2
+// table does not use the v3-only synthesis path
+func TestProjectionV2RowLineage(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       for _, tc := range []struct {
+               name string
+               ver  int
+       }{
+               {name: "v1", ver: 1},
+               {name: "v2", ver: 2},
+       } {
+               t.Run(tc.name, func(t *testing.T) {
+                       metadata, err := NewMetadata(
+                               schema,
+                               iceberg.UnpartitionedSpec,
+                               UnsortedSortOrder,
+                               "s3://test-bucket/test_table",
+                               iceberg.Properties{PropertyFormatVersion: 
strconv.Itoa(tc.ver)},
+                       )
+                       require.NoError(t, err)
+                       assert.Equal(t, tc.ver, metadata.Version(), "sanity: 
metadata format version")
+
+                       scan := &Scan{
+                               metadata:       metadata,
+                               selectedFields: []string{"id", 
iceberg.RowIDColumnName},
+                               caseSensitive:  true,
+                       }
+
+                       _, err = scan.Projection()
+                       require.Error(t, err)
+                       assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+                       assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+               })
+       }
+}
+
// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema already
// declares _row_id but not _last_updated_sequence_number. Only the missing
// column may be synthesized; _row_id must be resolved from the schema, and the
// projection must not contain duplicate field IDs.
func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
	schema := iceberg.NewSchema(
		1,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
		iceberg.RowID(),
	)

	metadata, err := NewMetadata(
		schema,
		iceberg.UnpartitionedSpec,
		UnsortedSortOrder,
		"s3://test-bucket/test_table",
		iceberg.Properties{PropertyFormatVersion: "3"},
	)
	require.NoError(t, err)
	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")

	scan := &Scan{
		metadata: metadata,
		selectedFields: []string{
			"id", "payload",
			iceberg.RowIDColumnName,
			iceberg.LastUpdatedSequenceNumberColumnName,
		},
		caseSensitive: true,
	}

	// Only capture the result inside the NotPanics closure. The error check runs
	// afterwards, so a failure is reported directly instead of by calling
	// t.FailNow from inside testify's panic-recovery wrapper.
	var (
		proj    *iceberg.Schema
		projErr error
	)
	require.NotPanics(t, func() {
		proj, projErr = scan.Projection()
	})
	require.NoError(t, projErr)
	require.NotNil(t, proj)

	fields := proj.Fields()
	require.Len(t, fields, 4, "projection must include id, payload, _row_id, _last_updated_sequence_number")

	fieldByName := make(map[string]iceberg.NestedField, len(fields))
	idsSeen := make(map[int]string, len(fields))
	for _, f := range fields {
		if prev, dup := idsSeen[f.ID]; dup {
			t.Fatalf("duplicate field id %d: %q and %q", f.ID, prev, f.Name)
		}
		idsSeen[f.ID] = f.Name
		fieldByName[f.Name] = f
	}

	idField, ok := fieldByName["id"]
	require.True(t, ok)
	assert.Equal(t, 1, idField.ID)

	payloadField, ok := fieldByName["payload"]
	require.True(t, ok)
	assert.Equal(t, 2, payloadField.ID)

	// _row_id comes from the table schema rather than being synthesized.
	// NewMetadata renumbers the schema's field IDs, so it no longer carries the
	// reserved ID.
	rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
	require.True(t, ok)
	assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID)
	assert.False(t, rowIDField.Required)

	// _last_updated_sequence_number is synthesized with its reserved ID.
	seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
	require.True(t, ok)
	assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
	assert.False(t, seqField.Required)
}
+
// TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly covers a v3 table
// whose schema already declares _last_updated_sequence_number but not _row_id.
// Only _row_id may be synthesized; the sequence-number column must be resolved
// from the schema, and the projection must not contain duplicate field IDs.
func TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
	schema := iceberg.NewSchema(
		1,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
		iceberg.LastUpdatedSequenceNumber(),
	)

	metadata, err := NewMetadata(
		schema,
		iceberg.UnpartitionedSpec,
		UnsortedSortOrder,
		"s3://test-bucket/test_table",
		iceberg.Properties{PropertyFormatVersion: "3"},
	)
	require.NoError(t, err)
	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")

	scan := &Scan{
		metadata: metadata,
		selectedFields: []string{
			"id", "payload",
			iceberg.RowIDColumnName,
			iceberg.LastUpdatedSequenceNumberColumnName,
		},
		caseSensitive: true,
	}

	// Only capture the result inside the NotPanics closure. The error check runs
	// afterwards, so a failure is reported directly instead of by calling
	// t.FailNow from inside testify's panic-recovery wrapper.
	var (
		proj    *iceberg.Schema
		projErr error
	)
	require.NotPanics(t, func() {
		proj, projErr = scan.Projection()
	})
	require.NoError(t, projErr)
	require.NotNil(t, proj)

	fields := proj.Fields()
	require.Len(t, fields, 4, "projection must include id, payload, _row_id, _last_updated_sequence_number")

	fieldByName := make(map[string]iceberg.NestedField, len(fields))
	idsSeen := make(map[int]string, len(fields))
	for _, f := range fields {
		if prev, dup := idsSeen[f.ID]; dup {
			t.Fatalf("duplicate field id %d: %q and %q", f.ID, prev, f.Name)
		}
		idsSeen[f.ID] = f.Name
		fieldByName[f.Name] = f
	}

	idField, ok := fieldByName["id"]
	require.True(t, ok)
	assert.Equal(t, 1, idField.ID)

	payloadField, ok := fieldByName["payload"]
	require.True(t, ok)
	assert.Equal(t, 2, payloadField.ID)

	// _row_id is synthesized with its reserved ID.
	rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
	require.True(t, ok)
	assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
	assert.False(t, rowIDField.Required)

	// _last_updated_sequence_number comes from the table schema. NewMetadata
	// renumbers the schema's field IDs, so it no longer carries the reserved ID.
	seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
	require.True(t, ok)
	assert.NotEqual(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
	assert.False(t, seqField.Required)
}
+
 // TestSynthesizeRowLineageColumns verifies that _row_id and 
_last_updated_sequence_number
 // are filled from task constants when those columns are present and null.
 func TestSynthesizeRowLineageColumns(t *testing.T) {
@@ -295,3 +621,46 @@ func TestSynthesizeRowLineageColumns(t *testing.T) {
        }
        assert.EqualValues(t, 3, rowOffset)
 }
+
// TestRemoveMetadataFromSelectedFields verifies that when no selected name is
// a metadata column, every name passes through unchanged and in order, and no
// metadata fields are produced.
func TestRemoveMetadataFromSelectedFields(t *testing.T) {
	selectedFields := []string{"id", "payload"}
	metaFields := []string{"_row_id"}

	sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)

	assert.Equal(t, []string{"id", "payload"}, sf)
	assert.Empty(t, mf)
}
+
// TestRemoveMetadataFromSelectedFieldsCasing verifies that metadata matching is
// case-insensitive ("_ROW_Id" matches "_row_id"). A name that merely resembles
// a metadata column ("lastupdatedsequence_number") must be kept as a regular
// selected field.
func TestRemoveMetadataFromSelectedFieldsCasing(t *testing.T) {
	selectedFields := []string{
		"id",
		"payload",
		"_ROW_Id",
		"lastupdatedsequence_number",
	}
	metaFields := []string{
		"_row_id",
		"_last_updated_sequence_number",
	}

	sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)

	assert.Equal(t, []string{"id", "payload", "lastupdatedsequence_number"}, sf)
	require.Len(t, mf, 1)
	assert.True(t, slices.Contains(mf, iceberg.RowID()), "expected the synthesized _row_id field")
}

Reply via email to