annurahar opened a new issue, #15395: URL: https://github.com/apache/iceberg/issues/15395
### Apache Iceberg version

1.10.1 (latest release)

### Query engine

None

### Please describe the bug 🐞

When schema evolution is enabled (`iceberg.tables.evolve-schema-enabled=true`) in the Iceberg Kafka Connect sink, nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>` are not traversed for schema updates. The connector continues without error, but new or updated nested columns are silently ignored: they are not added to or updated in the Iceberg table schema.

---

## Steps to Reproduce

### 1. Connector Configuration

```properties
name=iceberg-sink
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=orders-topic
iceberg.tables=default.orders
iceberg.catalog.type=rest
iceberg.catalog.uri=http://localhost:8181
iceberg.tables.evolve-schema-enabled=true
```

### 2. Initial Iceberg Table Schema

```
orders (
  order_id : int (required)
  customer : struct (required)
    - customer_id : int (required)
  items : list<struct> (required)
    - product_id : int (required)
  metadata : map<string, struct> (optional)
    - key : string (required)
)
```

### 3. Kafka Topic Schema (new schema registered in Schema Registry)

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class OrderSchemas {

  // Customer struct with additional field "customer_details"
  static final Schema CUSTOMER_DETAILS_SCHEMA = SchemaBuilder.struct()
      .optional()
      .field("name", Schema.OPTIONAL_STRING_SCHEMA)
      .field("email", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  static final Schema CUSTOMER_SCHEMA = SchemaBuilder.struct()
      .field("customer_id", Schema.INT32_SCHEMA)
      .field("customer_details", CUSTOMER_DETAILS_SCHEMA) // <-- NEW optional nested field in struct<struct>
      .build();

  // Order item struct with additional field "product_details"
  static final Schema PRODUCT_DETAILS_SCHEMA = SchemaBuilder.struct()
      .optional()
      .field("name", Schema.OPTIONAL_STRING_SCHEMA)
      .field("price", Schema.OPTIONAL_FLOAT64_SCHEMA)
      .field("category", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  static final Schema ORDER_ITEM_SCHEMA = SchemaBuilder.struct()
      .field("product_id", Schema.INT32_SCHEMA)
      .field("product_details", PRODUCT_DETAILS_SCHEMA) // <-- NEW optional nested field in list<struct>
      .build();

  // Metadata value struct with additional field "value"
  static final Schema METADATA_VALUE_SCHEMA = SchemaBuilder.struct()
      .field("key", Schema.STRING_SCHEMA)
      .field("value", Schema.OPTIONAL_STRING_SCHEMA) // <-- NEW optional nested field in map<string, struct>
      .build();

  // Top-level record schema; the array and map builders are built explicitly
  static final Schema ORDER_SCHEMA = SchemaBuilder.struct()
      .field("order_id", Schema.INT32_SCHEMA)
      .field("customer", CUSTOMER_SCHEMA)
      .field("items", SchemaBuilder.array(ORDER_ITEM_SCHEMA).build())
      .field("metadata", SchemaBuilder.map(Schema.STRING_SCHEMA, METADATA_VALUE_SCHEMA).build())
      .build();
}
```

### 4. Send Record

Send a record to `orders-topic` that leaves the new nested fields (`customer_details`, `product_details`, `value`) unset. This simulates a producer that uses the new schema but does not yet populate the new optional nested fields.

```json
{
  "order_id": 1001,
  "customer": {
    "customer_id": 501
  },
  "items": [
    { "product_id": 101 }
  ],
  "metadata": {
    "source": { "key": "source_system" }
  }
}
```

### 5. Observe

- Connector runs without exception.
- Iceberg table schema remains **unchanged**:

  ```
  orders (
    order_id : int (required)
    customer : struct (required)
      - customer_id : int (required)
    items : list<struct> (required)
      - product_id : int (required)
    metadata : map<string, struct> (optional)
      - key : string (required)
  )
  ```

- The new nested fields (`customer_details`, `product_details`, `value`) are not added to the Iceberg table, even though the record schema contains them.
- Because the new nested fields are optional, the record data may legitimately omit values for them; schema evolution should still detect these fields from the record schema and add them to the Iceberg table schema.

---

## When Does This Issue Occur?

This issue manifests when:

1. **A Kafka Connect schema (record schema) includes new optional nested fields** (e.g., `customer_details`, `product_details`, `value`).
2. **A producer sends data without populating these optional nested fields** (since they are optional, the data is valid).
3. **The Kafka Connect sink receives this record** and attempts schema evolution.
4. **Schema evolution fails silently** for the nested fields because `RecordConverter` only checks top-level fields, not nested fields inside `struct<struct>`, `list<struct>`, or `map<string, struct>`.

As a result, the Iceberg table schema is never updated to include the new nested fields, even though they exist in the record schema.

---

## Expected Behavior

- Nested struct fields inside `struct<struct<...>>`, `list<struct<...>>`, and `map<string, struct<...>>` should be traversed recursively.
- Schema evolution should add or update nested columns.
- Optionality changes for nested fields should be respected.
- **Expected table schema after evolution:**

  ```
  orders (
    order_id : int (required)
    customer : struct (required)
      - customer_id : int (required)
      - customer_details : struct (optional)
          - name : string (optional)
          - email : string (optional)
    items : list<struct> (required)
      - product_id : int (required)
      - product_details : struct (optional)
          - name : string (optional)
          - price : double (optional)
          - category : string (optional)
    metadata : map<string, struct> (optional)
      - key : string (required)
      - value : string (optional)
  )
  ```

## Actual Behavior

- Connector runs without exception.
- Schema evolution does **not** update nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>`.
- Nested columns remain missing from the Iceberg schema; nested values are silently dropped.

---

## Root Cause Analysis

In `RecordConverter.convertToStruct(Struct, StructType, int, SchemaUpdate.Consumer)`, the method iterates over `struct.schema().fields()` (the top-level struct fields) but does not recursively traverse nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>` types. When schema evolution is enabled, only top-level fields are considered for `addColumn`, `updateType`, or `makeOptional` operations.

### Willingness to contribute

- [x] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
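To make the recursion called for in the Root Cause Analysis concrete, here is a minimal, self-contained sketch of the traversal a fix would need. It deliberately does not use the real Kafka Connect or Iceberg classes: `SchemaNode` and `NestedSchemaDiff` are hypothetical stand-ins, and the dotted path format (`items.element.product_details`, `metadata.value.value`) is illustrative only. The point it shows is that an existing struct field, list element, or map value is descended into rather than skipped, so missing columns and required-to-optional changes of struct fields are found at any depth. Type changes (`updateType`) and struct-typed map keys are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified type tree used for BOTH the record schema and the table schema.
// It is NOT the Kafka Connect or Iceberg API; it only models primitive, struct, list and map shapes.
final class SchemaNode {
  enum Kind { PRIMITIVE, STRUCT, LIST, MAP }

  final Kind kind;
  final boolean optional;
  final SchemaNode valueType; // list element type or map value type (map keys assumed to be strings)
  final Map<String, SchemaNode> fields = new LinkedHashMap<>(); // struct fields in declaration order

  private SchemaNode(Kind kind, boolean optional, SchemaNode valueType) {
    this.kind = kind;
    this.optional = optional;
    this.valueType = valueType;
  }

  static SchemaNode primitive(boolean optional) {
    return new SchemaNode(Kind.PRIMITIVE, optional, null);
  }

  static SchemaNode struct(boolean optional) {
    return new SchemaNode(Kind.STRUCT, optional, null);
  }

  static SchemaNode list(SchemaNode element, boolean optional) {
    return new SchemaNode(Kind.LIST, optional, element);
  }

  static SchemaNode map(SchemaNode value, boolean optional) {
    return new SchemaNode(Kind.MAP, optional, value);
  }

  // Adds a struct field and returns this node so definitions can be chained.
  SchemaNode field(String name, SchemaNode type) {
    fields.put(name, type);
    return this;
  }
}

// Hypothetical helper: recursively compares a record schema with a table schema and lists
// the schema updates needed, addressed by dotted paths such as "items.element.product_details".
final class NestedSchemaDiff {
  private NestedSchemaDiff() {}

  static List<String> diff(SchemaNode recordStruct, SchemaNode tableStruct) {
    List<String> updates = new ArrayList<>();
    walk(recordStruct, tableStruct, null, updates);
    return updates;
  }

  private static String child(String parent, String name) {
    return parent == null ? name : parent + "." + name;
  }

  private static void walk(SchemaNode recordType, SchemaNode tableType, String path, List<String> updates) {
    if (recordType.kind != tableType.kind) {
      return; // type changes (updateType) are out of scope for this sketch
    }
    switch (recordType.kind) {
      case STRUCT:
        for (Map.Entry<String, SchemaNode> entry : recordType.fields.entrySet()) {
          String fieldPath = child(path, entry.getKey());
          SchemaNode recordField = entry.getValue();
          SchemaNode tableField = tableType.fields.get(entry.getKey());
          if (tableField == null) {
            // Missing column: add it with its whole (possibly nested) type in one step.
            updates.add("addColumn " + fieldPath);
            continue;
          }
          if (recordField.optional && !tableField.optional) {
            updates.add("makeOptional " + fieldPath);
          }
          // Existing column: keep descending so that deeper additions are not missed.
          walk(recordField, tableField, fieldPath, updates);
        }
        break;
      case LIST:
        walk(recordType.valueType, tableType.valueType, child(path, "element"), updates);
        break;
      case MAP:
        walk(recordType.valueType, tableType.valueType, child(path, "value"), updates);
        break;
      default:
        break; // primitives have no nested fields
    }
  }
}
```

Modelled on the schemas from the reproduction, `NestedSchemaDiff.diff(recordSchema, tableSchema)` reports `addColumn` for `customer.customer_details`, `items.element.product_details`, and `metadata.value.value`, which are exactly the columns missing from the observed table schema. In the connector itself, the same walk would presumably run over Kafka Connect's `Schema` (`fields()`, `valueSchema()`) and Iceberg's `Types.StructType`, `Types.ListType`, and `Types.MapType`, and it would likely need to follow the record schema rather than only non-null values, because the new fields in this report are unset.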
