Jun Qin created FLINK-30438:
-------------------------------

             Summary: The generated schema is not correct when using value.format debezium-avro-confluent
                 Key: FLINK-30438
                 URL: https://issues.apache.org/jira/browse/FLINK-30438
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.16.0
            Reporter: Jun Qin

With the following table definition:

{code:sql}
CREATE TABLE TEST(
  ID BIGINT,
  INTEGRATION_ID STRING,
  PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
  'connector' = 'kafka',
  'topic' = 'TEST',
  'properties.bootstrap.servers' = 'broker:29092',
  'properties.group.id' = 'TEST',
  'key.format' = 'avro-confluent',
  'key.fields' = 'INTEGRATION_ID',
  'key.avro-confluent.url' = 'http://schema-registry:8081',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
  'scan.startup.mode' = 'earliest-offset'
);
{code}

and this INSERT statement:

{code:sql}
INSERT INTO TEST SELECT 1, '1';
{code}

the value schema registered in the schema registry is:

{code:json}
[
  "null",
  {
    "fields": [
      {
        "default": null,
        "name": "before",
        "type": [
          "null",
          {
            "fields": [
              { "default": null, "name": "ID", "type": [ "null", "long" ] },
              { "name": "INTEGRATION_ID", "type": "string" }
            ],
            "name": "record_before",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "after",
        "type": [
          "null",
          {
            "fields": [
              { "default": null, "name": "ID", "type": [ "null", "long" ] },
              { "name": "INTEGRATION_ID", "type": "string" }
            ],
            "name": "record_after",
            "type": "record"
          }
        ]
      },
      { "default": null, "name": "op", "type": [ "null", "string" ] }
    ],
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "type": "record"
  }
]
{code}

The leading "null" at the top level of the schema does not look correct: it makes the whole value a union of null and the record rather than a plain record. Could you check and fix this?
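
To make the reported shape easier to spot, here is a minimal Python sketch that detects the top-level `["null", record]` union and extracts the inner record. It is only an illustration, not Flink's code or a proposed fix. The helper name `unwrap_nullable_union` is hypothetical, the nested before/after record fields are shortened for brevity, and it assumes (as the report implies) that the expected value schema is the bare envelope record.

```python
import json

# Abbreviated copy of the registered schema from the report; the nested
# before/after records keep only one field each to stay short.
REGISTERED_SCHEMA = """
[
  "null",
  {
    "type": "record",
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "fields": [
      {"name": "before", "default": null,
       "type": ["null", {"type": "record", "name": "record_before",
                         "fields": [{"name": "INTEGRATION_ID", "type": "string"}]}]},
      {"name": "after", "default": null,
       "type": ["null", {"type": "record", "name": "record_after",
                         "fields": [{"name": "INTEGRATION_ID", "type": "string"}]}]},
      {"name": "op", "default": null, "type": ["null", "string"]}
    ]
  }
]
"""


def unwrap_nullable_union(schema):
    """Hypothetical helper: return (was_nullable_union, inner_schema).

    An Avro union is written as a JSON array. If the array is exactly
    "null" plus one other branch, return that other branch.
    """
    if isinstance(schema, list) and "null" in schema:
        non_null = [branch for branch in schema if branch != "null"]
        if len(non_null) == 1:
            return True, non_null[0]
    return False, schema


schema = json.loads(REGISTERED_SCHEMA)
wrapped, record = unwrap_nullable_union(schema)

print(wrapped)                                 # True: top level is a nullable union
print(record["type"], record["name"])          # record record
print([f["name"] for f in record["fields"]])   # ['before', 'after', 'op']
```

A schema whose top level is already a record passes through unchanged, so the same check can be run against the schema one expects to see.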