justintime4tea opened a new issue, #24966:
URL: https://github.com/apache/pulsar/issues/24966

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   - K8s via the apachepulsar/pulsar Helm chart
   - Java, OS, etc. are all determined by the Apache Pulsar Helm chart / container specs
   
   
   ### Issue Description
   
   Messages published via the Pulsar REST endpoint do not have the topic's schema properly associated with the message payload. This is evident when consuming with `bin/pulsar-client`, but more importantly it breaks sink connectors such as JDBC. The documentation states that you can supply either a schema version _or_ an entire schema definition, but neither seems to work as intended (see the reproduction below).
   
   * [Pulsar REST docs](https://pulsar.apache.org/docs/4.1.x/client-libraries-rest/)
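   For reference, the two request shapes described above (schema version vs. inline schema) can be sketched as TypeScript types. This is a minimal sketch: the interface and helper names are hypothetical, only the `messages`, `payload`, `schemaVersion` and `valueSchema` field names come from this report, and the encoding expected in `valueSchema` is an unverified assumption.

   ```typescript
   // Hypothetical names; only the field names (messages, payload,
   // schemaVersion, valueSchema) come from this report.
   interface RestMessage {
     payload: string
   }

   interface RestPublishBody {
     messages: RestMessage[]
     // Variant 1: refer to a schema version already registered on the topic
     schemaVersion?: number
     // Variant 2: send the schema inline. ASSUMPTION: a JSON-serialized
     // schema-info string; the exact encoding has not been verified here.
     valueSchema?: string
   }

   // Serializes each record to a JSON string, as in the curl examples
   function toMessages(records: object[]): RestMessage[] {
     return records.map((record) => ({payload: JSON.stringify(record)}))
   }

   function withSchemaVersion(records: object[], version: number): RestPublishBody {
     return {schemaVersion: version, messages: toMessages(records)}
   }

   function withValueSchema(records: object[], valueSchema: string): RestPublishBody {
     return {valueSchema, messages: toMessages(records)}
   }
   ```

   `JSON.stringify(withSchemaVersion([{firstName: 'Justin'}], 0))` yields the same body as the curl commands in the reproduction.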
   
   ## Use case
   We want to publish messages to the Pulsar HTTP (REST) endpoint and have them consumed by a Pulsar JDBC sink, which writes them to a database. This doesn't work without proper schema support in the REST endpoint. To be clear, this is an issue with the REST endpoint itself and is not exclusive to JDBC setups: compare the `bin/pulsar-client` output when consuming the same message produced by the Node.js client vs. the REST API.
   
   ### Error messages
   
   ```text
   
   ```
   
   ### Reproducing the issue
   
   ## Reproduction
   The reproduction was done by exec-ing into the Pulsar broker pod:
   ```shell
   kubectl exec -it pulsar-broker-0 -- bash
   ```
   
   All commands below can be run from within the official Apache Pulsar Helm deployment using `kubectl exec`.
   
   ### Setup tenant, namespace, topic, and schema
   ```shell
   # Create test tenant
   bin/pulsar-admin tenants create test-tenant
   
   # Create test namespace
   bin/pulsar-admin namespaces create test-tenant/test-namespace
   
   # Create test topic
   bin/pulsar-admin topics create persistent://test-tenant/test-namespace/test-topic
   
   # Set schema for topic
   curl -X POST \
     http://localhost:8080/admin/v2/schemas/test-tenant/test-namespace/test-topic/schema \
     -H "Content-Type: application/json" \
     -d '{"type":"JSON","schema":"{\"type\":\"record\",\"name\":\"test_schema\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"}]}"}'
   
   # Confirm schema is associated with topic
   bin/pulsar-admin schemas get persistent://test-tenant/test-namespace/test-topic
   
   # Sample output:
   #
   # {
   #  "version": 1,
   #  "schemaInfo": {
   #    "name": "test-topic",
   #    "schema": {
   #      "type": "record",
   #      "name": "test_schema",
   #      "fields": [
   #        {
   #          "name": "firstName",
   #          "type": "string"
   #        }
   #      ]
   #    },
   #    "type": "JSON",
   #    "timestamp": 1762811737314,
   #    "properties": {}
   #  }
   # }
   ```
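   Hand-escaping the nested schema string in the curl body is error-prone. A minimal TypeScript sketch (the helper names are hypothetical) that builds the same admin request with a nested `JSON.stringify`, so the record definition stays readable; the URL layout is copied from the curl command above:

   ```typescript
   // Record definition for the JSON schema type, same as in the curl command
   const recordSchema = {
     type: 'record',
     name: 'test_schema',
     fields: [{name: 'firstName', type: 'string'}],
   }

   // Hypothetical helper: the inner definition must itself be a JSON *string*
   // inside the request body, hence the nested stringify.
   function schemaUploadBody(type: string, definition: object): string {
     return JSON.stringify({type, schema: JSON.stringify(definition)})
   }

   // Hypothetical helper mirroring the admin URL used with curl above
   function schemaUploadUrl(adminBase: string, tenant: string, namespace: string, topic: string): string {
     return `${adminBase}/admin/v2/schemas/${tenant}/${namespace}/${topic}/schema`
   }
   ```

   `schemaUploadBody('JSON', recordSchema)` produces byte-for-byte the `-d` body used in the curl command above.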
   
   ### Testing with simple "bin/pulsar-client"
   
   ```shell
   # In one terminal, run the consumer; notice the schema associated with the topic in the CLI output
   bin/pulsar-client consume \
     persistent://test-tenant/test-namespace/test-topic \
     -s verify-sub -n 1 --schema-type auto_consume
   
   # From a different terminal - post a message to the Pulsar REST API
   curl -X POST \
     http://localhost:8080/topics/persistent/test-tenant/test-namespace/test-topic \
     -H "Content-Type: application/json" \
     -d '{
       "schemaVersion": 0,
       "messages": [
         { "payload": "{\"firstName\":\"Justin\"}" }
       ]
     }'
   
   # Notice the output reports type=class java.lang.String, even when the schema is supplied in the POST body
   #
   # publishTime:[1762816147331], eventTime:[0], key:[null], properties:[], content:{type=class java.lang.String, value={"firstName":"Justin"}}
   ```
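   The same REST publish call can be sketched in TypeScript, e.g. to drive it from an application instead of curl. This is a hedged sketch: `restProducerUrl` and `restPublishRequest` are hypothetical helpers, and the URL layout is taken from the curl command above rather than from the Pulsar source.

   ```typescript
   // Hypothetical helper: maps persistent://tenant/namespace/topic to
   // <base>/topics/persistent/tenant/namespace/topic (layout from the curl above)
   function restProducerUrl(baseUrl: string, topicName: string): string {
     const match = /^(persistent|non-persistent):\/\/([^/]+)\/([^/]+)\/(.+)$/.exec(topicName)
     if (match === null) {
       throw new Error(`Not a fully qualified topic name: ${topicName}`)
     }
     const [, domain, tenant, namespace, topic] = match
     return `${baseUrl}/topics/${domain}/${tenant}/${namespace}/${encodeURIComponent(topic)}`
   }

   // Builds URL and request options equivalent to the curl invocation
   function restPublishRequest(baseUrl: string, topicName: string, body: unknown) {
     return {
       url: restProducerUrl(baseUrl, topicName),
       init: {
         method: 'POST',
         headers: {'Content-Type': 'application/json'},
         body: JSON.stringify(body),
       },
     }
   }
   ```

   On Node.js 18+ the returned `url` and `init` can be passed to `fetch(url, init)`; since the request is identical to the curl call, it should show the same `java.lang.String` behavior.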
   
   ### Testing with JDBC sink
   
   ```shell
   # Create JDBC Sink
   ./bin/pulsar-admin sinks create \
     --name test-schema \
     --tenant test-tenant \
     --namespace test-namespace \
     --inputs persistent://test-tenant/test-namespace/test-topic \
     --sink-type jdbc-postgres \
     --sink-config '{"userName": "USERNAME", "password": "PASSWORD", "jdbcUrl": "jdbc:postgresql://your-database-url:5432/db_name", "tableName": "test-schema"}'
   
   # Inspect logs and see that schema is recognized
   cat /pulsar/logs/functions/test-tenant/test-namespace/test-schema/test-schema-0.log
   
   # Send a message via the REST API (neither schemaVersion nor valueSchema works)
   curl -X POST \
     http://localhost:8080/topics/persistent/test-tenant/test-namespace/test-topic \
     -H "Content-Type: application/json" \
     -d '{
       "schemaVersion": 0,
       "messages": [
         { "payload": "{\"firstName\":\"Justin\"}" }
       ]
     }'
   
   # See the failure messages in JDBC Sink logs
   cat /pulsar/logs/functions/test-tenant/test-namespace/test-schema/test-schema-0.log
   
   # java.lang.UnsupportedOperationException: Primitive schema is not supported: BYTES
   #         at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.createMutation(BaseJdbcAutoSchemaSink.java:201) ~[pulsar-io-jdbc-core-4.1.1.jar:?]
   #         at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
   #         at java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source) ~[?:?]
   #         at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
   #         at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
   #         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
   #         at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
   #         at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
   #         at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:256) ~[pulsar-io-jdbc-core-4.1.1.jar:?]
   #         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
   #         at java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[?:?]
   #         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
   #         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
   #         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
   #         at java.lang.Thread.run(Unknown Source) [?:?]
   ```
   
   ### Testing with the Node.js "pulsar-client"
   When using the Node.js `pulsar-client`, the schema is properly associated with the message (though you have to use unusual casing for the type, `'Json'` rather than `JSON`, and the key `schemaType` instead of `type` in the schema definition). This is properly recognized by Pulsar and by the JDBC sink, and records are written to the database as expected.
   
   ```typescript
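     import {Client} from 'pulsar-client'

     // `config` is the application's own configuration object (not shown here);
     // the service URL is typically pulsar://<broker-host>:6650
     async function main(): Promise<void> {
       const client = new Client({
         serviceUrl: config.get('pulsar-client-service-url')
       })

       const producer = await client.createProducer({
         topic: 'persistent://test-tenant/test-namespace/test-topic'
       , accessMode: 'Shared'
       , schema: {
           schemaType: 'Json'
         , schema: JSON.stringify({
             type: 'record'
           , name: 'test_schema'
           , fields: [
               {name: 'firstName', type: 'string'}
             ]
           })
         }
       })

       await producer.send({
         data: Buffer.from(JSON.stringify({
           firstName: 'Justin'
         }))
       })
       await producer.flush()

       // Release the producer and client so the process can exit
       await producer.close()
       await client.close()
     }

     main().catch(console.error)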
   ```
   
   
   When consuming the message sent from Node.js, the `bin/pulsar-client` consumer output is different, and it's clear that the schema has been associated with the message properly:
   ```text
   2025-11-10T22:52:44,654+0000 [main] INFO  org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://test-tenant/test-namespace/test-topic : {"type":"record","name":"IngressUsage","fields":[{"name":"firstName","type":"string"}]}
   2025-11-10T22:52:44,655+0000 [main] INFO  org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {"type":"record","name":"IngressUsage","fields":[{"name":"firstName","type":"string"}]}
   publishTime:[1762814949124], eventTime:[0], key:[null], properties:[], content:{firstName=Justin}
   ```
   
   ### Additional information
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

