[ https://issues.apache.org/jira/browse/FLINK-39053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39053:
-----------------------------------
    Labels: pull-request-available  (was: )

> AvroTypeException when writing Flink VARCHAR to Avro ENUM in Kafka 
> avro-confluent format
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-39053
>                 URL: https://issues.apache.org/jira/browse/FLINK-39053
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>    Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2, 2.1.0, 2.0.1, 1.20.3, 
> 2.1.1
>         Environment: - Flink Version: 1.20.3
> - Kafka Connector Version: 3.4.0-1.20
> - Format: avro-confluent
>            Reporter: Dominik Dębowczyk
>            Priority: Major
>              Labels: pull-request-available
>
> When writing to Kafka using the avro-confluent format with a schema that 
> contains ENUM types, Flink fails with an {{AvroTypeException}} for fields 
> that the source table represents as VARCHAR but that map to Avro ENUMs.
> h3. Error
> {code:java}
> Caused by: org.apache.avro.AvroTypeException: value RESTAURANT (a org.apache.avro.util.Utf8) is not a BusinessType at PartnerCore.businessType
> {code}
> h3. Reproduction scenario
>  # Read Avro data from Kafka containing ENUM fields (e.g. a BusinessType 
> enum); Flink automatically maps these ENUMs to VARCHAR, since Flink SQL 
> doesn't support ENUM
>  # Process the data in Flink (VARCHAR operations work fine)
>  # Write back to Kafka using the avro-confluent format with the original 
> schema containing the ENUM
>  # Serialization fails with {{AvroTypeException}} (see the sketch after 
> this list)
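> A minimal Table API sketch of this round trip (a hedged illustration: topic 
> names, bootstrap servers, and registry URL are hypothetical, and the sink's 
> writer schema, containing the ENUM, is assumed to be supplied by the schema 
> registry or the {{avro-confluent.schema}} option described under Scope):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> 
> public class EnumRoundTripRepro {
>     public static void main(String[] args) {
>         TableEnvironment tEnv =
>                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> 
>         // Step 1: the Avro ENUM field surfaces as STRING (VARCHAR) in Flink SQL.
>         tEnv.executeSql(
>                 "CREATE TABLE partners_source (businessType STRING) WITH ("
>                         + " 'connector' = 'kafka',"
>                         + " 'topic' = 'partners-in',"
>                         + " 'properties.bootstrap.servers' = 'localhost:9092',"
>                         + " 'properties.group.id' = 'enum-repro',"
>                         + " 'scan.startup.mode' = 'earliest-offset',"
>                         + " 'format' = 'avro-confluent',"
>                         + " 'avro-confluent.url' = 'http://localhost:8081')");
> 
>         // Step 3: the writer schema for this topic declares businessType as
>         // the BusinessType ENUM (assumed already registered for the subject).
>         tEnv.executeSql(
>                 "CREATE TABLE partners_sink (businessType STRING) WITH ("
>                         + " 'connector' = 'kafka',"
>                         + " 'topic' = 'partners-out',"
>                         + " 'properties.bootstrap.servers' = 'localhost:9092',"
>                         + " 'format' = 'avro-confluent',"
>                         + " 'avro-confluent.url' = 'http://localhost:8081')");
> 
>         // Step 4: the INSERT fails at serialization time with the
>         // AvroTypeException shown above.
>         tEnv.executeSql(
>                 "INSERT INTO partners_sink SELECT businessType FROM partners_source");
>     }
> }
> {code}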
> h3. Root cause
> In {{RowDataToAvroConverters}}, the VARCHAR case creates a converter that 
> always returns {{Utf8}}:
> {code:java}
> case VARCHAR:
>     converter =
>             new RowDataToAvroConverter() {
>                 private static final long serialVersionUID = 1L;
> 
>                 @Override
>                 public Object convert(Schema schema, Object object) {
>                     // Always produces a Utf8, even when the target
>                     // schema expects an ENUM.
>                     return new Utf8(object.toString());
>                 }
>             };
>     break; {code}
> Since Flink SQL has no native ENUM support and automatically represents 
> ENUMs as VARCHAR, users have no way to write data whose Avro schema contains 
> ENUM fields: the converter never checks whether the target Avro schema 
> expects an ENUM type.
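> For reference, the failure is reproducible with Avro alone; a minimal 
> sketch (record, field, and enum names are taken from the error above, the 
> rest of the schema is assumed, as is an Avro version that validates the 
> datum type on ENUM writes):
> {code:java}
> import java.io.ByteArrayOutputStream;
> 
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.EncoderFactory;
> import org.apache.avro.util.Utf8;
> 
> public class EnumUtf8Repro {
>     public static void main(String[] args) throws Exception {
>         Schema schema = new Schema.Parser().parse(
>                 "{\"type\":\"record\",\"name\":\"PartnerCore\",\"fields\":["
>                         + "{\"name\":\"businessType\",\"type\":{\"type\":\"enum\","
>                         + "\"name\":\"BusinessType\","
>                         + "\"symbols\":[\"RESTAURANT\",\"SHOP\"]}}]}");
> 
>         GenericRecord record = new GenericData.Record(schema);
>         // What the VARCHAR converter produces today:
>         record.put("businessType", new Utf8("RESTAURANT"));
> 
>         // Throws AvroTypeException: value RESTAURANT (a org.apache.avro.util.Utf8)
>         // is not a BusinessType at PartnerCore.businessType
>         new GenericDatumWriter<GenericRecord>(schema).write(
>                 record,
>                 EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null));
>     }
> }
> {code}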
> h3. Proposed solution
> Modify the VARCHAR converter to check whether the target schema is an ENUM 
> and create a {{GenericData.EnumSymbol}} instead:
>  
> {code:java}
> case VARCHAR:
>     converter =
>             new RowDataToAvroConverter() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public Object convert(Schema schema, Object object) {
>                     if (schema.getType() == Schema.Type.ENUM) {
>                         return new GenericData.EnumSymbol(schema, 
> object.toString());
>                     }
>                     return new Utf8(object.toString());
>                 }
>             };
>     break; {code}
> This allows VARCHAR values to be properly converted to ENUMs when the target 
> Avro schema requires it, enabling round-trip processing of Avro data with 
> ENUM fields through Flink.
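> A quick converter-level check (a sketch, assuming the converter is invoked 
> directly with the target enum schema, the way the format's serializer does):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.flink.formats.avro.RowDataToAvroConverters;
> import org.apache.flink.table.data.StringData;
> import org.apache.flink.table.types.logical.VarCharType;
> 
> public class EnumConverterCheck {
>     public static void main(String[] args) {
>         Schema enumSchema = new Schema.Parser().parse(
>                 "{\"type\":\"enum\",\"name\":\"BusinessType\","
>                         + "\"symbols\":[\"RESTAURANT\",\"SHOP\"]}");
> 
>         Object converted = RowDataToAvroConverters
>                 .createConverter(new VarCharType(255))
>                 .convert(enumSchema, StringData.fromString("RESTAURANT"));
> 
>         // Before the fix: org.apache.avro.util.Utf8
>         // After the fix:  org.apache.avro.generic.GenericData$EnumSymbol
>         System.out.println(converted.getClass().getName());
>     }
> }
> {code}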
> h3. Scope
> This fix addresses the case when:
>  * Using the {{avro-confluent}} format
>  * Schema is provided via the {{avro-confluent.schema}} option or the 
> schema registry (see the sketch below)
>  * Source field is VARCHAR (which may represent an ENUM from upstream Avro 
> data)
>  * Target Avro field is ENUM
> *Note*: This does NOT cover the case where the schema is auto-derived from 
> the Flink table DDL (which wouldn't include ENUM types anyway, since Flink 
> SQL doesn't support them).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
