Kartik created CAMEL-20864:
------------------------------
Summary: Camel Kafka with confluent schema registry does not work
properly.
Key: CAMEL-20864
URL: https://issues.apache.org/jira/browse/CAMEL-20864
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 3.22.2
Environment: Confluent Kafka
Camel-3.22.x
Java 17
Reporter: Kartik
Fix For: 3.22.3
Attachments: image-2024-06-12-11-05-12-616.png,
image-2024-06-12-11-06-33-915.png
In Confluent Kafka, a topic can be registered for schema validation against
the schema registry. According to the Confluent documentation, when this is
configured we should either have a POJO defined in the code that is used for
serialization/deserialization *OR* create a custom "ObjectNode" from their
schema utils. The documentation is linked below:
[https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload]
In our case, different schemas are registered at run time, so we can't define a
POJO for each of them. Instead, we use the code below to generate an object
from the schema.
{code:java}
// Schema registry client settings
Map<String, Object> config = new HashMap<>();
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("basic.auth.user.info", "<secret-key>");
config.put("auto.register.schemas", false);
config.put("use.latest.version", true);

// Fetch the latest schema registered for the topic's value subject
CachedSchemaRegistryClient registryClient =
        new CachedSchemaRegistryClient("<registry-url>", 10, config);
String schemaDoc =
        registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
JsonSchema schema = new JsonSchema(schemaDoc);

// Build the payload and wrap it together with its schema in an envelope
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(
        "{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);

// Send the envelope to Kafka once a minute
from("timer://foo?fixedRate=true&period=60000")
        .setBody(ExpressionBuilder.constantExpression(envelope)) // message to send
        .log("Sent new message")
        .to(kafkaEndpoint);
{code}
If the "ObjectNode" payload is written directly using the kafka-client library,
it works. But when it is written using the Camel component, the "KafkaProducer"
in the Camel component does an "isIterable" check and, if it is true, sends each
value separately. This doesn't work for Confluent Kafka, because the custom
"{*}io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer{*}" expects a
whole object.
!image-2024-06-12-11-05-12-616.png|width=733,height=459!
The code in "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"
expects the whole object.
!image-2024-06-12-11-06-33-915.png|width=906,height=438!
Put simply, the "envelope" object that was created is not sent as a single
object: Camel iterates over it and sends its values independently, which
results in a schema validation error.
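The behaviour described above can be modelled with a small, dependency-free
sketch. It is not Camel's actual code: "FakeObjectNode" is a hypothetical
stand-in for Jackson's "ObjectNode" (which is iterable over its field values),
and "recordsWithSplit" / "recordsWithoutSplit" are illustrative names for "split
iterable bodies" versus "send the body as one record". The sketch also assumes
the envelope carries one schema field and one payload field.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IterableSplitSketch {

    /** Hypothetical stand-in for ObjectNode: iterating it yields the field values. */
    static final class FakeObjectNode implements Iterable<Object> {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        FakeObjectNode put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        @Override
        public Iterator<Object> iterator() {
            return fields.values().iterator();
        }
    }

    /** Simplified model of a producer that emits one record per element of an iterable body. */
    static List<Object> recordsWithSplit(Object body) {
        List<Object> records = new ArrayList<>();
        if (body instanceof Iterable) {
            for (Object element : (Iterable<?>) body) {
                records.add(element);
            }
        } else {
            records.add(body);
        }
        return records;
    }

    /** What the schema serializer needs: the body handed over as a single record. */
    static List<Object> recordsWithoutSplit(Object body) {
        List<Object> records = new ArrayList<>();
        records.add(body);
        return records;
    }

    /** Builds an envelope-like object with an assumed schema field and payload field. */
    static FakeObjectNode sampleEnvelope() {
        return new FakeObjectNode()
                .put("schema", "{\"type\":\"object\"}")
                .put("payload", "{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
    }

    public static void main(String[] args) {
        FakeObjectNode envelope = sampleEnvelope();
        System.out.println("split: " + recordsWithSplit(envelope).size() + " record(s)");
        System.out.println("whole: " + recordsWithoutSplit(envelope).size() + " record(s)");
    }
}
```

In this model the split path turns one envelope into two unrelated records (the
schema value and the payload value), neither of which is the envelope the
serializer expects, while the non-split path keeps the object intact.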
--
This message was sent by Atlassian Jira
(v8.20.10#820010)