This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be57e9a79b0 [fix][schema]Fix AutoProduceBytes producer can not be
created to a topic with ProtoBuf schema (#19767)
be57e9a79b0 is described below
commit be57e9a79b0b6fd40ef02414df7238050b7a885d
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Apr 1 19:21:58 2023 +0800
[fix][schema]Fix AutoProduceBytes producer can not be created to a topic
with ProtoBuf schema (#19767)
### Motivation
1. There is a topic1 with a protobuf schema.
2. Create a producer1 with AutoProduceBytes schema.
3. The producer1 will be created failed because the way to get the schema
of protobuf schema is not supported. ### ###
### Modification
Because the Protobuf schema is implemented from the AvroBaseStructSchema.
So we add a way to get Protobuf schema just like the AvroSchema.
---
.../test/java/org/apache/pulsar/schema/SchemaTest.java | 17 +++++++++++++++++
.../org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 ++++++++-
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d99496f4a96..0ec72b2ef47 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
@@ -113,6 +114,22 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Test
+ public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws
Exception{
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topic = tenant + "/" + namespace + "/test-getSchema";
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+
+
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage>
protobufSchema =
+
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
+ pulsarClient.newProducer(protobufSchema).topic(topic).create();
+
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
+ }
+
@Test
public void testMultiTopicSetSchemaProvider() throws Exception {
final String tenant = PUBLIC_TENANT;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ebc11f44ce7..f37709f3d84 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -70,6 +70,7 @@ import
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
@@ -81,6 +82,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -350,7 +352,12 @@ public class PulsarClientImpl implements PulsarClient {
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent()) {
-
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
+ SchemaInfo schemaInfo = schemaInfoOptional.get();
+ if (schemaInfo.getType() == SchemaType.PROTOBUF) {
+ autoProduceBytesSchema.setSchema(new
GenericAvroSchema(schemaInfo));
+ } else {
+
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
+ }
} else {
autoProduceBytesSchema.setSchema(Schema.BYTES);
}