This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be57e9a79b0 [fix][schema]Fix AutoProduceBytes producer can not be 
created to a topic with ProtoBuf schema (#19767)
be57e9a79b0 is described below

commit be57e9a79b0b6fd40ef02414df7238050b7a885d
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Apr 1 19:21:58 2023 +0800

    [fix][schema]Fix AutoProduceBytes producer can not be created to a topic 
with ProtoBuf schema (#19767)
    
    ### Motivation
    1. There is a topic1 with a protobuf schema.
    2. Create a producer1 with AutoProduceBytes schema.
    3. The producer1 will be created failed because the way to get the schema 
of protobuf schema is not supported. ### ###
    ### Modification
    Because the Protobuf schema is implemented from the AvroBaseStructSchema. 
So we add a way to get Protobuf schema just like the AvroSchema.
---
 .../test/java/org/apache/pulsar/schema/SchemaTest.java  | 17 +++++++++++++++++
 .../org/apache/pulsar/client/impl/PulsarClientImpl.java |  9 ++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d99496f4a96..0ec72b2ef47 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
@@ -113,6 +114,22 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
+    public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws 
Exception{
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topic = tenant + "/" + namespace + "/test-getSchema";
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> 
protobufSchema =
+                
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
+        pulsarClient.newProducer(protobufSchema).topic(topic).create();
+        
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
+    }
+
     @Test
     public void testMultiTopicSetSchemaProvider() throws Exception {
         final String tenant = PUBLIC_TENANT;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ebc11f44ce7..f37709f3d84 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -70,6 +70,7 @@ import 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import 
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
 import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
@@ -81,6 +82,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -350,7 +352,12 @@ public class PulsarClientImpl implements PulsarClient {
             return lookup.getSchema(TopicName.get(conf.getTopicName()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent()) {
-                            
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
+                            SchemaInfo schemaInfo = schemaInfoOptional.get();
+                            if (schemaInfo.getType() == SchemaType.PROTOBUF) {
+                                autoProduceBytesSchema.setSchema(new 
GenericAvroSchema(schemaInfo));
+                            } else {
+                                
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
+                            }
                         } else {
                             autoProduceBytesSchema.setSchema(Schema.BYTES);
                         }

Reply via email to