nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586734562
 
 
   @tuteng yes, I am running Pulsar standalone version 2.5.0 from the binaries 
(extracted from the binary release 
[here](https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz)).
 
   I am running it on macOS 10.15.3 with Python 3.7.3 and pulsar-client 2.5.0 
installed globally with pip.
   
   The Pulsar function copies the value from one topic with an Avro schema 
directly to another topic, something like this (I simplified my function by 
removing the parts that are not needed for the example):
   
   ```python
   from pulsar import Function, SerDe
   from pulsar.schema import *
   import fastavro
   import io
   
   
   class SensorSchema(Record):
       value = Double(required=True)
       timestamp = Long(required=True)
   
   class AvroSerDe(SerDe):
       def __init__(self):
           pass
   
       def serialize(self, object):
           return object
   
   class BaseFunction(Function):
       def __init__(self):
           self.parsed_schema = SensorSchema.schema()  # fastavro.parse_schema(schema)
   
       def process(self, input, context):
           buffer = io.BytesIO(input)
           d = fastavro.schemaless_reader(buffer, self.parsed_schema)
           newValue = self.customProcess(d, context)
           outbuffer = io.BytesIO()
           fastavro.schemaless_writer(outbuffer, self.parsed_schema, newValue)
           return outbuffer.getvalue()
   
       def customProcess(self, input, context):
           return {}
   
   
   class CopyFunction(BaseFunction):
       def customProcess(self, input, context):
   
           output = {'value': input['value'], 'timestamp': input['timestamp']}
           return output
   
   ```
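
For reference, here is a minimal standard-library sketch of what the `schemaless_reader` / `schemaless_writer` round trip in `process` amounts to at the byte level for a record with one double and one long. The helper names (`encode_long`, `decode_long`, `encode_record`, `decode_record`) are hypothetical, and the field order (value, then timestamp) is an assumption; the order produced by `SensorSchema.schema()` may differ. Note that a schemaless Avro payload contains only the field values, with no schema or schema identifier embedded in the bytes.

```python
import struct


def encode_long(n):
    """Avro long: zigzag-encode, then emit a base-128 varint (low group first)."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps signed 64-bit values to unsigned
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf, pos):
    """Read one Avro long from buf starting at pos; return (value, next position)."""
    z = 0
    shift = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos  # undo zigzag


def encode_record(record):
    # ASSUMPTION: fields are written as value (double), then timestamp (long).
    # An Avro double is 8 bytes, little-endian IEEE 754.
    return struct.pack("<d", record["value"]) + encode_long(record["timestamp"])


def decode_record(payload):
    (value,) = struct.unpack_from("<d", payload, 0)
    timestamp, _ = decode_long(payload, 8)
    return {"value": value, "timestamp": timestamp}


sample = {"value": 21.5, "timestamp": 1581000000000}
payload = encode_record(sample)
assert decode_record(payload) == sample  # copy round trip preserves the record
```

Comparing bytes built this way with a raw message read from topic2 is one way to check that the function output is plain Avro for the expected fields.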
   I create the Pulsar function with pulsar-admin like this:
   
   ```
   ./pulsar-admin functions create \
     --py copyFunction.py \
     --classname copyFunction.copyFunction \
     --schema-type SensorSchema \
     --output-serde-classname copyFunction.AvroSerDe \
     --tenant public \
     --namespace default \
     --name copyFunction1 \
     --inputs persistent://public/default/topic1 \
     --output persistent://public/default/topic2
   ```
   
   The problem is the following:
   
   1. The input topic (topic1) is created by a Java producer. At the 
beginning my Pulsar function is not started, so my sink copies only topic1 
to a DB and everything works well.
   2. I start the Pulsar function. It creates topic2 correctly, and if I 
start a consumer on topic2 I correctly see the values generated by the 
function.
   3. The sink now stops working and shows this:
   
   `org.apache.pulsar.functions.instance.JavaInstanceRunnable - 
[public/default/pulsar-mysql-jdbc-sink:0] Uncaught exception in Java Instance 
java.lang.NullPointerException: null at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:770) 
~[com.google.guava-guava-21.0.jar:?] at 
com.google.common.cache.LocalCache.get(LocalCache.java:4153) 
~[com.google.guava-guava-21.0.jar:?] at 
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4158) 
~[com.google.guava-guava-21.0.jar:?] at 
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5147) 
~[com.google.guava-guava-21.0.jar:?] at 
org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) 
~`
   
   I understood that this should be fixed in 2.5.0, but in my case the fix 
does not seem to work. 
   I can provide any other information if needed.
