laskoviymishka commented on code in PR #1045:
URL: https://github.com/apache/iceberg-go/pull/1045#discussion_r3262323670


##########
table/scanner_internal_test.go:
##########
@@ -236,6 +238,330 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t 
*testing.T) {
        assert.ErrorContains(t, err, "id 999")
 }
 
+// TestProjectionV3PreLineageFile verifies that Projection() succeeds and 
returns
+// _row_id and _last_updated_sequence_number as nullable (all-null-capable) 
fields when
+// the table is v3 with next-row-id set but the data file predates row lineage 
(those
+// columns are absent from the schema).
+func TestProjectionV3PreLineageFile(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       // Request the two user columns plus both row-lineage metadata columns.
+       // These metadata columns do NOT exist in the physical schema of a 
pre-lineage file.
+       scan := &Scan{
+               metadata:       metadata,
+               selectedFields: []string{"id", "payload", 
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+               caseSensitive:  true,
+       }
+
+       proj, err := scan.Projection()
+       require.NoError(t, err, "Projection must not error for pre-lineage 
metadata columns")
+       require.NotNil(t, proj)
+
+       fields := proj.Fields()
+       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+
+       fieldByName := make(map[string]iceberg.NestedField, len(fields))
+       for _, f := range fields {
+               fieldByName[f.Name] = f
+       }
+
+       // Regular columns must survive unchanged.
+       idField, ok := fieldByName["id"]
+       require.True(t, ok, "id must be in projection")
+       assert.Equal(t, 1, idField.ID)
+
+       payloadField, ok := fieldByName["payload"]
+       require.True(t, ok, "payload must be in projection")
+       assert.Equal(t, 2, payloadField.ID)
+
+       // Row lineage columns must be present as optional (nullable) fields — 
the scanner
+       // will return all-nulls for any data file that was written before row 
lineage existed.
+       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+       require.True(t, ok, "_row_id must be in projection")
+       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+
+       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+       require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
+       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
+       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
+}
+
+func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       scan := &Scan{
+               metadata:       metadata,
+               selectedFields: []string{"id", "payload", "_Row_Id"},
+               caseSensitive:  true,
+       }
+
+       _, err = scan.Projection()
+       require.Error(t, err)
+       require.ErrorContains(t, err, "could not find column _Row_Id")
+}
+
+func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       scan := &Scan{
+               metadata:       metadata,
+               selectedFields: []string{"id", "payload", "_Row_Id", 
"_Last_Updated_SEQUENCE_number"},
+               caseSensitive:  false,
+       }
+
+       proj, err := scan.Projection()
+       require.NoError(t, err)
+       require.NotNil(t, proj)
+
+       fields := proj.Fields()
+       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+
+       fieldByName := make(map[string]iceberg.NestedField, len(fields))
+       for _, f := range fields {
+               fieldByName[f.Name] = f
+       }
+
+       idField, ok := fieldByName["id"]
+       require.True(t, ok, "id must be in projection")
+       assert.Equal(t, 1, idField.ID)
+
+       payloadField, ok := fieldByName["payload"]
+       require.True(t, ok, "payload must be in projection")
+       assert.Equal(t, 2, payloadField.ID)
+
+       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+       require.True(t, ok, "_row_id must be in projection")
+       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+
+       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+       require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
+       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
+       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
+}
+
+// TestProjectionV2RowLineage asserts that requesting row-lineage metadata 
columns on a v1 or v2
+// table does not use the v3-only synthesis path
+func TestProjectionV2RowLineage(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       for _, tc := range []struct {
+               name string
+               ver  int
+       }{
+               {name: "v1", ver: 1},
+               {name: "v2", ver: 2},
+       } {
+               t.Run(tc.name, func(t *testing.T) {
+                       metadata, err := NewMetadata(
+                               schema,
+                               iceberg.UnpartitionedSpec,
+                               UnsortedSortOrder,
+                               "s3://test-bucket/test_table",
+                               iceberg.Properties{PropertyFormatVersion: 
strconv.Itoa(tc.ver)},
+                       )
+                       require.NoError(t, err)
+                       assert.Equal(t, tc.ver, metadata.Version(), "sanity: 
metadata format version")
+
+                       scan := &Scan{
+                               metadata:       metadata,
+                               selectedFields: []string{"id", 
iceberg.RowIDColumnName},
+                               caseSensitive:  true,
+                       }
+
+                       _, err = scan.Projection()
+                       require.Error(t, err)
+                       assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+                       assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+               })
+       }
+}
+
+// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema
+// already declares _row_id (reserved field id) but does not declare 
_last_updated_sequence_number.
+func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
+       schema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+               iceberg.RowID(),
+       )
+
+       metadata, err := NewMetadata(
+               schema,
+               iceberg.UnpartitionedSpec,
+               UnsortedSortOrder,
+               "s3://test-bucket/test_table",
+               iceberg.Properties{"format-version": "3"},
+       )
+       require.NoError(t, err)
+       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+       scan := &Scan{
+               metadata: metadata,
+               selectedFields: []string{
+                       "id", "payload",
+                       iceberg.RowIDColumnName,
+                       iceberg.LastUpdatedSequenceNumberColumnName,
+               },
+               caseSensitive: true,
+       }
+
+       var proj *iceberg.Schema
+       require.NotPanics(t, func() {
+               var perr error
+               proj, perr = scan.Projection()
+               require.NoError(t, perr)
+       })
+       require.NotNil(t, proj)
+
+       fields := proj.Fields()
+       require.Len(t, fields, 4, "projection must include id, payload, 
_row_id, _last_updated_sequence_number")
+
+       fieldByName := make(map[string]iceberg.NestedField, len(fields))
+       idsSeen := make(map[int]string, len(fields))
+       for _, f := range fields {
+               if prev, dup := idsSeen[f.ID]; dup {
+                       t.Fatalf("duplicate field id %d: %q and %q", f.ID, 
prev, f.Name)
+               }
+               idsSeen[f.ID] = f.Name
+               fieldByName[f.Name] = f
+       }
+
+       idField, ok := fieldByName["id"]
+       require.True(t, ok)
+       assert.Equal(t, 1, idField.ID)
+
+       payloadField, ok := fieldByName["payload"]
+       require.True(t, ok)
+       assert.Equal(t, 2, payloadField.ID)
+
+       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+       require.True(t, ok)
+       assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID) // NewMetadata 
reorders schema field numbers

Review Comment:
   The comment captures the symptom, but the root cause is upstream — 
`AssignFreshSchemaIDs` (`schema.go:1366`) renumbers reserved metadata column 
IDs along with everything else. Concrete consequence outside this PR's scope: 
when a v3 table's `_row_id` ends up at id=3 in the projection, 
`arrowAccessor.FieldPartner` (`table/arrow_utils.go:677`) will look up id=3 in 
file schemas — Java-written files have `_row_id` at id=`MaxInt32-107`, so we'd 
silently read whichever column the Java file has at id=3 instead.
   
   Not asking to fix in this PR — would you mind filing a follow-up issue so 
the divergence is traceable? Either special-casing reserved ids in 
`AssignFreshSchemaIDs` or having `NewMetadata` reject metadata columns in the 
user schema (matching Java) would be a reasonable direction to discuss there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to