Dominik Dębowczyk created FLINK-39053:
-----------------------------------------
Summary: AvroTypeException when writing Flink VARCHAR to Avro ENUM
in Kafka avro-confluent format
Key: FLINK-39053
URL: https://issues.apache.org/jira/browse/FLINK-39053
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC,
SequenceFile)
Affects Versions: 2.1.1, 1.20.3, 2.0.1, 2.1.0, 1.20.2, 1.20.1, 1.20.0, 2.0.0
Environment: - Flink Version: 1.20.3
- Kafka Connector Version: 3.4.0-1.20
- Format: avro-confluent
Reporter: Dominik Dębowczyk
When writing to Kafka using the avro-confluent format with a schema that
contains ENUM types, Flink fails with an AvroTypeException when the source
table uses VARCHAR for fields that map to Avro ENUMs.
h3. Error
{code:java}
Caused by: org.apache.avro.AvroTypeException: value RESTAURANT (a
org.apache.avro.util.Utf8) is not a BusinessType at PartnerCore.businessType
{code}
h3. Reproduction scenario
# Read Avro data from Kafka containing ENUM fields (e.g. a {{BusinessType}} enum). Flink automatically maps these ENUMs to VARCHAR, since Flink SQL doesn't support ENUM.
# Process the data in Flink (VARCHAR operations work fine).
# Write back to Kafka using the avro-confluent format with the original schema containing the ENUM.
# The conversion fails with {{AvroTypeException}} (a minimal sketch follows this list).
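For illustration, a minimal Table API job that runs into this. The topic, bootstrap servers, and registry URL are placeholders, and the explicit {{avro-confluent.schema}} (mentioned under Scope below) carries the ENUM that triggers the failure:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnumSinkRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The Avro value schema declares businessType as an ENUM; on the Flink
        // side the column can only be declared as STRING (VARCHAR).
        env.executeSql(
                "CREATE TABLE partner_sink (businessType STRING) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'partner-core',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'avro-confluent',"
                        + " 'avro-confluent.url' = 'http://localhost:8081',"
                        + " 'avro-confluent.schema' = '{\"type\":\"record\",\"name\":\"PartnerCore\","
                        + "\"fields\":[{\"name\":\"businessType\",\"type\":{\"type\":\"enum\","
                        + "\"name\":\"BusinessType\",\"symbols\":[\"RESTAURANT\",\"SHOP\"]}}]}'"
                        + ")");

        // Fails with the AvroTypeException above: the VARCHAR converter hands the
        // Avro writer a Utf8 where the schema expects a BusinessType enum symbol.
        env.executeSql("INSERT INTO partner_sink VALUES ('RESTAURANT')").await();
    }
}
{code}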
h3. Root cause
In {{RowDataToAvroConverters}}, the VARCHAR case creates a converter that always returns a {{Utf8}}:
{code:java}
case VARCHAR:
    converter =
            new RowDataToAvroConverter() {
                @Override
                public Object convert(Schema schema, Object object) {
                    return new Utf8(object.toString());
                }
            };
    break;
{code}
Because Flink SQL has no native ENUM type and automatically represents Avro ENUMs as VARCHAR, users have no way to write data whose Avro schema contains ENUM fields: the converter never checks whether the target Avro schema expects an ENUM.
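The mismatch can be shown outside Flink with the plain Avro API. A self-contained sketch (the schema mirrors the stack trace above; the exact exception wording depends on the Avro version):
{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

public class Utf8VersusEnumDemo {
    public static void main(String[] args) throws Exception {
        // Record schema mirroring the PartnerCore example from the stack trace.
        Schema schema = SchemaBuilder.record("PartnerCore").fields()
                .name("businessType")
                .type(SchemaBuilder.enumeration("BusinessType").symbols("RESTAURANT", "SHOP"))
                .noDefault()
                .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        // Utf8 is exactly what the VARCHAR converter produces.
        record.put("businessType", new Utf8("RESTAURANT"));

        BinaryEncoder encoder =
                EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
        // Throws AvroTypeException: the writer requires a GenericEnumSymbol here.
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    }
}
{code}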
h3. Proposed Solution
Modify the VARCHAR converter to check whether the target schema is an ENUM and, if so, create a {{GenericData.EnumSymbol}} instead:
{code:java}
case VARCHAR:
    converter =
            new RowDataToAvroConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(Schema schema, Object object) {
                    if (schema.getType() == Schema.Type.ENUM) {
                        return new GenericData.EnumSymbol(schema, object.toString());
                    }
                    return new Utf8(object.toString());
                }
            };
    break;
{code}
This allows VARCHAR values to be properly converted to ENUMs when the target
Avro schema requires it, enabling round-trip processing of Avro data with ENUM
fields through Flink.
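Continuing the standalone Avro sketch from the root-cause section, replacing the {{Utf8}} with a {{GenericData.EnumSymbol}} built against the field's ENUM schema (which is what the modified converter would produce) makes the same write succeed:
{code:java}
// Same schema, record, and encoder as in the sketch above.
record.put("businessType",
        new GenericData.EnumSymbol(
                schema.getField("businessType").schema(), "RESTAURANT"));

// Succeeds: GenericDatumWriter accepts a GenericEnumSymbol for an ENUM field.
new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
encoder.flush();
{code}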
h3. Scope
This fix addresses the case when:
* using the {{avro-confluent}} format
* the schema is provided via the {{avro-confluent.schema}} option or the schema registry
* the source field is VARCHAR (which may represent an ENUM from upstream Avro data)
* the target Avro field is ENUM

*Note*: this does NOT cover the case where the schema is auto-derived from the Flink table DDL (which wouldn't include ENUM types anyway, since Flink SQL doesn't support them).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)