This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a94ed  GenericObject - support KeyValue in Message#getValue() (#10107)
a5a94ed is described below

commit a5a94edd023cadecaf07f7bfe3b055df39467963
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Apr 9 19:29:00 2021 +0200

    GenericObject - support KeyValue in Message#getValue() (#10107)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 48 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java | 13 +++++-
 .../client/impl/schema/AutoConsumeSchema.java      |  4 ++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index b38327b..03b8116 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,8 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -302,6 +304,52 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testKeyValueSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "test-string-schema";
+
+        final String topic = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicName).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME));
+
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        Producer<KeyValue<String, Integer>> producer = pulsarClient
+                .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+                .topic(topic)
+                .create();
+
+        producer.send(new KeyValue<>("foo", 123));
+
+        Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+                .subscriptionName("test-sub")
+                .topic(topic)
+                .subscribe();
+
+        Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test-sub2")
+                .topic(topic)
+                .subscribe();
+
+        producer.send(new KeyValue<>("foo", 123));
+
+        Message<KeyValue<String, Integer>> message = consumer.receive();
+        Message<GenericRecord> message2 = consumer2.receive();
+        assertEquals(message.getValue(), message2.getValue().getNativeObject());
+
+        producer.close();
+        consumer.close();
+        consumer2.close();
+    }
+
+    @Test
     public void testIsUsingAvroSchemaParser() {
         for (SchemaType value : SchemaType.values()) {
             if (value == SchemaType.AVRO || value == SchemaType.JSON || value == SchemaType.PROTOBUF) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 1243c76..77f0ee5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -359,8 +360,16 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    private KeyValueSchema getKeyValueSchema() {
+        if (schema instanceof AutoConsumeSchema) {
+            return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
+        } else {
+            return (KeyValueSchema) schema;
+        }
+    }
+
     private T getKeyValueBySchemaVersion() {
-        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        KeyValueSchema kvSchema = getKeyValueSchema();
         byte[] schemaVersion = getSchemaVersion();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
@@ -370,7 +379,7 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     private T getKeyValue() {
-        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        KeyValueSchema kvSchema = getKeyValueSchema();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             return (T) kvSchema.decode(getKeyBytes(), getData(), null);
         } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index d03f30c..75e7ed2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -237,4 +237,8 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         return GenericObjectWrapper.of(value,
                 this.schema.getSchemaInfo().getType(), schemaVersion);
     }
+
+    public Schema<?> getInternalSchema() {
+        return schema;
+    }
 }

Reply via email to