Dominik Dębowczyk created FLINK-39053:
-----------------------------------------

             Summary: AvroTypeException when writing Flink VARCHAR to Avro ENUM in Kafka avro-confluent format
                 Key: FLINK-39053
                 URL: https://issues.apache.org/jira/browse/FLINK-39053
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 2.1.1, 1.20.3, 2.0.1, 2.1.0, 1.20.2, 1.20.1, 1.20.0, 2.0.0
         Environment: - Flink Version: 1.20.3
- Kafka Connector Version: 3.4.0-1.20
- Format: avro-confluent
            Reporter: Dominik Dębowczyk


When writing to Kafka using the avro-confluent format with a schema that 
contains ENUM types, Flink fails with an AvroTypeException when the source 
table uses VARCHAR for fields that map to Avro ENUMs.
h3. Error
{code:java}
Caused by: org.apache.avro.AvroTypeException: value RESTAURANT (a org.apache.avro.util.Utf8) is not a BusinessType at PartnerCore.businessType
{code}
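For context, the same failure is reproducible with plain Avro, independent of Flink: {{GenericDatumWriter}} rejects a {{Utf8}} where the writer schema expects an ENUM, but accepts a {{GenericData.EnumSymbol}}. A minimal sketch (the {{BusinessType}} symbols are illustrative):
{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

public class EnumWriteDemo {
    public static void main(String[] args) throws Exception {
        Schema enumSchema = new Schema.Parser().parse(
                "{\"type\": \"enum\", \"name\": \"BusinessType\","
                        + " \"symbols\": [\"RESTAURANT\", \"SHOP\"]}");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        GenericDatumWriter<Object> writer = new GenericDatumWriter<>(enumSchema);

        // An EnumSymbol is accepted and serialized as the symbol's ordinal.
        writer.write(new GenericData.EnumSymbol(enumSchema, "RESTAURANT"), encoder);
        encoder.flush();

        // A Utf8 is rejected with AvroTypeException -- this is exactly the
        // value the Flink VARCHAR converter currently hands to Avro.
        writer.write(new Utf8("RESTAURANT"), encoder);
    }
}
{code}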
h3. Reproduction scenario
 # Read Avro data from Kafka containing ENUM fields (e.g. a {{BusinessType}} enum). Flink automatically maps these ENUMs to VARCHAR, since Flink SQL does not support ENUM.
 # Process the data in Flink (VARCHAR operations work fine).
 # Write back to Kafka using the avro-confluent format with the original schema containing the ENUM (see the DDL sketch after this list).
 # Conversion fails with {{AvroTypeException}}.
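For illustration, a minimal sink DDL that exercises this path, assuming the ENUM-bearing writer schema is supplied through the {{avro-confluent.schema}} option mentioned below (topic, addresses, and names are made up):
{code:sql}
-- Hypothetical sink table; the configured schema declares businessType as an ENUM.
CREATE TABLE partner_sink (
    businessType STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'partner-core',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081',
    'avro-confluent.schema' = '{"type":"record","name":"PartnerCore","fields":[{"name":"businessType","type":{"type":"enum","name":"BusinessType","symbols":["RESTAURANT","SHOP"]}}]}'
);

-- Fails at runtime with the AvroTypeException above:
INSERT INTO partner_sink SELECT businessType FROM partner_source;
{code}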

h3. Root cause

In {{RowDataToAvroConverters}}, the VARCHAR case creates a converter that always returns a {{Utf8}}:
{code:java}
case VARCHAR:
    converter = new RowDataToAvroConverter() {
        @Override
        public Object convert(Schema schema, Object object) {
            return new Utf8(object.toString());
        }
    };
    break;
{code}
Since Flink SQL doesn't have native ENUM support and automatically represents 
ENUMs as VARCHAR, there's no way for users to write data with ENUM fields in 
their Avro schemas. The converter doesn't check if the target Avro schema 
expects an ENUM type.
h3. Proposed Solution

Modify the VARCHAR converter to check whether the target schema is an ENUM and create a {{GenericData.EnumSymbol}} instead:
{code:java}
case VARCHAR:
    converter =
            new RowDataToAvroConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(Schema schema, Object object) {
                    if (schema.getType() == Schema.Type.ENUM) {
                        return new GenericData.EnumSymbol(schema, 
object.toString());
                    }
                    return new Utf8(object.toString());
                }
            };
    break;
{code}
This allows VARCHAR values to be properly converted to ENUMs when the target 
Avro schema requires it, enabling round-trip processing of Avro data with ENUM 
fields through Flink.
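One caveat worth checking in review: as far as I can tell, {{GenericData.EnumSymbol}} does not validate the symbol against the schema's symbol list at construction time, so a VARCHAR value that is not a declared symbol would still fail deep inside Avro at write time. A possible variant of the ENUM branch that fails earlier with a clearer message (sketch only):
{code:java}
// Sketch only: validate the symbol via Schema#hasEnumSymbol before
// constructing the EnumSymbol, so invalid values fail with a clear message.
if (schema.getType() == Schema.Type.ENUM) {
    String symbol = object.toString();
    if (!schema.hasEnumSymbol(symbol)) {
        throw new IllegalArgumentException(
                "Value '" + symbol + "' is not a symbol of enum " + schema.getFullName());
    }
    return new GenericData.EnumSymbol(schema, symbol);
}
return new Utf8(object.toString());
{code}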
h3. Scope

This fix addresses the case where:
 * The {{avro-confluent}} format is used
 * The schema is provided via the {{avro-confluent.schema}} option or the schema registry
 * The source field is VARCHAR (which may represent an ENUM from upstream Avro data)
 * The target Avro field is an ENUM

*Note*: This does NOT cover the case where the schema is auto-derived from the Flink table DDL (which wouldn't include ENUM types anyway, since Flink SQL doesn't support them).

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
