This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da2365  [ISSUE 8832] Avro custom schema not working in consumer (#8939)
3da2365 is described below

commit 3da236503b71131023f5de9928ed499c7d1201ea
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Dec 14 12:22:47 2020 +0800

    [ISSUE 8832] Avro custom schema not working in consumer (#8939)
    
    Fix #8832
    
    ### Modifications
    Make AvroSchema's clone method return a schema that keeps the custom
    reader and writer when the original schema was created with them.
---
 .../client/impl/BrokerClientIntegrationTest.java   | 39 +++++++++++++++++++++-
 .../pulsar/client/impl/schema/AvroSchema.java      |  6 +++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 9041134..2838293 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -36,6 +36,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.List;
@@ -858,8 +859,42 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+
+    @Test
+    public void testAvroSchemaProducerConsumerWithSpecifiedReaderAndWriter() 
throws PulsarClientException {
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        TestMessageObject object = new TestMessageObject();
+        SchemaReader<TestMessageObject> reader = 
Mockito.mock(SchemaReader.class);
+        SchemaWriter<TestMessageObject> writer = 
Mockito.mock(SchemaWriter.class);
+        Mockito.when(reader.read(Mockito.any(byte[].class), 
Mockito.any(byte[].class))).thenReturn(object);
+        
Mockito.when(writer.write(Mockito.any(TestMessageObject.class))).thenReturn("fake
 data".getBytes(StandardCharsets.UTF_8));
+        SchemaDefinition<TestMessageObject> schemaDefinition = new 
SchemaDefinitionBuilderImpl<TestMessageObject>()
+                .withPojo(TestMessageObject.class)
+                .withSchemaReader(reader)
+                .withSchemaWriter(writer)
+                .build();
+        Schema<TestMessageObject> schema = Schema.AVRO(schemaDefinition);
+        PulsarClient client =  PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .build();
+
+        try(Producer<TestMessageObject> producer = 
client.newProducer(schema).topic(topicName).create();
+            Consumer<TestMessageObject> consumer =
+                    
client.newConsumer(schema).topic(topicName).subscriptionName("my-subscriber-name").subscribe())
 {
+            assertNotNull(producer);
+            assertNotNull(consumer);
+            producer.newMessage().value(object).send();
+            TestMessageObject testObject = consumer.receive().getValue();
+            Assert.assertEquals(object.getValue(), testObject.getValue());
+            Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
+            Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
+        } finally {
+            client.close();
+        }
+    }
+
     @Test
-    public void testProducerConsumerWithSpecifiedReaderAndWriter() throws 
PulsarClientException {
+    public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() 
throws PulsarClientException {
         final String topicName = "persistent://my-property/my-ns/my-topic1";
         ObjectMapper mapper = new ObjectMapper();
         SchemaReader<TestMessageObject> reader = Mockito.spy(new 
JacksonJsonReader<>(mapper, TestMessageObject.class));
@@ -889,6 +924,8 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
 
             Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
             Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(byte[].class));
+        } finally {
+            client.close();
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 46227b8..d5a99f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -46,7 +46,7 @@ import static 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaIn
 @Slf4j
 public class AvroSchema<T> extends AvroBaseStructSchema<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(AvroSchema.class);
-
+    private boolean isCustomReaderAndWriter;
     private ClassLoader pojoClassLoader;
 
     private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
@@ -61,6 +61,7 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
         super(schemaInfo);
         setReader(reader);
         setWriter(writer);
+        isCustomReaderAndWriter = true;
     }
 
     @Override
@@ -70,6 +71,9 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
 
     @Override
     public Schema<T> clone() {
+        if (isCustomReaderAndWriter) {
+            return new AvroSchema<>(reader, writer, schemaInfo);
+        }
         Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
         if (schemaInfoProvider != null) {
             schema.setSchemaInfoProvider(schemaInfoProvider);

Reply via email to