laskoviymishka commented on code in PR #1045:
URL: https://github.com/apache/iceberg-go/pull/1045#discussion_r3262323670
##########
table/scanner_internal_test.go:
##########
@@ -236,6 +238,330 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) {
assert.ErrorContains(t, err, "id 999")
}
+// TestProjectionV3PreLineageFile verifies that Projection() succeeds and returns
+// _row_id and _last_updated_sequence_number as optional (nullable) fields when
+// the table is format v3 but its schema does not declare those columns, which
+// is the situation for data files written before row lineage existed.
+func TestProjectionV3PreLineageFile(t *testing.T) {
+	schema := iceberg.NewSchema(
+		1,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	metadata, err := NewMetadata(
+		schema,
+		iceberg.UnpartitionedSpec,
+		UnsortedSortOrder,
+		"s3://test-bucket/test_table",
+		iceberg.Properties{"format-version": "3"},
+	)
+	require.NoError(t, err)
+	// Precondition: every assertion below is only meaningful for a v3 table,
+	// so stop immediately if the version is wrong.
+	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+	// Request the two user columns plus both row-lineage metadata columns.
+	// These metadata columns do NOT exist in the physical schema of a pre-lineage file.
+	scan := &Scan{
+		metadata:       metadata,
+		selectedFields: []string{"id", "payload", iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+		caseSensitive:  true,
+	}
+
+	proj, err := scan.Projection()
+	require.NoError(t, err, "Projection must not error for pre-lineage metadata columns")
+	require.NotNil(t, proj)
+
+	fields := proj.Fields()
+	require.Len(t, fields, 4, "projected schema must contain all four requested fields")
+
+	fieldByName := make(map[string]iceberg.NestedField, len(fields))
+	for _, f := range fields {
+		fieldByName[f.Name] = f
+	}
+
+	// Regular columns must survive unchanged.
+	idField, ok := fieldByName["id"]
+	require.True(t, ok, "id must be in projection")
+	assert.Equal(t, 1, idField.ID)
+
+	payloadField, ok := fieldByName["payload"]
+	require.True(t, ok, "payload must be in projection")
+	assert.Equal(t, 2, payloadField.ID)
+
+	// Row lineage columns must be present as optional (nullable) fields — the scanner
+	// will return all-nulls for any data file that was written before row lineage existed.
+	rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+	require.True(t, ok, "_row_id must be in projection")
+	assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+	assert.False(t, rowIDField.Required, "_row_id must be optional (nullable) for pre-lineage files")
+
+	seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+	require.True(t, ok, "_last_updated_sequence_number must be in projection")
+	assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, "_last_updated_sequence_number field ID")
+	assert.False(t, seqField.Required, "_last_updated_sequence_number must be optional (nullable) for pre-lineage files")
+}
+
+// TestProjectionV3PreLineageFileCaseSensitive verifies that, when column
+// matching is case-sensitive, a row-lineage column requested with the wrong
+// case ("_Row_Id") is not resolved to _row_id, and Projection fails with a
+// "could not find column" error naming the requested spelling.
+func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
+	schema := iceberg.NewSchema(
+		1,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	metadata, err := NewMetadata(
+		schema,
+		iceberg.UnpartitionedSpec,
+		UnsortedSortOrder,
+		"s3://test-bucket/test_table",
+		iceberg.Properties{"format-version": "3"},
+	)
+	require.NoError(t, err)
+	// Precondition: the v3-only synthesis path is what is under test.
+	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+	scan := &Scan{
+		metadata:       metadata,
+		selectedFields: []string{"id", "payload", "_Row_Id"},
+		caseSensitive:  true,
+	}
+
+	_, err = scan.Projection()
+	require.Error(t, err)
+	require.ErrorContains(t, err, "could not find column _Row_Id")
+}
+
+// TestProjectionV3PreLineageFileCaseInsensitive verifies that, when column
+// matching is case-insensitive, mixed-case requests for the row-lineage
+// columns resolve to fields named _row_id and _last_updated_sequence_number
+// that carry their reserved field IDs and are optional (nullable), even though
+// the table schema does not declare them.
+func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+	schema := iceberg.NewSchema(
+		1,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	metadata, err := NewMetadata(
+		schema,
+		iceberg.UnpartitionedSpec,
+		UnsortedSortOrder,
+		"s3://test-bucket/test_table",
+		iceberg.Properties{"format-version": "3"},
+	)
+	require.NoError(t, err)
+	// Precondition: the v3-only synthesis path is what is under test.
+	require.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+	scan := &Scan{
+		metadata:       metadata,
+		selectedFields: []string{"id", "payload", "_Row_Id", "_Last_Updated_SEQUENCE_number"},
+		caseSensitive:  false,
+	}
+
+	proj, err := scan.Projection()
+	require.NoError(t, err)
+	require.NotNil(t, proj)
+
+	fields := proj.Fields()
+	require.Len(t, fields, 4, "projected schema must contain all four requested fields")
+
+	fieldByName := make(map[string]iceberg.NestedField, len(fields))
+	for _, f := range fields {
+		fieldByName[f.Name] = f
+	}
+
+	idField, ok := fieldByName["id"]
+	require.True(t, ok, "id must be in projection")
+	assert.Equal(t, 1, idField.ID)
+
+	payloadField, ok := fieldByName["payload"]
+	require.True(t, ok, "payload must be in projection")
+	assert.Equal(t, 2, payloadField.ID)
+
+	// The projected fields are looked up by their canonical lowercase names,
+	// not by the mixed-case spellings that were requested.
+	rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+	require.True(t, ok, "_row_id must be in projection")
+	assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+	assert.False(t, rowIDField.Required, "_row_id must be optional (nullable) for pre-lineage files")
+
+	seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+	require.True(t, ok, "_last_updated_sequence_number must be in projection")
+	assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, "_last_updated_sequence_number field ID")
+	assert.False(t, seqField.Required, "_last_updated_sequence_number must be optional (nullable) for pre-lineage files")
+}
+
+// TestProjectionV2RowLineage asserts that requesting row-lineage metadata
+// columns on a v1 or v2 table does not use the v3-only synthesis path:
+// Projection must fail with iceberg.ErrInvalidSchema and the error must name
+// the requested _row_id column.
+func TestProjectionV2RowLineage(t *testing.T) {
+	schema := iceberg.NewSchema(
+		1,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	for _, tc := range []struct {
+		name string
+		ver  int
+	}{
+		{name: "v1", ver: 1},
+		{name: "v2", ver: 2},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			metadata, err := NewMetadata(
+				schema,
+				iceberg.UnpartitionedSpec,
+				UnsortedSortOrder,
+				"s3://test-bucket/test_table",
+				iceberg.Properties{PropertyFormatVersion: strconv.Itoa(tc.ver)},
+			)
+			require.NoError(t, err)
+			// Precondition: the subtest is meaningless if the requested
+			// format version was not honored.
+			require.Equal(t, tc.ver, metadata.Version(), "sanity: metadata format version")
+
+			scan := &Scan{
+				metadata:       metadata,
+				selectedFields: []string{"id", iceberg.RowIDColumnName},
+				caseSensitive:  true,
+			}
+
+			_, err = scan.Projection()
+			require.Error(t, err)
+			assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+			assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+		})
+	}
+}
+
+// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema
+// already declares _row_id (reserved field id) but does not declare
_last_updated_sequence_number.
+func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ iceberg.RowID(),
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{
+ "id", "payload",
+ iceberg.RowIDColumnName,
+ iceberg.LastUpdatedSequenceNumberColumnName,
+ },
+ caseSensitive: true,
+ }
+
+ var proj *iceberg.Schema
+ require.NotPanics(t, func() {
+ var perr error
+ proj, perr = scan.Projection()
+ require.NoError(t, perr)
+ })
+ require.NotNil(t, proj)
+
+ fields := proj.Fields()
+ require.Len(t, fields, 4, "projection must include id, payload,
_row_id, _last_updated_sequence_number")
+
+ fieldByName := make(map[string]iceberg.NestedField, len(fields))
+ idsSeen := make(map[int]string, len(fields))
+ for _, f := range fields {
+ if prev, dup := idsSeen[f.ID]; dup {
+ t.Fatalf("duplicate field id %d: %q and %q", f.ID,
prev, f.Name)
+ }
+ idsSeen[f.ID] = f.Name
+ fieldByName[f.Name] = f
+ }
+
+ idField, ok := fieldByName["id"]
+ require.True(t, ok)
+ assert.Equal(t, 1, idField.ID)
+
+ payloadField, ok := fieldByName["payload"]
+ require.True(t, ok)
+ assert.Equal(t, 2, payloadField.ID)
+
+ rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+ require.True(t, ok)
+ assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID) // NewMetadata
reorders schema field numbers
Review Comment:
The comment captures the symptom, but the root cause is upstream —
`AssignFreshSchemaIDs` (`schema.go:1366`) renumbers reserved metadata column
IDs along with everything else. Concrete consequence outside this PR's scope:
when a v3 table's `_row_id` ends up at id=3 in the projection,
`arrowAccessor.FieldPartner` (`table/arrow_utils.go:677`) will look up id=3 in
file schemas — Java-written files have `_row_id` at id=`MaxInt32-107` (2147483540,
the reserved ID from the spec), so we'd
silently read whichever column the Java file has at id=3 instead.
Not asking you to fix this in this PR — would you mind filing a follow-up issue so
the divergence is traceable? Special-casing reserved IDs in
`AssignFreshSchemaIDs`, or having `NewMetadata` reject metadata columns in the
user schema (matching Java), would both be reasonable directions to discuss there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]