This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3da2365 [ISSUE 8832] Avro custom schema not working in consumer (#8939)
3da2365 is described below
commit 3da236503b71131023f5de9928ed499c7d1201ea
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Dec 14 12:22:47 2020 +0800
[ISSUE 8832] Avro custom schema not working in consumer (#8939)
Fix #8832
### Modifications
Make AvroSchema's clone method preserve a custom reader and writer instead of always rebuilding the schema from the schema info and POJO class loader.
---
.../client/impl/BrokerClientIntegrationTest.java | 39 +++++++++++++++++++++-
.../pulsar/client/impl/schema/AvroSchema.java | 6 +++-
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 9041134..2838293 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -36,6 +36,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
@@ -858,8 +859,42 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
consumer.close();
}
+
+ @Test
+ public void testAvroSchemaProducerConsumerWithSpecifiedReaderAndWriter()
throws PulsarClientException {
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ TestMessageObject object = new TestMessageObject();
+ SchemaReader<TestMessageObject> reader =
Mockito.mock(SchemaReader.class);
+ SchemaWriter<TestMessageObject> writer =
Mockito.mock(SchemaWriter.class);
+ Mockito.when(reader.read(Mockito.any(byte[].class),
Mockito.any(byte[].class))).thenReturn(object);
+ Mockito.when(writer.write(Mockito.any(TestMessageObject.class))).thenReturn("fake data".getBytes(StandardCharsets.UTF_8));
+ SchemaDefinition<TestMessageObject> schemaDefinition = new
SchemaDefinitionBuilderImpl<TestMessageObject>()
+ .withPojo(TestMessageObject.class)
+ .withSchemaReader(reader)
+ .withSchemaWriter(writer)
+ .build();
+ Schema<TestMessageObject> schema = Schema.AVRO(schemaDefinition);
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .build();
+
+ try(Producer<TestMessageObject> producer =
client.newProducer(schema).topic(topicName).create();
+ Consumer<TestMessageObject> consumer =
+
client.newConsumer(schema).topic(topicName).subscriptionName("my-subscriber-name").subscribe())
{
+ assertNotNull(producer);
+ assertNotNull(consumer);
+ producer.newMessage().value(object).send();
+ TestMessageObject testObject = consumer.receive().getValue();
+ Assert.assertEquals(object.getValue(), testObject.getValue());
+ Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
+ } finally {
+ client.close();
+ }
+ }
+
@Test
- public void testProducerConsumerWithSpecifiedReaderAndWriter() throws
PulsarClientException {
+ public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter()
throws PulsarClientException {
final String topicName = "persistent://my-property/my-ns/my-topic1";
ObjectMapper mapper = new ObjectMapper();
SchemaReader<TestMessageObject> reader = Mockito.spy(new
JacksonJsonReader<>(mapper, TestMessageObject.class));
@@ -889,6 +924,8 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(byte[].class));
+ } finally {
+ client.close();
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 46227b8..d5a99f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -46,7 +46,7 @@ import static
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaIn
@Slf4j
public class AvroSchema<T> extends AvroBaseStructSchema<T> {
private static final Logger LOG =
LoggerFactory.getLogger(AvroSchema.class);
-
+ private boolean isCustomReaderAndWriter;
private ClassLoader pojoClassLoader;
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
@@ -61,6 +61,7 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
super(schemaInfo);
setReader(reader);
setWriter(writer);
+ isCustomReaderAndWriter = true;
}
@Override
@@ -70,6 +71,9 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
@Override
public Schema<T> clone() {
+ if (isCustomReaderAndWriter) {
+ return new AvroSchema<>(reader, writer, schemaInfo);
+ }
Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);