Kartik created CAMEL-20864:
------------------------------

             Summary: Camel Kafka with confluent schema registry does not work 
properly.
                 Key: CAMEL-20864
                 URL: https://issues.apache.org/jira/browse/CAMEL-20864
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 3.22.2
         Environment: Confluent Kafka

Camel-3.22.x

Java 17
            Reporter: Kartik
             Fix For: 3.22.3
         Attachments: image-2024-06-12-11-05-12-616.png, 
image-2024-06-12-11-06-33-915.png

In confluent kafka, we can register the topic against schema validation from 
the schema registry. When configured the confluent document says either we 
should have a pojo object defined in the code that is used for 
serialization/deserialization *OR* a custom "ObjectNode" can be created from 
their schema utils. Attaching the document below

[https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload]
 

 

For our case, we have a different schema registered and can't have all the POJO 
defined as schema registered at run time, so we are using the below code to 
generate an object from the schema.
{code:java}
Map<String, Object> config = new HashMap<>();
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("basic.auth.user.info", "<secret-key>");
config.put("auto.register.schemas", false);
config.put("use.latest.version", true);

CachedSchemaRegistryClient registryClient = new 
CachedSchemaRegistryClient("<registry-url>", 10, config);
String schemaDoc = 
registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
JsonSchema schema = new JsonSchema(schemaDoc);

ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = 
mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);

from("timer://foo?fixedRate=true&period=60000")
        .setBody(ExpressionBuilder.constantExpression(envelope))
        .log("Sent new message")// Message to send
        .to(kafkaEndpoint); {code}
If the "ObjectNode" payload is directly written using kafka-client library it 
works. But when written using camel component, The "KafkaProducer" in camel 
component does "isIterable" check and if true sends each value and this doesn't 
work for confluent kafka as the custom 
"{*}io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer{*}" expects a 
whole object.

!image-2024-06-12-11-05-12-616.png|width=733,height=459!

 

 The code in "

io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer expects whole 
object.

!image-2024-06-12-11-06-33-915.png|width=906,height=438!

 

Basically, in simple words, The "envelope" object created is no longer the same 
object but is iterated and values are iterated and sent independently resulting 
in schema validation error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to