GitHub user sahatsawats closed a discussion: Handling KeyValue AVRO Messages in 
Pulsar with Incompatible Schema Exception

## Environment

- DataStax Enterprise 6.8.42 (Cassandra)
- Luna Streaming (Apache Pulsar version 2.7.2)

## Problem Statement
I want to build a pipeline that receives CDC messages from a Cassandra 
cluster in Pulsar and then performs aggregations with a time-window function.

`[CDC Agent] --> [Events-topic] --> [Source Connector] --> [Data-topic] --> 
**[Function]** --> [Output-topic]`

The Java function receives messages of `KeyValue` type, where both key and 
value are AVRO and follow the latest schema of the `Data-topic`. **(The 
schema below was retrieved using the `--st auto_consume` option.)**


Latest `Data-topic` schema:

```json
{
  "version": 1,
  "schemaInfo": {
    "name": "data-exampleio.demo",
    "schema": {
      "key": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "id",
              "type": "int"
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      },
      "value": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "amount",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "cql_decimal",
                  "namespace": "",
                  "fields": [
                    {
                      "name": "bigint",
                      "type": "bytes"
                    },
                    {
                      "name": "scale",
                      "type": "int"
                    }
                  ],
                  "logicalType": "cql_decimal"
                }
              ]
            },
            {
              "name": "context",
              "type": [
                "null",
                "string"
              ]
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      }
    },
    "type": "KEY_VALUE",
    "timestamp": 1709620937104,
    "properties": {
      "key.schema.name": "demo",
      "key.schema.properties": "{}",
      "key.schema.type": "AVRO",
      "kv.encoding.type": "SEPARATED",
      "value.schema.name": "demo",
      "value.schema.properties": "{}",
      "value.schema.type": "AVRO"
    }
  }
}
``` 
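For reference, the function consumes the record roughly like this. This is a minimal sketch, not the actual function code: it assumes the `pulsar-functions-api` dependency on the classpath, and the class name `DemoSumById` is a placeholder; the field names (`id`, `amount`) match the schema above.

```java
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

// Sketch of the function's input handling (assumed, not the real code):
// the key record carries "id"; the value record carries "amount" and "context".
public class DemoSumById implements Function<KeyValue<GenericRecord, GenericRecord>, Void> {
    @Override
    public Void process(KeyValue<GenericRecord, GenericRecord> input, Context ctx) {
        Integer id = (Integer) input.getKey().getField("id");
        GenericRecord amount = (GenericRecord) input.getValue().getField("amount");
        // ... time-window aggregation by id would go here ...
        ctx.getLogger().info("id={} amount={}", id, amount);
        return null;
    }
}
```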
However, my function does not work. After reviewing the function log, the 
problem is that the function's consumer receives the messages as BYTES, which 
is incompatible with the existing schema.

> 2024-03-05T12:53:18,038+0000 [public/default/demo_sumbyId-0] ERROR 
> org.apache.pulsar.functions.instance.JavaInstanceRunnable - 
> [public/default/demo_sumbyId:0] Uncaught exception in Java Instance 
> java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
>  {"errorMsg":"Key schemas or Value schemas are different schema type, from 
> key schema type is AVRO and to key schema is BYTES, from value schema is AVRO 
> and to value schema is BYTES","reqId":1870435977707154767, 
> "remote":"Censored", "local":"Censored"}

With the client CLI, I cannot receive structured messages without `--st 
auto_consume`, even though the schema is registered.

With `--st auto_consume`:

> key:[MA==], properties:[writetime=1709646681322536], content:{key={id=24}, 
> value={amount={scale=2, bigint=java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]}, 
> context=exampleagain}}

Without `--st auto_consume`:

> key:[Mg==], properties:[writetime=1709647768862639], content:??exampleagain
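For context, the two CLI results above came from invocations along these lines (the topic and subscription names here are placeholders, not necessarily the exact ones I used):

```shell
# With the schema flag the consumer decodes the KeyValue AVRO structure:
pulsar-client consume persistent://public/default/data-topic \
  -s test-sub -n 0 --st auto_consume

# Without it, the payload is printed as raw bytes:
pulsar-client consume persistent://public/default/data-topic \
  -s test-sub -n 0
```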


Could you help identify the problem, or point out what I am missing?


GitHub link: https://github.com/apache/pulsar/discussions/22207
