GitHub user sahatsawats closed a discussion: Handling KeyValue AVRO Messages in Pulsar with Incompatible Schema Exception
## Environment
- DataStax Enterprise 6.8.42 (Cassandra)
- Luna Streaming (Apache Pulsar version 2.7.2)
## Problem Statement
I want to build a project that receives CDC messages from a Cassandra cluster into Pulsar and then performs an aggregation with a time-window function. The part in question is the **[Function]** stage:
`[CDC Agent] --> [Events-topic] --> [Source Connector] --> [Data-topic] --> [Function] --> [Output-topic]`
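The aggregation the project aims at (the function instance in the log below is named `demo_sumbyId`) can be pictured as a tumbling-window sum of `amount` per `id`. The sketch below is plain Java and does not use the Pulsar Functions windowing API; the `Event` class, the window size, and the use of `BigDecimal` amounts are assumptions for illustration only.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSum {

    // Hypothetical event shape: one decoded CDC row (id, amount) plus its event time in ms.
    static final class Event {
        final int id;
        final BigDecimal amount;
        final long timestampMs;

        Event(int id, BigDecimal amount, long timestampMs) {
            this.id = id;
            this.amount = amount;
            this.timestampMs = timestampMs;
        }
    }

    // Assigns each event to a fixed, non-overlapping window of windowMs and sums amount per id.
    // Result: window start (ms) -> (id -> sum). TreeMaps keep the output ordered.
    static Map<Long, Map<Integer, BigDecimal>> sumById(List<Event> events, long windowMs) {
        Map<Long, Map<Integer, BigDecimal>> result = new TreeMap<>();
        for (Event e : events) {
            long windowStart = Math.floorDiv(e.timestampMs, windowMs) * windowMs;
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.id, e.amount, BigDecimal::add);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(24, new BigDecimal("1.50"), 1_000L),
                new Event(24, new BigDecimal("2.25"), 59_000L),
                new Event(25, new BigDecimal("4.00"), 61_000L));
        System.out.println(sumById(events, 60_000L)); // {0={24=3.75}, 60000={25=4.00}}
    }
}
```

Tumbling windows are the simplest choice here; sliding windows or late-event handling would need extra state that this sketch leaves out.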
The Java function receives a KeyValue message whose key and value are both AVRO, following the latest schema of `Data-topic`. **(The latest schema was generated by the `--st auto_consume` option.)**
Latest `Data-topic` schema:
```json
{
  "version": 1,
  "schemaInfo": {
    "name": "data-exampleio.demo",
    "schema": {
      "key": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "id",
              "type": "int"
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      },
      "value": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "amount",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "cql_decimal",
                  "namespace": "",
                  "fields": [
                    {
                      "name": "bigint",
                      "type": "bytes"
                    },
                    {
                      "name": "scale",
                      "type": "int"
                    }
                  ],
                  "logicalType": "cql_decimal"
                }
              ]
            },
            {
              "name": "context",
              "type": [
                "null",
                "string"
              ]
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      }
    },
    "type": "KEY_VALUE",
    "timestamp": 1709620937104,
    "properties": {
      "key.schema.name": "demo",
      "key.schema.properties": "{}",
      "key.schema.type": "AVRO",
      "kv.encoding.type": "SEPARATED",
      "value.schema.name": "demo",
      "value.schema.properties": "{}",
      "value.schema.type": "AVRO"
    }
  }
}
```
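The `cql_decimal` record in the value schema stores a decimal as an unscaled integer (`bigint`, as `bytes`) plus a `scale`. Assuming `bigint` holds big-endian two's-complement bytes, as produced by `BigInteger.toByteArray()` (an assumption about the CDC encoding that is worth checking against the connector source), a function could rebuild the value as follows. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

final class CqlDecimal {

    // Rebuilds a decimal from the cql_decimal fields, ASSUMING `bigint` is the
    // big-endian two's-complement unscaled value (the BigInteger.toByteArray() format).
    static BigDecimal toBigDecimal(ByteBuffer bigint, int scale) {
        ByteBuffer copy = bigint.duplicate();  // leave the caller's buffer position untouched
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // Hypothetical 3-byte unscaled value with scale=2, mirroring the shape seen in the
        // CLI output (lim=3, scale=2); the real bytes were not shown.
        ByteBuffer unscaled = ByteBuffer.wrap(BigInteger.valueOf(123456).toByteArray());
        System.out.println(toBigDecimal(unscaled, 2)); // 1234.56
    }
}
```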
However, my function is not working. After reviewing the function log, the problem appears to be that the function's consumer receives the messages as BYTES, which is incompatible with the existing schema:
> 2024-03-05T12:53:18,038+0000 [public/default/demo_sumbyId-0] ERROR
> org.apache.pulsar.functions.instance.JavaInstanceRunnable -
> [public/default/demo_sumbyId:0] Uncaught exception in Java Instance
> java.util.concurrent.CompletionException:
> org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
> {"errorMsg":"Key schemas or Value schemas are different schema type, from
> key schema type is AVRO and to key schema is BYTES, from value schema is AVRO
> and to value schema is BYTES","reqId":1870435977707154767,
> "remote":"Censored", "local":"Censored"}
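
The log message describes a comparison between the key and value schema types of the topic's KeyValue schema ("from") and those of the function consumer's schema ("to"). A toy model of that comparison, written only to mirror the wording of the log (it is not Pulsar's implementation, and the enum and method names are hypothetical), shows why a `KeyValue<BYTES, BYTES>` consumer is rejected by a `KeyValue<AVRO, AVRO>` topic.

```java
import java.util.Optional;

final class KeyValueSchemaCheck {

    // Hypothetical subset of schema types, for illustration only.
    enum SchemaType { AVRO, JSON, BYTES }

    // Toy rule suggested by the log: both key types and value types must match;
    // otherwise return an error message worded like the one in the function log.
    static Optional<String> check(SchemaType fromKey, SchemaType fromValue,
                                  SchemaType toKey, SchemaType toValue) {
        if (fromKey == toKey && fromValue == toValue) {
            return Optional.empty();
        }
        return Optional.of("Key schemas or Value schemas are different schema type, from key schema type is "
                + fromKey + " and to key schema is " + toKey
                + ", from value schema is " + fromValue + " and to value schema is " + toValue);
    }

    public static void main(String[] args) {
        // Topic schema (AVRO, AVRO) vs. a consumer declaring (BYTES, BYTES): rejected.
        System.out.println(check(SchemaType.AVRO, SchemaType.AVRO, SchemaType.BYTES, SchemaType.BYTES));
        // A consumer whose key and value types match the topic: accepted.
        System.out.println(check(SchemaType.AVRO, SchemaType.AVRO, SchemaType.AVRO, SchemaType.AVRO));
    }
}
```
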
In the client CLI, I cannot receive structured messages without `--st auto_consume`, even after the schema is registered.
With `--st auto_consume`:
> key:[MA==], properties:[writetime=1709646681322536], content:{key={id=24},
> value={amount={scale=2, bigint=java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]},
> context=exampleagain}}
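
The `key:[MA==]` field is consistent with the `SEPARATED` KeyValue encoding, where the AVRO-encoded key is carried as the (Base64-encoded) message key. AVRO writes an `int` as a zig-zag variable-length integer, so `id=24` becomes the single byte `0x30`, whose Base64 form is `MA==`. The stdlib-only sketch below reverses this; it handles only this table's single-`int` key record, and the class name is hypothetical.

```java
import java.util.Base64;

final class AvroKeyDecode {

    // Reads one Avro int (zig-zag encoded variable-length integer) starting at offset 0.
    static int readAvroInt(byte[] buf) {
        int raw = 0;
        int shift = 0;
        int i = 0;
        int b;
        do {
            b = buf[i++] & 0xFF;
            raw |= (b & 0x7F) << shift;  // 7 payload bits per byte, little-endian groups
            shift += 7;
        } while ((b & 0x80) != 0);       // high bit set means another byte follows
        return (raw >>> 1) ^ -(raw & 1); // undo zig-zag
    }

    // Decodes a Base64 message key holding an Avro record whose only field is an int.
    static int decodeId(String base64Key) {
        return readAvroInt(Base64.getDecoder().decode(base64Key));
    }

    public static void main(String[] args) {
        System.out.println(decodeId("MA==")); // 24, matching content:{key={id=24}}
        System.out.println(decodeId("Mg==")); // 25
    }
}
```
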
Without `--st auto_consume`:
> key:[Mg==], properties:[writetime=1709647768862639], content:??exampleagain
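
Without `--st auto_consume`, the CLI appears to print the raw AVRO value payload as text, so the readable `context` string shows up after a few binary bytes; the `??` is plausibly how the non-text bytes were rendered. A sketch of AVRO binary encoding for this value record (union branch index, `bytes` length and contents, `scale`, then the `context` string) makes that layout visible. The decimal bytes are hypothetical, and the helper names are illustrative rather than any library's API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class AvroValueLayout {

    // Writes an Avro int/long as a zig-zag variable-length integer.
    static void writeVarLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes -> small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro `bytes`/`string`: length as varlong, then the raw bytes.
    static void writeBytes(ByteArrayOutputStream out, byte[] b) {
        writeVarLong(out, b.length);
        out.write(b, 0, b.length);
    }

    // Encodes {amount: cql_decimal{bigint, scale}, context: string} with both unions
    // on their non-null branch (index 1), following Avro's binary encoding rules.
    static byte[] encode(byte[] unscaled, int scale, String context) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarLong(out, 1);      // amount: union branch 1 (cql_decimal)
        writeBytes(out, unscaled); //   bigint: length + bytes
        writeVarLong(out, scale);  //   scale: int
        writeVarLong(out, 1);      // context: union branch 1 (string)
        writeBytes(out, context.getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = encode(new byte[] {0x01, (byte) 0xE2, 0x40}, 2, "exampleagain");
        StringBuilder hex = new StringBuilder();
        for (byte b : payload) hex.append(String.format("%02x ", b));
        // 02 06 01 e2 40 04 02 18 65 78 61 6d 70 6c 65 61 67 61 69 6e
        System.out.println(hex.toString().trim());
    }
}
```

Only the last 12 bytes are the UTF-8 text `exampleagain`; everything before them is AVRO framing, which is why a schema-aware consumer is needed to see the structured record.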
Could you help me identify the problem, or what I am missing?
GitHub link: https://github.com/apache/pulsar/discussions/22207