nicolo-paganin commented on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586734562
 
 
   @tuteng yes I am running a pulsar standalone version 2.5.0 from binaries 
(extracting the binaries from the binary release 
[here](https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz).
 
   I am running it in MacOS 10.15.3 with python 3.7.3 and pulsar-client 2.5.0 
installed with pip globally.
   
   The pulsar function is copying the value from one topic with an Avro schema 
directly to another topic, something like this (I tried to simplify my pulsar 
function removing implementations that are not needed in the example):
   
   ```python
   from pulsar import Function, SerDe
   from pulsar.schema import *
   import fastavro
   import io
   import json
   import ast
   
   
   class SensorSchema(Record):
       value = Double(required=True)
       timestamp = Long(required=True)
   
   class AvroSerDe(SerDe):
       def __init__(self):
           pass
   
       def serialize(self, object):
           return object
   
   class BaseFunction(Function):
       def __init__(self):
           self.parsed_schema = SensorSchema.schema()  # 
fastavro.parse_schema(schema)
   
       def process(self, input, context):
           buffer = io.BytesIO(input)
           d = fastavro.schemaless_reader(buffer, self.parsed_schema)
           newValue = self.customProcess(d, context)
           outbuffer = io.BytesIO()
           fastavro.schemaless_writer(outbuffer, self.parsed_schema, newValue)
           return outbuffer.getvalue()
   
       def customProcess(self, input, context):
           return {}
   
   
   class CopyFunction(BaseFunction):
       def customProcess(self, input, context):
   
           output = {'value': input['value'], 'timestamp': input['timestamp']}
           return output
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to