nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586914531 @sijie I did the tests, following the results: **Using python producer** I am using this producer ```python import pulsar import time import datetime import calendar import time from pulsar.schema import * class SensorSchema(Record): value = Double(required=True) timestamp = Long(required=True) client = pulsar.Client('pulsar://localhost:6650') producer = client.create_producer('topic_python_producer', schema=AvroSchema(SensorSchema)) i = 0 while(True): i += 1 message = SensorSchema(value=i, timestamp=calendar.timegm(time.gmtime())*1000) print(message) producer.send(message) time.sleep(1) client.close() ``` The sink is working well, no error and I can see the data in the database **Using pulsar function** I am using this pulsar function, I removed all my customizations so I am using the released ones and I still have the error. So the problem is related only to pulsar functions ```python from pulsar import Function, SerDe from pulsar.schema import * import fastavro import io import json import ast class SensorSchema(Record): value = Double(required=True) timestamp = Long(required=True) class CopyFunction(Function): def __init__(self): self.parsed_schema = SensorSchema.schema() def process(self, input, context): buffer = io.BytesIO(input) d = fastavro.schemaless_reader(buffer, self.parsed_schema) output = {'value': d['value'], 'timestamp': d['timestamp']} outbuffer = io.BytesIO() fastavro.schemaless_writer(outbuffer, self.parsed_schema, output) return outbuffer.getvalue() ``` I created my function in this way, so I am passing the `--schema-type` param. ``` ./pulsar-admin functions create \ --py copyFunction.py \ --classname copyFunction.CopyFunction \ --schema-type SensorSchema \ --tenant public \ --namespace default \ --name test_function1 \ --inputs persistent://public/default/topic_java \ --output persistent://public/default/topic_python ``` I don't know how that `--schema-type` param is working: I am passing the schema name of another topic. In the logs it seems that it is finding it and seems to be associated to the topic since if I run this command `pulsar-admin schemas get topic_python` this is the output. Moreover if run any consumer in that topic I can read the values without problems ``` { "version": 0, "schemaInfo": { "name": "topic_python", "schema": { "type": "record", "name": "SensorSchema", "namespace": "it.oncode.argo.api.actors", "fields": [ { "name": "value", "type": "double" }, { "name": "timestamp", "type": "long" } ] }, "type": "AVRO", "properties": {} } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
