annurahar opened a new issue, #15395:
URL: https://github.com/apache/iceberg/issues/15395

   ### Apache Iceberg version
   
   1.10.1 (latest release)
   
   ### Query engine
   
   None
   
   ### Please describe the bug 🐞
   
   When schema evolution is enabled (`iceberg.tables.evolve-schema-enabled=true`) in the Iceberg Kafka Connect sink, nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>` are not traversed for schema updates. The connector continues without error, but new or updated nested columns are silently ignored: they are not added to or updated in the Iceberg table schema.
   
   ---
   
   ## Steps to Reproduce
   
   ### 1. Connector Configuration
   ```properties
   name=iceberg-sink
   connector.class=org.apache.iceberg.connect.IcebergSinkConnector
   tasks.max=1
   topics=orders-topic
   iceberg.tables=default.orders
   iceberg.catalog.type=rest
   iceberg.catalog.uri=http://localhost:8181
   iceberg.tables.evolve-schema-enabled=true
   ```
   
   ### 2. Initial Iceberg Table Schema
   ```
   orders (
     order_id       : int (required)
     customer       : struct (required)
                        - customer_id : int (required)
     items          : list<struct> (required)
                        - product_id : int (required)
     metadata       : map<string, struct> (optional)
                        - key : string (required)
   )
   ```
   
   ### 3. Kafka Topic Schema (new schema registered in Schema Registry)
   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   
   public class OrderSchemas {
     // Customer struct with additional field "customer_details"
     static final Schema CUSTOMER_DETAILS_SCHEMA = SchemaBuilder.struct()
         .optional()
         .field("name", Schema.OPTIONAL_STRING_SCHEMA)
         .field("email", Schema.OPTIONAL_STRING_SCHEMA)
         .build();
   
     static final Schema CUSTOMER_SCHEMA = SchemaBuilder.struct()
         .field("customer_id", Schema.INT32_SCHEMA)
         // <-- NEW optional nested field in struct<struct>
         .field("customer_details", CUSTOMER_DETAILS_SCHEMA)
         .build();
   
     // Order item struct with additional field "product_details"
     static final Schema PRODUCT_DETAILS_SCHEMA = SchemaBuilder.struct()
         .optional()
         .field("name", Schema.OPTIONAL_STRING_SCHEMA)
         .field("price", Schema.OPTIONAL_FLOAT64_SCHEMA)
         .field("category", Schema.OPTIONAL_STRING_SCHEMA)
         .build();
   
     static final Schema ORDER_ITEM_SCHEMA = SchemaBuilder.struct()
         .field("product_id", Schema.INT32_SCHEMA)
         // <-- NEW optional nested field in list<struct>
         .field("product_details", PRODUCT_DETAILS_SCHEMA)
         .build();
   
     // Metadata value struct with additional field "value"
     static final Schema METADATA_VALUE_SCHEMA = SchemaBuilder.struct()
         .field("key", Schema.STRING_SCHEMA)
         // <-- NEW optional nested field in map<string, struct>
         .field("value", Schema.OPTIONAL_STRING_SCHEMA)
         .build();
   
     static final Schema ORDER_SCHEMA = SchemaBuilder.struct()
         .field("order_id", Schema.INT32_SCHEMA)
         .field("customer", CUSTOMER_SCHEMA)
         .field("items", SchemaBuilder.array(ORDER_ITEM_SCHEMA).build())
         .field("metadata", SchemaBuilder.map(Schema.STRING_SCHEMA, METADATA_VALUE_SCHEMA).build())
         .build();
   }
   ```
   
   ### 4. Send Record
   Send a record to `orders-topic` that uses the new schema but leaves the new optional nested fields (`customer_details`, `product_details`, `value`) unset. This simulates a producer that has picked up the new schema version but does not yet populate the new nested fields.
   
   ```json
   {
     "order_id": 1001,
     "customer": {
       "customer_id": 501
     },
     "items": [
       {
         "product_id": 101
       }
     ],
     "metadata": {
       "source": {
         "key": "source_system"
       }
     }
   }
   ```
   
   ### 5. Observe
   - Connector runs without exception.
   - Iceberg table schema remains **unchanged**:
   ```
   orders (
     order_id       : int (required)
     customer       : struct (required)
                        - customer_id : int (required)
     items          : list<struct> (required)
                        - product_id : int (required)
     metadata       : map<string, struct> (optional)
                        - key : string (required)
   )
   ```
   - The new nested fields (`customer_details`, `product_details`, `value`) are not added to the Iceberg table, even though the record schema defines them.
   - The new nested fields are optional and the record data doesn't include values for them, but schema evolution should still detect them from the record schema and add them to the Iceberg table schema.
   
   ---
   
   ## When Does This Issue Occur?
   
   This issue manifests when:
   
   1. **A Kafka Connect schema (record schema) includes new optional nested fields** (e.g., `customer_details`, `product_details`, `value`)
   2. **A producer sends data without populating these optional nested fields** (since they are optional, the data is valid)
   3. **The Kafka Connect sink receives this record** and attempts schema evolution
   4. **Schema evolution fails silently** for the nested fields because `RecordConverter` only checks top-level fields, not nested fields inside `struct<struct>`, `list<struct>`, or `map<string, struct>`
   
   As a result, the Iceberg table schema is never updated to include the new nested fields, even though they exist in the record schema.
   
   ---
   
   ## Expected Behavior
   - Nested struct fields inside `struct<struct<...>>`, `list<struct<...>>`, and `map<string, struct<...>>` should be traversed recursively.
   - Schema evolution should add/update nested columns.
   - Optionality changes for nested fields should be respected.
   - **Expected table schema after evolution:**
   ```
   orders (
     order_id       : int (required)
     customer       : struct (required)
                        - customer_id       : int (required)
                        - customer_details  : struct (optional)
                            - name  : string (optional)
                            - email : string (optional)
     items          : list<struct> (required)
                        - product_id       : int (required)
                        - product_details  : struct (optional)
                            - name     : string (optional)
                            - price    : double (optional)
                            - category : string (optional)
     metadata       : map<string, struct> (optional)
                        - key   : string (required)
                        - value : string (optional)
   )
   ```
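   A minimal sketch of the recursive traversal described above. It uses a hypothetical toy type model (`Prim`, `Field`, `Struct`, `ListOf`, `MapOf`), not the real Kafka Connect or Iceberg classes, and walks the record schema alongside the table schema, descending through struct fields, list elements, and map values. The `.element` / `.value` path segments and the `addColumn ...` / `makeOptional ...` strings are illustrative assumptions, not Iceberg output.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical toy model for illustration; NOT the Kafka Connect or Iceberg API.
   class NestedEvolutionSketch {
     interface Type {}
     record Prim(String name) implements Type {}
     record Field(Type type, boolean optional) {}
     record Struct(Map<String, Field> fields) implements Type {}
     record ListOf(Type element) implements Type {}
     record MapOf(Type key, Type value) implements Type {}
   
     static Field req(Type t) { return new Field(t, false); }
     static Field opt(Type t) { return new Field(t, true); }
   
     // Builds a struct from alternating (name, Field) arguments, keeping declaration order.
     static Struct struct(Object... pairs) {
       Map<String, Field> fields = new LinkedHashMap<>();
       for (int i = 0; i < pairs.length; i += 2) {
         fields.put((String) pairs[i], (Field) pairs[i + 1]);
       }
       return new Struct(fields);
     }
   
     // Returns the (hypothetical) schema-update operations the table needs to accept the record schema.
     static List<String> diff(Type rec, Type tbl) {
       List<String> ops = new ArrayList<>();
       walk(rec, tbl, "", ops);
       return ops;
     }
   
     private static void walk(Type rec, Type tbl, String path, List<String> ops) {
       if (rec instanceof Struct rs && tbl instanceof Struct ts) {
         for (Map.Entry<String, Field> e : rs.fields().entrySet()) {
           String fullName = path.isEmpty() ? e.getKey() : path + "." + e.getKey();
           Field recField = e.getValue();
           Field tblField = ts.fields().get(e.getKey());
           if (tblField == null) {
             ops.add("addColumn " + fullName); // one add covers the whole new subtree
             continue;
           }
           if (recField.optional() && !tblField.optional()) {
             ops.add("makeOptional " + fullName);
           }
           walk(recField.type(), tblField.type(), fullName, ops); // struct<struct<...>>
         }
       } else if (rec instanceof ListOf rl && tbl instanceof ListOf tl) {
         walk(rl.element(), tl.element(), path + ".element", ops); // list<struct<...>>
       } else if (rec instanceof MapOf rm && tbl instanceof MapOf tm) {
         walk(rm.value(), tm.value(), path + ".value", ops); // map<string, struct<...>>
       }
       // Primitives and mismatched kinds stop here; type promotion (updateType) is out of scope.
     }
   
     // The orders example from this issue (a Connect schema without .optional() is required).
     static List<String> ordersExample() {
       Type i = new Prim("int"), s = new Prim("string"), d = new Prim("double");
       Struct tableSchema = struct(
           "order_id", req(i),
           "customer", req(struct("customer_id", req(i))),
           "items", req(new ListOf(struct("product_id", req(i)))),
           "metadata", opt(new MapOf(s, struct("key", req(s)))));
       Struct recordSchema = struct(
           "order_id", req(i),
           "customer", req(struct("customer_id", req(i),
               "customer_details", opt(struct("name", opt(s), "email", opt(s))))),
           "items", req(new ListOf(struct("product_id", req(i),
               "product_details", opt(struct("name", opt(s), "price", opt(d), "category", opt(s)))))),
           "metadata", req(new MapOf(s, struct("key", req(s), "value", opt(s)))));
       return diff(recordSchema, tableSchema);
     }
   }
   ```
   
   For the orders example, `ordersExample()` returns three operations: `addColumn customer.customer_details`, `addColumn items.element.product_details`, and `addColumn metadata.value.value`. A new struct field is reported once at its root, on the assumption that adding a struct column brings its child fields along with it.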
   
   ## Actual Behavior
   - Connector runs without exception.
   - Schema evolution does **not** update nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>`.
   - Nested columns remain missing from the Iceberg schema; nested values are silently dropped.
   
   ---
   
   ## Root Cause Analysis
   In `RecordConverter.convertToStruct(Struct, StructType, int, SchemaUpdate.Consumer)`, the method iterates over `struct.schema().fields()` (the top-level struct fields) but does not recursively traverse nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or `map<string, struct<...>>` types. When schema evolution is enabled, only top-level fields are considered for `addColumn`, `updateType`, or `makeOptional` operations.
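   The difference can be illustrated with a simplified, hypothetical model in which a struct schema is a `Map<String, Object>` (field name to child schema) and a leaf is a type-name string; lists and maps are not modeled. `topLevelOnly` is a stand-in for the behavior this analysis attributes to `RecordConverter`, not its actual code, and `recursive` is one possible shape of the fix.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical model: a struct is a Map<String, Object> (field name -> child schema),
   // a leaf is its type name as a String. Lists and maps are not modeled here.
   class TopLevelVsRecursive {
     // Stand-in for the reported behavior: only the outermost field names are compared.
     static List<String> topLevelOnly(Map<String, Object> rec, Map<String, Object> tbl) {
       List<String> missing = new ArrayList<>();
       for (String name : rec.keySet()) {
         if (!tbl.containsKey(name)) {
           missing.add(name);
         }
       }
       return missing;
     }
   
     // Descends into nested structs, so a new field is found at any depth.
     @SuppressWarnings("unchecked")
     static List<String> recursive(Map<String, Object> rec, Map<String, Object> tbl, String prefix) {
       List<String> missing = new ArrayList<>();
       for (Map.Entry<String, Object> e : rec.entrySet()) {
         String path = prefix + e.getKey();
         Object tblChild = tbl.get(e.getKey());
         if (tblChild == null) {
           missing.add(path);
         } else if (e.getValue() instanceof Map && tblChild instanceof Map) {
           missing.addAll(recursive(
               (Map<String, Object>) e.getValue(), (Map<String, Object>) tblChild, path + "."));
         }
       }
       return missing;
     }
   }
   ```
   
   With a table containing `order_id` and `customer { customer_id }`, and a record schema that adds `customer.customer_details`, `topLevelOnly` reports nothing to add, whereas `recursive(rec, tbl, "")` returns `[customer.customer_details]`.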
   
   
   ### Willingness to contribute
   
   - [x] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time

