nicolo-paganin commented on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586914531
 
 
   @sijie I did the tests, following the results:
   
   **Using python producer**
   I am using this producer
   
   ```python
   import pulsar
   import time
   import datetime
   import calendar
   import time
   from pulsar.schema import *
   
   class SensorSchema(Record):
       value = Double(required=True)
       timestamp = Long(required=True)
       
   client = pulsar.Client('pulsar://localhost:6650')
   
   producer = client.create_producer('topic_python_producer', 
schema=AvroSchema(SensorSchema))
   
   i = 0
   while(True):
       i += 1
       message = SensorSchema(value=i, 
timestamp=calendar.timegm(time.gmtime())*1000)
       print(message)
       producer.send(message)
       time.sleep(1)
   
   client.close()
   ```
   
   The sink is working well, no error and I can see the data in the database
   
   **Using pulsar function**
   I am using this pulsar function, I removed all my customizations so I am 
using the released ones and I still have the error. So the problem is related 
only to pulsar functions
   
   ```python
   from pulsar import Function, SerDe
   from pulsar.schema import *
   import fastavro
   import io
   import json
   import ast
   
   class SensorSchema(Record):
       value = Double(required=True)
       timestamp = Long(required=True)
   
   
   class CopyFunction(Function):
       def __init__(self):
           self.parsed_schema = SensorSchema.schema()
   
       def process(self, input, context):
           buffer = io.BytesIO(input)
           d = fastavro.schemaless_reader(buffer, self.parsed_schema)
           output = {'value': d['value'], 'timestamp': d['timestamp']}
   
           outbuffer = io.BytesIO()
           fastavro.writer(outbuffer, self.parsed_schema, [output])
           return outbuffer.getvalue()
   ```
   
   I created my function in this way, so I am passing the `--schema-type` param.
   
   ```
   ./pulsar-admin functions create \
     --py copyFunction.py \
     --classname copyFunction.CopyFunction \
     --schema-type SensorSchema  \
     --tenant public \
     --namespace default \
     --name test_function1 \
     --inputs persistent://public/default/topic_java \
     --output persistent://public/default/topic_python
   ```
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to