[
https://issues.apache.org/jira/browse/FLINK-39053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39053:
-----------------------------------
Labels: pull-request-available (was: )
> AvroTypeException when writing Flink VARCHAR to Avro ENUM in Kafka
> avro-confluent format
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-39053
> URL: https://issues.apache.org/jira/browse/FLINK-39053
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC,
> SequenceFile)
> Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2, 2.1.0, 2.0.1, 1.20.3,
> 2.1.1
> Environment: - Flink Version: 1.20.3
> - Kafka Connector Version: 3.4.0-1.20
> - Format: avro-confluent
> Reporter: Dominik Dębowczyk
> Priority: Major
> Labels: pull-request-available
>
> When writing to Kafka using the avro-confluent format with a schema that
> contains ENUM types, Flink fails with an AvroTypeException when the source
> table uses VARCHAR for fields that map to Avro ENUMs.
> h3. Error
> {code:java}
> Caused by: org.apache.avro.AvroTypeException: value RESTAURANT (a org.apache.avro.util.Utf8) is not a BusinessType at PartnerCore.businessType
> {code}
> h3. Reproduction scenario
> # Read Avro data from Kafka containing ENUM fields (e.g. a {{BusinessType}} enum). Flink automatically maps these ENUMs to VARCHAR, since Flink SQL doesn't support ENUM.
> # Process the data in Flink (VARCHAR operations work fine).
> # Write back to Kafka using the avro-confluent format with the original schema containing the ENUM.
> # The conversion fails with {{AvroTypeException}}.
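> For concreteness, a minimal Table API sketch of the four steps above (topic names, the registry URL, and the {{PartnerCore}}/{{BusinessType}} names are assumptions inferred from the error message, not taken from the reporter's job):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class EnumRoundTripRepro {
>     public static void main(String[] args) {
>         TableEnvironment tEnv =
>                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>
>         // Writer schema with the ENUM, as it would exist in the registry (escaped for Java).
>         String avroSchema =
>                 "{\"type\":\"record\",\"name\":\"PartnerCore\",\"fields\":["
>                         + "{\"name\":\"businessType\",\"type\":{\"type\":\"enum\","
>                         + "\"name\":\"BusinessType\",\"symbols\":[\"RESTAURANT\",\"SHOP\"]}}]}";
>
>         // Step 1: the ENUM field surfaces as STRING on the Flink side.
>         tEnv.executeSql(
>                 "CREATE TABLE partners_in (businessType STRING) WITH ("
>                         + " 'connector' = 'kafka',"
>                         + " 'topic' = 'partners',"
>                         + " 'properties.bootstrap.servers' = 'localhost:9092',"
>                         + " 'scan.startup.mode' = 'earliest-offset',"
>                         + " 'format' = 'avro-confluent',"
>                         + " 'avro-confluent.url' = 'http://localhost:8081')");
>
>         // Step 3: write back with the original ENUM-bearing schema.
>         tEnv.executeSql(
>                 "CREATE TABLE partners_out (businessType STRING) WITH ("
>                         + " 'connector' = 'kafka',"
>                         + " 'topic' = 'partners-out',"
>                         + " 'properties.bootstrap.servers' = 'localhost:9092',"
>                         + " 'format' = 'avro-confluent',"
>                         + " 'avro-confluent.url' = 'http://localhost:8081',"
>                         + " 'avro-confluent.schema' = '" + avroSchema + "')");
>
>         // Step 4: fails with the AvroTypeException shown above.
>         tEnv.executeSql("INSERT INTO partners_out SELECT businessType FROM partners_in");
>     }
> }
> {code}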
> h3. Root cause
> In {{RowDataToAvroConverter}}, the VARCHAR case creates a converter that
> always returns {{Utf8}}:
> {code:java}
> case VARCHAR:
>     converter = new RowDataToAvroConverter() {
>         @Override
>         public Object convert(Schema schema, Object object) {
>             return new Utf8(object.toString());
>         }
>     };
>     break;
> {code}
> Since Flink SQL doesn't have native ENUM support and automatically represents
> ENUMs as VARCHAR, there's no way for users to write data with ENUM fields in
> their Avro schemas. The converter doesn't check if the target Avro schema
> expects an ENUM type.
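> The mismatch can be shown with plain Avro, independent of Flink. A minimal sketch, with the record and enum names assumed from the error message above:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.EncoderFactory;
> import org.apache.avro.util.Utf8;
>
> public class EnumWriteDemo {
>     public static void main(String[] args) throws Exception {
>         Schema enumSchema =
>                 SchemaBuilder.enumeration("BusinessType").symbols("RESTAURANT", "SHOP");
>         Schema recordSchema =
>                 SchemaBuilder.record("PartnerCore").fields()
>                         .name("businessType").type(enumSchema).noDefault()
>                         .endRecord();
>
>         GenericRecord record = new GenericData.Record(recordSchema);
>         // What the VARCHAR converter produces today: a Utf8, not a GenericEnumSymbol.
>         record.put("businessType", new Utf8("RESTAURANT"));
>
>         GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(recordSchema);
>         // Throws AvroTypeException: value RESTAURANT (a org.apache.avro.util.Utf8)
>         // is not a BusinessType at PartnerCore.businessType
>         writer.write(record, EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null));
>     }
> }
> {code}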
> h3. Proposed Solution
> Modify the VARCHAR converter to check whether the target schema is an ENUM
> and create a {{GenericData.EnumSymbol}} instead:
>
> {code:java}
> case VARCHAR:
>     converter =
>             new RowDataToAvroConverter() {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public Object convert(Schema schema, Object object) {
>                     if (schema.getType() == Schema.Type.ENUM) {
>                         return new GenericData.EnumSymbol(schema, object.toString());
>                     }
>                     return new Utf8(object.toString());
>                 }
>             };
>     break;
> {code}
> This allows VARCHAR values to be properly converted to ENUMs when the target
> Avro schema requires it, enabling round-trip processing of Avro data with
> ENUM fields through Flink.
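> With the change applied, a quick sanity check against the converter. This is a sketch: it assumes the public {{RowDataToAvroConverters#createConverter}} entry point shown in the snippet above and an enum schema like the one in the error message:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.generic.GenericData;
> import org.apache.flink.formats.avro.RowDataToAvroConverters;
> import org.apache.flink.table.data.StringData;
> import org.apache.flink.table.types.logical.VarCharType;
>
> public class EnumConverterCheck {
>     public static void main(String[] args) {
>         RowDataToAvroConverters.RowDataToAvroConverter converter =
>                 RowDataToAvroConverters.createConverter(new VarCharType(VarCharType.MAX_LENGTH));
>
>         Schema enumSchema =
>                 SchemaBuilder.enumeration("BusinessType").symbols("RESTAURANT", "SHOP");
>
>         Object converted = converter.convert(enumSchema, StringData.fromString("RESTAURANT"));
>         // Before the fix: org.apache.avro.util.Utf8
>         // After the fix:  org.apache.avro.generic.GenericData$EnumSymbol
>         System.out.println(converted.getClass().getName());
>         System.out.println(converted instanceof GenericData.EnumSymbol);
>     }
> }
> {code}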
> h3. Scope
> This fix addresses the case when:
> * Using the {{avro-confluent}} format
> * Schema is provided via the {{avro-confluent.schema}} option or the schema registry
> * Source field is VARCHAR (which may represent an ENUM from upstream Avro data)
> * Target Avro field is ENUM
> *Note*: This does NOT cover the case where the schema is auto-derived from
> the Flink table DDL (which wouldn't include ENUM types anyway, since Flink
> SQL doesn't support them).
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)