This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a5a94ed GenericObject - support KeyValue in Message#getValue()
(#10107)
a5a94ed is described below
commit a5a94edd023cadecaf07f7bfe3b055df39467963
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Apr 9 19:29:00 2021 +0200
GenericObject - support KeyValue in Message#getValue() (#10107)
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 48 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 13 +++++-
.../client/impl/schema/AutoConsumeSchema.java | 4 ++
3 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index b38327b..03b8116 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,8 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -302,6 +304,52 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testKeyValueSchema() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicName = "test-string-schema";
+
+ final String topic = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topicName).toString();
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME));
+
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ Producer<KeyValue<String, Integer>> producer = pulsarClient
+ .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32,
KeyValueEncodingType.INLINE))
+ .topic(topic)
+ .create();
+
+ producer.send(new KeyValue<>("foo", 123));
+
+ Consumer<KeyValue<String, Integer>> consumer =
pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32,
KeyValueEncodingType.INLINE))
+ .subscriptionName("test-sub")
+ .topic(topic)
+ .subscribe();
+
+ Consumer<GenericRecord> consumer2 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .subscriptionName("test-sub2")
+ .topic(topic)
+ .subscribe();
+
+ producer.send(new KeyValue<>("foo", 123));
+
+ Message<KeyValue<String, Integer>> message = consumer.receive();
+ Message<GenericRecord> message2 = consumer2.receive();
+ assertEquals(message.getValue(),
message2.getValue().getNativeObject());
+
+ producer.close();
+ consumer.close();
+ consumer2.close();
+ }
+
+ @Test
public void testIsUsingAvroSchemaParser() {
for (SchemaType value : SchemaType.values()) {
if (value == SchemaType.AVRO || value == SchemaType.JSON || value
== SchemaType.PROTOBUF) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 1243c76..77f0ee5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -359,8 +360,16 @@ public class MessageImpl<T> implements Message<T> {
}
}
+ private KeyValueSchema getKeyValueSchema() {
+ if (schema instanceof AutoConsumeSchema) {
+ return (KeyValueSchema) ((AutoConsumeSchema)
schema).getInternalSchema();
+ } else {
+ return (KeyValueSchema) schema;
+ }
+ }
+
private T getKeyValueBySchemaVersion() {
- KeyValueSchema kvSchema = (KeyValueSchema) schema;
+ KeyValueSchema kvSchema = getKeyValueSchema();
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(),
schemaVersion);
@@ -370,7 +379,7 @@ public class MessageImpl<T> implements Message<T> {
}
private T getKeyValue() {
- KeyValueSchema kvSchema = (KeyValueSchema) schema;
+ KeyValueSchema kvSchema = getKeyValueSchema();
if (kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), null);
} else {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index d03f30c..75e7ed2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -237,4 +237,8 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
return GenericObjectWrapper.of(value,
this.schema.getSchemaInfo().getType(), schemaVersion);
}
+
+ public Schema<?> getInternalSchema() {
+ return schema;
+ }
}