dhinesherode91 commented on issue #8429:
URL: https://github.com/apache/pulsar/issues/8429#issuecomment-867272525
Hey,
I am facing a similar issue.
I am running Pulsar 2.7.1 and have a topic with an AVRO schema: my producers
publish Avro messages, and my consumer receives and processes the Avro data.
My consumer application receives messages through the Pulsar client's
MessageListener. All messages on my topic are tagged with the latest Avro
schema version, and I can see that schema in the Pulsar cluster's
SchemaRegistry.
However, I get the following exception when my consumer application starts
and consumes messages:
`org.apache.pulsar.shade.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:86)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:60)
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
    at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143)
    at com.kohia.galaxy.listener.MessageListenerImpl.received(MessageListenerImpl.kt:20)
    at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:882)
    at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$5(ConsumerBase.java:862)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
    at org.apache.pulsar.shade.org.apache.avro.Schema.parse(Schema.java:1597)
    at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1396)
    at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1384)
    at org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema(SchemaUtil.java:46)
    at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:53)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    ... 18 common frames omitted`
The above exception occurs for every message that the consumer receives.
But when I restart my consumer application, it starts working fine and the
above exception no longer occurs.
The issue appears randomly after a restart of my consumer application; when I
restart it again, it works fine and all messages are processed properly.
Here is my subscription code,
`pulsarClient
    .newConsumer(AvroSchema.of(MyMetric::class))
    .topic(topicName).subscriptionName(subscriptionName)
    .subscriptionType(SubscriptionType.Shared)
    .receiverQueueSize(localQueueMsgCount)
    .messageListener(myMessageListener)
    .subscribe()`
Here is my listener code (short version),
`import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageListener

class MyMessageListener : MessageListener<MyMetric> {
    override fun received(consumer: Consumer<MyMetric>, msg: Message<MyMetric>) {
        try {
            // msg.value decodes the Avro payload; the SchemaParseException is thrown here.
            process(msg.value)
            consumer.acknowledge(msg)
        } catch (ex: Exception) {
            // Ask the broker to redeliver when decoding or processing fails.
            consumer.negativeAcknowledge(msg)
        }
    }

    fun process(metric: MyMetric) {}
}`
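Since the catch block in the listener above discards the exception, a variant that logs the failure together with the schema version carried by the message may make the bad state easier to spot. This is only a diagnostic sketch, not a fix: `LoggingMessageListener` and `describeSchemaVersion` are hypothetical names, and the helper assumes the schema version is encoded as an 8-byte big-endian long, which matches the `version(2)` shown in the logs.

```kotlin
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageListener
import java.nio.ByteBuffer

// Hypothetical helper: renders the raw schema version bytes of a message.
// Assumption: versions are 8-byte big-endian longs; anything else is shown as hex.
fun describeSchemaVersion(bytes: ByteArray?): String = when {
    bytes == null -> "NULL"
    bytes.size == java.lang.Long.BYTES -> ByteBuffer.wrap(bytes).getLong().toString()
    else -> bytes.joinToString("") { "%02x".format(it) }
}

// Hypothetical listener: same ack/nack flow as above, but the failure is logged.
class LoggingMessageListener<T>(private val process: (T) -> Unit) : MessageListener<T> {
    override fun received(consumer: Consumer<T>, msg: Message<T>) {
        try {
            // Decoding happens inside msg.value, so schema errors are caught below.
            process(msg.value)
            consumer.acknowledge(msg)
        } catch (ex: Exception) {
            System.err.println(
                "Failed to handle ${msg.messageId} on ${msg.topicName}, " +
                    "schema version ${describeSchemaVersion(msg.schemaVersion)}: $ex"
            )
            consumer.negativeAcknowledge(msg)
        }
    }
}
```

It could be passed to `.messageListener(...)` as `LoggingMessageListener<MyMetric> { metric -> process(metric) }`, assuming a `process` function like the one in the listener above.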
When I debug, I can see the following log entry when my consumer application
is working correctly.
`Load schema reader for version(2),
schema is :
{"type":"record","name":"MyMetric","namespace":"com.example","fields":[{"name":"assetId","type":"long"},{"name":"utctime","type":{"type":"string","avro.java.string":"String"}}]},
schemaInfo: {
  "name": "mytenant/mynamesapce/mytopic",
  "schema": {
    "type": "record",
    "name": "MyMetric",
    "namespace": "com.example",
    "fields": [
      {
        "name": "assetId",
        "type": "long"
      },
      {
        "name": "utctime",
        "type": {
          "type": "string",
          "avro.java.string": "String"
        }
      }
    ]
  },
  "type": "AVRO",
  "properties": {
    "__alwaysAllowNull": "true",
    "__jsr310ConversionEnabled": "false"
  }
}`
But when my consumer application is misbehaving, I see the following log
entry instead, which does not contain any schema info,
`Load schema reader for version(2),
schema is : ,
schemaInfo: {
  "name": "",
  "schema": "",
  "type": "NONE",
  "properties": {}
}`
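The difference between the two log entries can be expressed as a small check. The sketch below is illustrative only: `LoggedSchemaInfo` is a hypothetical, simplified mirror of the fields printed in the log, not a Pulsar class, and the healthy schema string is abbreviated.

```kotlin
// Hypothetical, simplified mirror of the "schemaInfo" fields seen in the logs.
data class LoggedSchemaInfo(val name: String, val schema: String, val type: String)

// True when the entry carries no usable Avro definition, as in the failing case.
fun isEmptySchema(info: LoggedSchemaInfo): Boolean =
    info.type == "NONE" || info.schema.isBlank()

fun main() {
    val healthy = LoggedSchemaInfo(
        name = "mytenant/mynamesapce/mytopic",
        // Abbreviated: the real entry lists the assetId and utctime fields.
        schema = """{"type":"record","name":"MyMetric","namespace":"com.example","fields":[]}""",
        type = "AVRO"
    )
    val broken = LoggedSchemaInfo(name = "", schema = "", type = "NONE")

    println(isEmptySchema(healthy)) // false
    println(isEmptySchema(broken))  // true
}
```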
Could you please help me resolve this issue? We are already in production,
and it is difficult to restart our consumer applications every time this
issue occurs.
@robshep @congbobo184 Did you find a root cause for this issue or a way to
fix it? If so, kindly share it, as it may help us.
Eagerly waiting for your inputs. :)