[
https://issues.apache.org/jira/browse/CAMEL-20864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-20864:
--------------------------------
Summary: camel-kafka - With confluent schema registry does not work
properly. (was: Camel Kafka with confluent schema registry does not work
properly.)
> camel-kafka - With confluent schema registry does not work properly.
> --------------------------------------------------------------------
>
> Key: CAMEL-20864
> URL: https://issues.apache.org/jira/browse/CAMEL-20864
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.22.2
> Environment: Confluent Kafka
> Camel-3.22.x
> Java 17
> Reporter: Kartik
> Priority: Minor
> Labels: camel-kafka
> Attachments: image-2024-06-12-11-05-12-616.png,
> image-2024-06-12-11-06-33-915.png
>
>
> In Confluent Kafka, we can register a schema for a topic in the Schema Registry so
> that payloads are validated against it. The Confluent documentation says that for
> serialization/deserialization we should either have a POJO defined in the code that
> matches the schema, *OR* create a custom "ObjectNode" envelope via their schema
> utils. The document is linked below:
> [https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload]
>
>
> In our case, the schemas are registered dynamically and we cannot have a POJO
> defined in the code for every registered schema at run time, so we use the code
> below to generate an object from the schema.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
> import io.confluent.kafka.schemaregistry.json.JsonSchema;
> import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
>
> // Schema Registry client configuration
> Map<String, Object> config = new HashMap<>();
> config.put("basic.auth.credentials.source", "USER_INFO");
> config.put("basic.auth.user.info", "<secret-key>");
> config.put("auto.register.schemas", false);
> config.put("use.latest.version", true);
>
> // Fetch the latest schema registered for the topic's value
> CachedSchemaRegistryClient registryClient =
>         new CachedSchemaRegistryClient("<registry-url>", 10, config);
> String schemaDoc = registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
> JsonSchema schema = new JsonSchema(schemaDoc);
>
> // Wrap the payload in an envelope that carries the schema
> ObjectMapper mapper = new ObjectMapper();
> JsonNode jsonNode = mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
> ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);
>
> from("timer://foo?fixedRate=true&period=60000")
>     .setBody(ExpressionBuilder.constantExpression(envelope))
>     .log("Sent new message") // message to send
>     .to(kafkaEndpoint);
> {code}
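> The report does not show how "kafkaEndpoint" is defined. A plausible sketch, assuming
> camel-kafka URI parameters ("valueSerializer", "additionalProperties.*") and
> placeholder broker/registry URLs (all values here are illustrative, not from the report):
> {code:java}
> // Assumed shape of the endpoint (not shown in the report): the Confluent
> // serializer is set on the endpoint, and the registry URL is passed through
> // to the underlying producer as a plain Kafka property.
> String kafkaEndpoint = "kafka:topicTest"
>         + "?brokers=<broker-url>"
>         + "&valueSerializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"
>         + "&additionalProperties.schema.registry.url=<registry-url>";
> {code}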
> If the "ObjectNode" payload is written directly with the kafka-clients library, it
> works. But when it is sent through the Camel component, the "KafkaProducer" in
> camel-kafka performs an "isIterable" check and, if the body is iterable, sends each
> value separately. This does not work with Confluent Kafka, because the custom
> "{*}io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer{*}" expects the
> whole object.
> !image-2024-06-12-11-05-12-616.png|width=733,height=459!
>
> The code in "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects
> the whole object:
> !image-2024-06-12-11-06-33-915.png|width=906,height=438!
>
> In short, the "envelope" object is not sent as a single message: because
> "ObjectNode" is iterable, its field values are iterated over and sent as
> independent records, which results in a schema validation error.
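> The splitting behaviour can be reproduced without Kafka at all. The sketch below is
> a simplified, hypothetical stand-in ("FakeObjectNode" and "send()" are illustrative
> names, not Camel or Jackson APIs): like Jackson's "ObjectNode", the stand-in
> implements "Iterable" over its field values, so a generic "is the body iterable?"
> check splits it into three records instead of sending it whole:
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
>
> // Hypothetical stand-in for the schema envelope: like Jackson's ObjectNode,
> // it is Iterable over its field values (Jackson's JsonNode implements
> // Iterable<JsonNode>, with iterator() delegating to elements()).
> class FakeObjectNode implements Iterable<Object> {
>     private final List<Object> values = Arrays.asList(123, 123.32, "pqr");
>
>     @Override
>     public Iterator<Object> iterator() { return values.iterator(); }
> }
>
> public class SplitDemo {
>     // Simplified sketch (not Camel's actual code) of the producer behaviour
>     // described above: an iterable body is split into one record per element.
>     static List<Object> send(Object body) {
>         List<Object> records = new ArrayList<>();
>         if (body instanceof Iterable) {
>             for (Object value : (Iterable<?>) body) {
>                 records.add(value); // each field value becomes its own record
>             }
>         } else {
>             records.add(body); // the whole object is one record
>         }
>         return records;
>     }
>
>     public static void main(String[] args) {
>         // The envelope is split into 3 records instead of being sent whole.
>         System.out.println(send(new FakeObjectNode()).size()); // prints 3
>     }
> }
> {code}
> This is why the serializer, which expects the whole envelope, instead receives the
> individual field values and fails schema validation.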
--
This message was sent by Atlassian Jira
(v8.20.10#820010)