This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8247222c6ad083dc9bcfab74a6ed1660df99395a
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Mar 22 10:47:22 2021 +0100

    Allow to use KeyValue<GenericRecord, GenericRecord> (#9981)
    
    Fix a problem in HttpLookupService#getSchema
    
    (cherry picked from commit 5a4b441b0f20ea0457b558178526f607c69a0574)
---
 .../apache/pulsar/broker/service/KeyValueTest.java | 107 +++++++++++++++++++++
 .../pulsar/client/impl/HttpLookupService.java      |  21 +++-
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
new file mode 100644
index 0000000..0faee61
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * KeyValue message produce and consume test, using AUTO_CONSUME key and value schemas.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class KeyValueTest extends BrokerTestBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void keyValueAutoConsumeTest()  throws Exception {
+        String topic = "persistent://prop/ns-abc/kv-record";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        RecordSchemaBuilder builder = SchemaBuilder
+                .record("test");
+                builder.field("test").type(SchemaType.STRING);
+        GenericSchema<GenericRecord> schema = 
GenericAvroSchema.of(builder.build(SchemaType.AVRO));
+
+        GenericRecord key = schema.newRecordBuilder().set("test", 
"foo").build();
+        GenericRecord value = schema.newRecordBuilder().set("test", 
"bar").build();
+
+        @Cleanup
+        Producer<KeyValue<GenericRecord, GenericRecord>> producer = 
pulsarClient
+                .newProducer(KeyValueSchema.of(schema, schema))
+                .topic(topic)
+                .create();
+
+        producer.newMessage().value(new KeyValue<>(key, value)).send();
+
+        @Cleanup
+        Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = 
pulsarClient
+                .newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), 
Schema.AUTO_CONSUME()))
+                .topic(topic)
+                .subscriptionName("test")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+
+        Message<KeyValue<GenericRecord, GenericRecord>> message = 
consumer.receive();
+        assertEquals(key.getField("test"), 
message.getValue().getKey().getField("test"));
+        assertEquals(value.getField("test"), 
message.getValue().getValue().getField("test"));
+
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index da7f148..e822d0e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -22,9 +22,11 @@ import com.google.common.collect.Lists;
 
 import io.netty.channel.EventLoopGroup;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.List;
@@ -33,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -41,8 +44,10 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +158,21 @@ public class HttpLookupService implements LookupService {
                     ByteBuffer.wrap(version).getLong());
         }
         httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
-            
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, 
response)));
+            if (response.getType() == SchemaType.KEY_VALUE) {
+                try {
+                    SchemaData data = SchemaData
+                            .builder()
+                            
.data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(response.getData().getBytes(StandardCharsets.UTF_8)))
+                            .type(response.getType())
+                            .props(response.getProperties())
+                            .build();
+                    
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
+                } catch (IOException err) {
+                    future.completeExceptionally(err);
+                }
+            } else {
+                
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, 
response)));
+            }
         }).exceptionally(ex -> {
             if (ex.getCause() instanceof NotFoundException) {
                 future.complete(Optional.empty());

Reply via email to