danaford opened a new issue, #14697:
URL: https://github.com/apache/iceberg/issues/14697
### Apache Iceberg version
1.8.0
### Query engine
Kafka Connect
### Please describe the bug 🐞
### Problem
When the Kafka Connect sink uses AWS Glue auto-creation and receives a
Protobuf schema containing either:
(1) an empty message (e.g. `message KeepAlive {}`) or
(2) a recursive structure (e.g. `google.protobuf.Struct`),
`SchemaUtils.SchemaGenerator.toIcebergType` tries to convert the schema
into Iceberg types. It either produces an empty struct (which Parquet rejects
with `InvalidSchemaException`) or recurses indefinitely through the nested
schema and overflows the stack. This happens during task initialization, before
any records are written.
### Steps to Reproduce
1. Define the Protobuf schemas:
```
// event_payload.proto
syntax = "proto3";
package events;
import "google/protobuf/struct.proto";
message Event {
  sfixed64 timestamp_ns = 1;
  EventData data = 2;
  message EventData {
    KeepAlive keep_alive = 1;
    ErrorReport error_report = 2;
  }
}
message KeepAlive {}
message ErrorReport {
  google.protobuf.Struct details = 1;
}
// device_reading.proto
syntax = "proto3";
package readings;
import "event_payload.proto";
import "google/protobuf/timestamp.proto";
message DeviceReading {
  string device_id = 1;
  google.protobuf.Timestamp observed_at = 2;
  events.Event.EventData data = 3;
}
```
2. Register the schemas with Schema Registry.
3. Produce a Kafka message to a topic (e.g., device-readings-topic) with the
DeviceReading schema. The keep_alive field will be an empty message.
4. Configure and run the Iceberg Kafka Connect sink to consume from
device-readings-topic and write to an Iceberg table.
### Observed Behavior
1. `InvalidSchemaException` due to empty struct:
```
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with
an empty group: optional group keep_alive = 1 {}
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
```
2. `StackOverflowError` due to recursive struct:
```
java.lang.StackOverflowError
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:641)
```
### Expected Behavior
The Iceberg Kafka Connect sink should handle these schemas gracefully,
without crashing. A possible approach would be to convert empty or recursive
structs to a StringType in the Iceberg schema.
### Proposed Solution
The issue seems to be in the `SchemaUtils.SchemaGenerator.toIcebergType`
method. Here are two suggested fixes:
1. Handle empty structs:
In the STRUCT case, after building the list of fields, check whether the list
is empty. If it is, return `StringType.get()` instead of creating an empty
`StructType`.
```
// Handle empty structs - Parquet cannot write empty groups
if (fields.isEmpty()) {
  return StringType.get();
}
```
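To illustrate the effect of this check in isolation, here is a minimal, self-contained sketch. It does not use the real Kafka Connect or Iceberg classes: `ToySchema` and `toType` are hypothetical stand-ins, and the converted type is rendered as plain text so that the fallback is easy to see.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmptyStructFallback {
  /** Hypothetical toy schema; stands in for a Kafka Connect schema. */
  static final class ToySchema {
    final String primitiveType; // non-null for leaves such as "long"
    final Map<String, ToySchema> fields = new LinkedHashMap<>();

    private ToySchema(String primitiveType) {
      this.primitiveType = primitiveType;
    }

    static ToySchema primitive(String type) {
      return new ToySchema(type);
    }

    static ToySchema struct() {
      return new ToySchema(null);
    }

    ToySchema field(String name, ToySchema type) {
      fields.put(name, type);
      return this;
    }
  }

  /** Renders the converted type as text; "string" marks the fallback. */
  static String toType(ToySchema schema) {
    if (schema.primitiveType != null) {
      return schema.primitiveType;
    }
    List<String> converted = new ArrayList<>();
    for (Map.Entry<String, ToySchema> e : schema.fields.entrySet()) {
      converted.add(e.getKey() + ": " + toType(e.getValue()));
    }
    // Empty struct: fall back to a string instead of emitting an empty group.
    if (converted.isEmpty()) {
      return "string";
    }
    return "struct<" + String.join(", ", converted) + ">";
  }

  public static void main(String[] args) {
    ToySchema keepAlive = ToySchema.struct(); // like `message KeepAlive {}`
    ToySchema reading = ToySchema.struct()
        .field("device_id", ToySchema.primitive("string"))
        .field("keep_alive", keepAlive);
    // prints struct<device_id: string, keep_alive: string>
    System.out.println(toType(reading));
  }
}
```

In this toy model the empty message collapses to a string column while the enclosing struct keeps its other fields.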
2. Handle recursive structs:
Use a `Map<String, Type>` to track visited schemas, keyed by a logical key
(e.g., `schema.type() + ":" + schema.name()`). If a cycle is detected, return
`StringType.get()`.
```
// At the beginning of toIcebergType
String schemaKey = getSchemaKey(valueSchema);
Type result = visited.get(schemaKey);
if (result != null) {
  // Break the recursion by returning a StringType
  return StringType.get();
}
```
Together, these two changes would prevent both the `InvalidSchemaException`
and the `StackOverflowError`.
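The cycle check from the second fix can be sketched as follows. This is a toy model under stated assumptions, not the connector's code: `ToySchema` and `toType` are hypothetical, the shape of `google.protobuf.Struct` is simplified, and the sketch uses a set of schema names that are currently being converted (a variation on the visited map described above) so that entries are removed once a struct is finished.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CycleGuardSketch {
  /** Hypothetical toy schema; stands in for a Kafka Connect schema. */
  static final class ToySchema {
    final String name; // logical name; for leaves this is the primitive type
    final boolean primitive;
    final Map<String, ToySchema> fields = new LinkedHashMap<>();

    private ToySchema(String name, boolean primitive) {
      this.name = name;
      this.primitive = primitive;
    }

    static ToySchema primitive(String type) {
      return new ToySchema(type, true);
    }

    static ToySchema struct(String name) {
      return new ToySchema(name, false);
    }

    ToySchema field(String fieldName, ToySchema type) {
      fields.put(fieldName, type);
      return this;
    }
  }

  static String toType(ToySchema schema) {
    return toType(schema, new HashSet<>());
  }

  private static String toType(ToySchema schema, Set<String> inProgress) {
    if (schema.primitive) {
      return schema.name;
    }
    // add() returns false if this schema is already on the conversion stack.
    if (!inProgress.add(schema.name)) {
      return "string"; // break the cycle with a string fallback
    }
    try {
      List<String> converted = new ArrayList<>();
      for (Map.Entry<String, ToySchema> e : schema.fields.entrySet()) {
        converted.add(e.getKey() + ": " + toType(e.getValue(), inProgress));
      }
      return "struct<" + String.join(", ", converted) + ">";
    } finally {
      inProgress.remove(schema.name); // finished: siblings may reuse this schema
    }
  }

  public static void main(String[] args) {
    // Simplified recursive shape: Struct -> Value -> Struct
    ToySchema struct = ToySchema.struct("google.protobuf.Struct");
    ToySchema value = ToySchema.struct("google.protobuf.Value");
    struct.field("fields", value);
    value.field("struct_value", struct);
    // prints struct<fields: struct<struct_value: string>>
    System.out.println(toType(struct));
  }
}
```

One design point worth weighing: if the tracking structure keeps every schema ever seen, a message type that is merely reused in two sibling fields would also be converted to a string. Removing the key once a struct has been converted, as in this sketch, limits the fallback to genuine cycles.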
### Willingness to contribute
- [x] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]