kzajaczkowski commented on PR #15396:
URL: https://github.com/apache/iceberg/pull/15396#issuecomment-4379277258
@kumarpritam863 the argument that existing code already traverses nested
structs only holds when **the parent value is non-null at every level**. The
current `convertStructValue` does:
```java
for (Field recordField : recordSchema.fields()) {
  Object value = struct.get(recordField);
  if (value == null) {
    continue; // null parent → no recursion → no addColumn
  }
  // schema-update path...
}
```
When a parent struct value is null in the current record but its *schema*
carries new sub-fields, those sub-fields are never detected, even though the
typed schema is sitting right there on `record.valueSchema()`.
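To make the difference concrete, here's a minimal, self-contained sketch of the two traversal strategies. The `Field`/`Schema`/`Struct` types below are toy stand-ins for the Connect API, and the method names are illustrative, not the actual connector code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy stand-ins for Connect's Schema/Struct, only to contrast the two
// traversal strategies. None of this is the actual connector code.
public class TraversalSketch {
  record Field(String name, Schema schema) {}
  record Schema(List<Field> fields) {}          // fields == null => primitive
  record Struct(Schema schema, Map<String, Object> values) {
    Object get(Field f) { return values.get(f.name()); }
  }

  // Value-driven (the current convertStructValue shape): a null parent
  // value short-circuits, so its subtree is never visited.
  static void visitByValue(Struct struct, String prefix, List<String> seen) {
    for (Field f : struct.schema().fields()) {
      Object value = struct.get(f);
      if (value == null) {
        continue; // null parent -> subtree invisible
      }
      seen.add(prefix + f.name());
      if (value instanceof Struct nested) {
        visitByValue(nested, prefix + f.name() + ".", seen);
      }
    }
  }

  // Schema-driven (this PR's evolveSchema shape): walks the typed schema,
  // so fields under null parents are still discovered.
  static void visitBySchema(Schema schema, String prefix, List<String> seen) {
    for (Field f : schema.fields()) {
      seen.add(prefix + f.name());
      if (f.schema().fields() != null) {
        visitBySchema(f.schema(), prefix + f.name() + ".", seen);
      }
    }
  }

  public static void main(String[] args) {
    Schema leaf = new Schema(null);
    Schema placedArm = new Schema(List.of(new Field("placed_at", leaf)));
    Schema shippedArm = new Schema(List.of(new Field("shipped_at", leaf)));
    Schema event = new Schema(List.of(
        new Field("placed", placedArm), new Field("shipped", shippedArm)));

    // A record that sets only the "shipped" arm; "placed" is null.
    Struct rec = new Struct(event,
        Map.of("shipped", new Struct(shippedArm, Map.of("shipped_at", 1L))));

    List<String> byValue = new ArrayList<>();
    visitByValue(rec, "", byValue);
    List<String> bySchema = new ArrayList<>();
    visitBySchema(event, "", bySchema);

    System.out.println(byValue);  // [shipped, shipped.shipped_at]
    System.out.println(bySchema); // [placed, placed.placed_at, shipped, shipped.shipped_at]
  }
}
```

The value-driven walk sees only the populated arm; the schema-driven walk discovers every arm's fields from the same single record.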
A practical case where this matters is a Protobuf `oneof` whose arms started
as zero-field marker messages and were later evolved to carry data. Generic
example:
**V1**:
```protobuf
message OrderLifecycleEvent {
  string order_id = 1;
  oneof event {
    OrderPlacedEvent placed = 2;
    OrderShippedEvent shipped = 3;
    OrderCancelledEvent cancelled = 4;
  }
}
message OrderPlacedEvent {}
message OrderShippedEvent {}
message OrderCancelledEvent {}
```
The Iceberg sink with `auto-create-enabled: true` creates the table with
`event.placed: struct<>`, `event.shipped: struct<>`, `event.cancelled:
struct<>`.
**V2** adds a Timestamp to each arm (BACKWARD-compatible per Schema
Registry):
```protobuf
message OrderPlacedEvent { google.protobuf.Timestamp placed_at = 1; }
message OrderShippedEvent { google.protobuf.Timestamp shipped_at = 1; }
message OrderCancelledEvent { google.protobuf.Timestamp cancelled_at = 1; }
```
Confluent's ProtobufConverter wraps the oneof in a synthetic outer struct
where exactly one arm is non-null per record and the other two are null. With
value-driven recursion on a V2 record that sets only `shipped`:
- `shipped` → non-null → recurse → `addColumn(shipped_at)` fires ✓
- `placed` → null → `continue` → no `addColumn(placed_at)` ✗
- `cancelled` → null → `continue` → no `addColumn(cancelled_at)` ✗
Only one arm gets evolved per record. The other two remain `struct<>` in the
table — possibly forever, if a producer emits a homogeneous stream that never
sets a particular arm.
This PR's schema-driven `evolveSchema` (iterating
`recordField.schema().fields()` regardless of value) is exactly the right
design for this case — it discovers all three arms' new fields from a single
record. The value-driven defense above doesn't cover null sibling fields.
One thing worth considering for the empty-struct variant: even with
detection working correctly, the call order in `IcebergWriter.convertToRow`
(`flush()` before `applySchemaUpdates`) still tries to close the existing
writer that was created with the V1 empty-struct table schema.
`BaseTaskWriter.complete()` → `ParquetWriter.close()` →
`ensureWriterInitialized()` → `new ParquetFileWriter(parquetSchema)` →
`TypeUtil.checkValidWriteSchema()` rejects empty groups, so the inline flush
throws `InvalidSchemaException` before `applySchemaUpdates` ever runs. The new
try/catch around that block would swallow the exception silently, and
`initNewWriter()` would recreate the writer against the same un-evolved
schema, looping indefinitely.
Might be worth either skipping empty-struct columns at auto-create (in
`SchemaUtils.toIcebergType` STRUCT case) or applying schema updates before
flush in the empty-existing-struct case. Happy to file or contribute either as
a follow-up.
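For the first option, the pruning idea can be sketched as follows. This is a toy, self-contained illustration of "return null for an empty struct so the caller omits the column"; the `ConnectField` type and string return values are stand-ins, not the real `SchemaUtils`/`Types` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of skipping empty structs during type conversion.
// ConnectField and the string type names are stand-ins for illustration.
public class SkipEmptyStructSketch {
  record ConnectField(String name, List<ConnectField> children) {} // children == null => primitive

  // Returns null for an empty struct so the caller can omit the column
  // until a later record (post-evolution) actually carries fields.
  static String toIcebergType(ConnectField f) {
    if (f.children() == null) {
      return "long"; // stand-in primitive mapping
    }
    List<String> parts = new ArrayList<>();
    for (ConnectField child : f.children()) {
      String t = toIcebergType(child);
      if (t != null) {
        parts.add(child.name() + ": " + t); // empty-struct children are pruned
      }
    }
    return parts.isEmpty() ? null : "struct<" + String.join(", ", parts) + ">";
  }

  public static void main(String[] args) {
    ConnectField emptyArm = new ConnectField("placed", List.of());
    ConnectField fullArm = new ConnectField("shipped",
        List.of(new ConnectField("shipped_at", null)));
    System.out.println(toIcebergType(emptyArm)); // null -> column skipped
    System.out.println(toIcebergType(fullArm));  // struct<shipped_at: long>
  }
}
```

Note the pruning also cascades: a struct whose only children are empty structs would itself be skipped, so the V1 table simply never gets the unwritable `struct<>` columns.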
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]