Technoboy- opened a new issue #13701:
URL: https://github.com/apache/pulsar/issues/13701
## Motivation
Currently, a producer can send raw byte-array messages to a topic that already
has schema info, and no error is raised.
Demo below:
```
// Create topic with schema
Producer<User> producer = pulsarClient
        .newProducer(Schema.JSON(User.class))
        .producerName("p1")
        .topic("persistent://public/default/test")
        .create();

// Consume
Consumer<User> consumer = pulsarClient
        .newConsumer(Schema.AVRO(User.class))
        .topic("persistent://public/default/test")
        .subscriptionName("sub-2")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();
while (true) {
    Message<User> message = consumer.receive();
    System.out.println("received msg : " + message.getValue());
}
```
If we then produce byte-array messages to the topic, the produce succeeds:
```
bin/pulsar-client produce -n 5 -m "hello" test
```
But the consumer may then fail with the error below:
```
Exception in thread "main" com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
    at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:470)
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:448)
    at com.tboy.learn.pulsar.sql.TestConsumer.main(TestConsumer.java:24)
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129)
    at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
```
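This is because the consumer can't find a schema to decode the message with: the trace reports the schema version it tried to fetch as EMPTY.
So it would be better to refuse the produce request in this case.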
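
The proposed behavior could be sketched roughly as follows. This is only an illustration of the check being suggested, not Pulsar code: `ProduceSchemaGuard` and `shouldReject` are hypothetical names, and the sketch assumes that a raw byte-array producer attaches no schema version to its messages (modeled here as a null or empty byte array, matching the `EMPTY` in the trace above).

```java
/** Hypothetical guard illustrating the proposal; not part of the Pulsar code base. */
final class ProduceSchemaGuard {

    private ProduceSchemaGuard() {
    }

    /**
     * Decides whether a produce request should be refused.
     *
     * @param topicHasSchema        whether the topic already has schema info
     * @param producerSchemaVersion schema version attached by the producer;
     *                              null or empty models a schemaless (raw bytes) producer
     * @return true if the topic has a schema but the producer supplies none
     */
    static boolean shouldReject(boolean topicHasSchema, byte[] producerSchemaVersion) {
        boolean producerHasNoSchema =
                producerSchemaVersion == null || producerSchemaVersion.length == 0;
        return topicHasSchema && producerHasNoSchema;
    }

    public static void main(String[] args) {
        // Raw bytes sent to a topic that has a schema: refuse.
        System.out.println(shouldReject(true, new byte[0]));      // prints "true"
        // Typed producer sending to a topic that has a schema: allow.
        System.out.println(shouldReject(true, new byte[] {1}));   // prints "false"
        // Raw bytes sent to a topic without a schema: allow.
        System.out.println(shouldReject(false, null));            // prints "false"
    }
}
```

With a check like this, the `pulsar-client produce` call from the demo would fail at produce time instead of surfacing later as a decode error on the consumer side.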