nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586734562

@tuteng yes, I am running Pulsar standalone version 2.5.0 from binaries (extracted from the binary release [here](https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz)).
I am running it on macOS 10.15.3 with Python 3.7.3 and pulsar-client 2.5.0 installed globally with pip.

The connector is running in local mode because, as described [here](https://github.com/apache/pulsar/issues/6323), I cannot run any connector in Pulsar 2.5.0 (note that it works fine in Pulsar 2.4.2).

The Pulsar function copies the value from one topic with an Avro schema directly to another topic, something like this (I simplified my function by removing the parts that are not needed for the example):

```python
from pulsar import Function, SerDe
from pulsar.schema import *
import fastavro
import io


class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)


class AvroSerDe(SerDe):
    def __init__(self):
        pass

    def serialize(self, object):
        return object


class BaseFunction(Function):
    def __init__(self):
        self.parsed_schema = SensorSchema.schema()  # fastavro.parse_schema(schema)

    def process(self, input, context):
        buffer = io.BytesIO(input)
        d = fastavro.schemaless_reader(buffer, self.parsed_schema)
        newValue = self.customProcess(d, context)
        outbuffer = io.BytesIO()
        fastavro.schemaless_writer(outbuffer, self.parsed_schema, newValue)
        return outbuffer.getvalue()

    def customProcess(self, input, context):
        return {}


class CopyFunction(BaseFunction):
    def customProcess(self, input, context):
        output = {'value': input['value'], 'timestamp': input['timestamp']}
        return output
```

I create the Pulsar function using pulsar-admin in this way:

```
./pulsar-admin functions create \
  --py copyFunction.py \
  --classname copyFunction.copyFunction \
  --schema-type SensorSchema \
  --output-serde-classname copyFunction.AvroSerDe \
  --tenant public \
  --namespace default \
  --name copyFunction1 \
  --inputs persistent://public/default/topic1 \
  --output persistent://public/default/topic2
```

The problem is the following:

1. The input topic (topic1) is created by a Java producer. At the beginning my Pulsar function is not started, so my sink copies only topic1 to a DB and everything works well.
2. I start the Pulsar function. It creates topic2 correctly, and if I start a consumer on topic2 I correctly see the values generated by the function.
3. The sink now stops working and shows this: `org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/pulsar-mysql-jdbc-sink:0] Uncaught exception in Java Instance java.lang.NullPointerException: null at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:770) ~[com.google.guava-guava-21.0.jar:?] at com.google.common.cache.LocalCache.get(LocalCache.java:4153) ~[com.google.guava-guava-21.0.jar:?] at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4158) ~[com.google.guava-guava-21.0.jar:?] at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5147) ~[com.google.guava-guava-21.0.jar:?] at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~`

I understood that this should be fixed in 2.5.0, but in my case the fix does not seem to work. I can provide any other information if needed.
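
For reference, the decode and re-encode step inside `process` can be reproduced without Pulsar or fastavro. The sketch below is only an illustration under stated assumptions: it hand-encodes the two `SensorSchema` fields following the Avro binary encoding (a double is 8 little-endian bytes, a long is a zigzag varint), and it assumes the fields are written in the order `value`, then `timestamp`, which may differ from the order in the schema actually generated by `SensorSchema.schema()`. The helper names (`encode_long`, `decode_long`, `encode_record`, `decode_record`, `copy_payload`) are hypothetical and are not part of pulsar-client or fastavro.

```python
import struct

# Hypothetical helpers for illustration only; not part of pulsar-client or fastavro.


def encode_long(n):
    """Encode an integer as an Avro long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small negative values become small positives
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(data, pos=0):
    """Decode an Avro long starting at pos; return (value, next position)."""
    shift = 0
    z = 0
    while True:
        b = data[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_record(record):
    # Assumed field order: value (double), then timestamp (long).
    return struct.pack("<d", record["value"]) + encode_long(record["timestamp"])


def decode_record(payload):
    (value,) = struct.unpack_from("<d", payload, 0)
    timestamp, _ = decode_long(payload, 8)
    return {"value": value, "timestamp": timestamp}


def copy_payload(payload):
    """Mirror CopyFunction: decode the record, copy both fields, re-encode."""
    record = decode_record(payload)
    output = {"value": record["value"], "timestamp": record["timestamp"]}
    return encode_record(output)


if __name__ == "__main__":
    sample = encode_record({"value": 21.5, "timestamp": 1581933600000})
    print(sample.hex())
    print(copy_payload(sample) == sample)
```

The payload produced this way contains only the field values, with no embedded schema, so a consumer can decode it only if it already has the matching schema for the topic.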

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
