kzajaczkowski opened a new issue, #16212:
URL: https://github.com/apache/iceberg/issues/16212

   ## Summary
   
   `iceberg.tables.evolve-schema-enabled: true` cannot rescue an **empty-struct 
→ non-empty-struct** migration that arises from Protobuf `oneof` arms that were 
initially zero-field marker messages. When a new schema version adds fields to 
those arms, the Parquet writer opens against the stale table schema (which 
still contains `struct<>` columns) and immediately throws 
`InvalidSchemaException`. The task is killed; data is lost.
   
   This is distinct from 
[#15395](https://github.com/apache/iceberg/issues/15395), which describes a 
**silent drop** (no evolution, no crash) for nested structs that were never 
empty. The failure here is louder — the task transitions to `FAILED` at the 
very first record of the new schema version — and the root cause is different.
   
   ---
   
   ## Version
   
   - `iceberg-kafka-connect` 1.9.2 (reproduced against `iceberg-core` 1.9.2 and `parquet-column` 1.15.2)
   - Confluent `cp-kafka-connect-base` 7.6.6 (Apache Kafka Connect 3.7.x)
   - Schema Registry: Confluent 7.6.6
   - Catalog: Iceberg REST (Nessie 0.103.3 + MinIO)
   
   ---
   
   ## Proto Shape (Minimum Reproduction)
   
   **V1** — zero-field marker arms:
   
   ```protobuf
   syntax = "proto3";
   package example.v1;
   import "google/protobuf/timestamp.proto";
   
   message CardPINEvent {
     string card_id = 1;
     google.protobuf.Timestamp event_timestamp = 2;
     oneof event {
       CardPINBlockedEvent   blocked   = 3;
       CardPINUnblockedEvent unblocked = 4;
       CardPINChangedEvent   changed   = 5;
     }
   }
   message CardPINBlockedEvent   {}
   message CardPINUnblockedEvent {}
   message CardPINChangedEvent   {}
   ```
   
   **V2** — Timestamp field added to each arm (BACKWARD-compatible per SR):
   
   ```protobuf
   message CardPINBlockedEvent   { google.protobuf.Timestamp blocked_timestamp   = 1; }
   message CardPINUnblockedEvent { google.protobuf.Timestamp unblocked_timestamp = 1; }
   message CardPINChangedEvent   { google.protobuf.Timestamp changed_timestamp   = 1; }
   ```
   
   ---
   
   ## Connector Config
   
   ```json
   {
     "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
     "tasks.max": "1",
     "iceberg.tables.auto-create-enabled": "true",
     "iceberg.tables.evolve-schema-enabled": "true",
     "iceberg.control.commit.interval-ms": "5000",
     "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
     "value.converter.schema.registry.url": "http://schema-registry:8081";
   }
   ```
   
   ---
   
   ## Reproduction Steps
   
   1. Register V1 schema; produce **3** V1 records (deliberately below the 
flush threshold of ~40). The sink auto-creates the Iceberg table:
      ```
      event_0       struct<
        blocked:    struct<>   ← empty arm
        unblocked:  struct<>   ← empty arm
        changed:    struct<>   ← empty arm
      >
      ```
      The coordinator emits `committed to 0 table(s)` — table exists with 
empty-arm schema but no Parquet data is written yet (silent-drop state).
   
   2. Produce **50** V2 records (total = 53, crosses the flush threshold). Each 
record sets only the `unblocked` arm:
      ```json
      {
        "cardId": "se-1",
        "eventTimestamp": "2026-04-24T11:00:00Z",
        "unblocked": { "unblockedTimestamp": "2026-04-24T11:00:00Z" },
        "reason": "PIN unblocked"
      }
      ```
   
   3. Within seconds the task transitions to `FAILED`.
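   
   For completeness, a hedged sketch of the producer used in steps 1 and 2. The topic name matches the stack trace below; the generated classes (`CardPINEvent`, `CardPINBlockedEvent`) come from the protos above, and everything else is an assumption about the test harness:
   
   ```java
   import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   Properties props = new Properties();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
   props.put("schema.registry.url", "http://schema-registry:8081");
   
   try (Producer<String, CardPINEvent> producer = new KafkaProducer<>(props)) {
     // Step 1: three V1 records; the set arm is a zero-field marker message
     for (int i = 0; i < 3; i++) {
       producer.send(new ProducerRecord<>("example.card_pin_event.v1",
           CardPINEvent.newBuilder()
               .setCardId("se-" + i)
               .setBlocked(CardPINBlockedEvent.getDefaultInstance())
               .build()));
     }
     // Step 2: rebuild against the V2 proto, then send 50 records that set
     // only the unblocked arm (payload as in the JSON above)
   }
   ```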
   
   ---
   
   ## Observed Behaviour
   
   Task FAILED at offset 3 (the very first V2 record):
   
   ```
   Caused by: org.apache.kafka.connect.errors.DataException:
     An error occurred converting record, topic: example.card_pin_event.v1,
     partition: 0, offset: 3
   Caused by: org.apache.parquet.schema.InvalidSchemaException:
     Cannot write a schema with an empty group: optional group blocked = 2 {}
     at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
     at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:458)
     at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:114)
     at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:214)
     at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:258)
     at org.apache.iceberg.io.PartitionedFanoutWriter.close(PartitionedFanoutWriter.java:70)
     at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:100)
     at org.apache.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:103)
     at org.apache.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:118)
     at org.apache.iceberg.connect.data.SinkWriter.lambda$completeWrite$0(SinkWriter.java:57)
   ```
   
   No schema-evolution log line (`Schema for table … updated`) appears before 
the failure. The Parquet writer is opened against the stale V1 table schema 
**before** any evolution is attempted.
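   
   The Parquet-side check is easy to reproduce standalone. A minimal sketch, assuming `parquet-column` 1.15.x on the classpath:
   
   ```java
   import org.apache.parquet.schema.GroupType;
   import org.apache.parquet.schema.MessageType;
   import org.apache.parquet.schema.Type.Repetition;
   import org.apache.parquet.schema.TypeUtil;
   
   // A message containing a zero-field group, mirroring the stale struct<>
   // column, fails the same validation the writer runs on initialisation:
   MessageType schema =
       new MessageType("event", new GroupType(Repetition.OPTIONAL, "blocked"));
   TypeUtil.checkValidWriteSchema(schema);
   // throws InvalidSchemaException: Cannot write a schema with an empty group
   ```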
   
   ---
   
   ## Expected Behaviour
   
   `evolve-schema-enabled: true` should promote all empty-arm structs to their 
new non-empty shape before the Parquet writer is initialised, so the write 
succeeds without manual intervention.
   
   ---
   
   ## Root Cause Analysis
   
   The bug is in `RecordConverter.convertStructValue()`. When the sink 
processes a `SinkRecord`, it calls the schema-update consumer (`addColumn`) 
only for struct fields whose value is **non-null in the current record**:
   
   ```java
   // RecordConverter.convertStructValue() — simplified
   for (Field recordField : recordSchema.fields()) {
       Object value = struct.get(recordField);
       if (value == null) {
           continue;   // ← null field: no addColumn callback fired
       }
        // schema-update path runs only here: the Iceberg type is derived from
        // the Connect field schema and the new column is registered
        Type icebergType = SchemaUtils.toIcebergType(recordField.schema(), config);
        schemaUpdateConsumer.addColumn(path, name, icebergType);
   }
   ```
   
   In a Protobuf `oneof`, the Confluent ProtobufConverter wraps all arms in a 
synthetic outer struct (`event_0: struct<blocked, unblocked, changed>`). The 
active arm contains a value; all other arms are `null`. Therefore:
   
   - Only the **set arm** (`unblocked`) has its sub-fields discovered via 
`addColumn`.
   - `blocked` and `changed` are null → no callback → they remain `struct<>` in 
the table schema.
   - When the writer reinitialises, it reads the partially-evolved table. 
Parquet's `TypeUtil.checkValidWriteSchema` rejects the first `struct<>` it 
encounters — in this run, `optional group blocked = 2 {}`.
   
   The Connect schema attached to the `SinkRecord` (`record.valueSchema()`) 
**does** contain the full V2 type for every arm, including the arms that are 
null in this record. The fix should use that typed schema information to evolve 
the column even when the value is null.
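   
   To make the asymmetry concrete, here is a sketch of (roughly) the Connect schema and value the converter hands the sink for a V2 record. The `SchemaBuilder` shapes and the `Timestamp` logical-type mapping are assumptions about Confluent's oneof flattening, not the converter's exact output:
   
   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   import org.apache.kafka.connect.data.Timestamp;
   
   Schema blockedArm = SchemaBuilder.struct().name("CardPINBlockedEvent")
       .field("blocked_timestamp", Timestamp.builder().optional().build())
       .optional().build();
   Schema unblockedArm = SchemaBuilder.struct().name("CardPINUnblockedEvent")
       .field("unblocked_timestamp", Timestamp.builder().optional().build())
       .optional().build();
   Schema event = SchemaBuilder.struct().name("event_0")
       .field("blocked", blockedArm)
       .field("unblocked", unblockedArm)   // changed arm omitted for brevity
       .optional().build();
   
   // Only the unblocked arm carries a value; blocked stays null.
   Struct value = new Struct(event)
       .put("unblocked", new Struct(unblockedArm)
           .put("unblocked_timestamp", new java.util.Date()));
   
   // The non-set arm is null in the value, yet its full V2 type is present
   // in the schema. This is exactly what the null-skipping path throws away:
   assert value.get("blocked") == null;
   assert event.field("blocked").schema().fields().size() == 1;
   ```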
   
   ---
   
   ## Proposed Fix
   
   In `RecordConverter.convertStructValue()`, in the `else` branch where `value 
== null`, check whether the existing table column for that field is a `struct` 
that is currently empty. If it is, look up the expected Iceberg type from the 
Connect schema (via `SchemaUtils.toIcebergType(recordField.schema(), config)`) 
and fire `addColumn` for each sub-field:
   
   ```java
   } else {
       // value is null, but if the existing column is an empty struct and
       // the Connect schema has sub-fields, evolve the column now so the
       // Parquet writer never sees a zero-field group
       if (existingColumn != null
               && existingColumn.type().isStructType()
               && existingColumn.type().asStructType().fields().isEmpty()
               && recordField.schema().type() == Schema.Type.STRUCT) {
           Type newType = SchemaUtils.toIcebergType(recordField.schema(), config);
           schemaUpdateConsumer.addColumn(path, name, newType);
       }
   }
   ```
   
   This check requires only the schema, never the value, and is therefore safe for any null field, not just oneof arms.
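   
   For reference, `existingColumn` above is the field resolved from the current table schema. A hedged sketch of that lookup (the dotted-path construction mirrors how the converter tracks `path`, and the variable names are illustrative):
   
   ```java
   import org.apache.iceberg.types.Types;
   
   // "path" is the parent struct's dotted path ("event_0" here) and "name"
   // the field name ("blocked"); findField is the standard schema lookup.
   String fullName = (path == null || path.isEmpty()) ? name : path + "." + name;
   Types.NestedField existingColumn = table.schema().findField(fullName);
   // null when the column does not exist yet, which the null check above
   // already tolerates
   ```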
   
   ---
   
   ## Relationship to #15395
   
   [#15395](https://github.com/apache/iceberg/issues/15395) ("Does not evolve 
schema for nested fields inside struct/list/map of structs") reports that 
evolution silently skips sub-fields of a **non-empty** existing struct when new 
sub-fields are added. That is a silent-drop; the task stays `RUNNING`.
   
   This issue is a separate, more severe case: the existing column is a 
**zero-field struct**, and the failure mode is a task `FAILED` at the first 
write attempt. Both issues share the same null-skipping code path but trigger 
different downstream failures.
   
   ---
   
   ## Workaround
   
   `evolve-schema-enabled` alone cannot recover from this state. Operators must 
either:
   
   - Drop and recreate the Iceberg table (losing any metadata snapshot 
history), or
   - Manually add the missing sub-fields via `ALTER TABLE … ADD COLUMN` before 
restarting the connector.
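   
   Where SQL access to the catalog is awkward, the second option can also be scripted against the Iceberg Java API. A sketch, assuming the converter maps `google.protobuf.Timestamp` to `timestamptz` and using placeholder identifiers:
   
   ```java
   import org.apache.iceberg.Table;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.types.Types;
   
   // catalog is an already-configured org.apache.iceberg.catalog.Catalog
   Table table = catalog.loadTable(TableIdentifier.of("db", "card_pin_event"));
   table.updateSchema()
       .addColumn("event_0.blocked", "blocked_timestamp", Types.TimestampType.withZone())
       .addColumn("event_0.changed", "changed_timestamp", Types.TimestampType.withZone())
       .commit();
   ```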

