This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8247222c6ad083dc9bcfab74a6ed1660df99395a Author: Enrico Olivelli <[email protected]> AuthorDate: Mon Mar 22 10:47:22 2021 +0100 Allow to use KeyValue<GenericRecord, GenericRecord> (#9981) Fix a problem in HttpLookupService#getSchema (cherry picked from commit 5a4b441b0f20ea0457b558178526f607c69a0574) --- .../apache/pulsar/broker/service/KeyValueTest.java | 107 +++++++++++++++++++++ .../pulsar/client/impl/HttpLookupService.java | 21 +++- 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java new file mode 100644 index 0000000..0faee61 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.testng.Assert.assertEquals; + +/** + * Tests producing KeyValue<GenericRecord, GenericRecord> messages and consuming them with AUTO_CONSUME key and value schemas. 
+ */ +@Slf4j +@Test(groups = "broker") +public class KeyValueTest extends BrokerTestBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void keyValueAutoConsumeTest() throws Exception { + String topic = "persistent://prop/ns-abc/kv-record"; + admin.topics().createNonPartitionedTopic(topic); + + RecordSchemaBuilder builder = SchemaBuilder + .record("test"); + builder.field("test").type(SchemaType.STRING); + GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO)); + + GenericRecord key = schema.newRecordBuilder().set("test", "foo").build(); + GenericRecord value = schema.newRecordBuilder().set("test", "bar").build(); + + @Cleanup + Producer<KeyValue<GenericRecord, GenericRecord>> producer = pulsarClient + .newProducer(KeyValueSchema.of(schema, schema)) + .topic(topic) + .create(); + + producer.newMessage().value(new KeyValue<>(key, value)).send(); + + @Cleanup + Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = pulsarClient + .newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME())) + .topic(topic) + .subscriptionName("test") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + + Message<KeyValue<GenericRecord, GenericRecord>> message = consumer.receive(); + assertEquals(key.getField("test"), message.getValue().getKey().getField("test")); + assertEquals(value.getField("test"), message.getValue().getValue().getField("test")); + + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index da7f148..e822d0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -22,9 +22,11 @@ import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -33,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -41,8 +44,10 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +158,21 @@ public class HttpLookupService implements LookupService { ByteBuffer.wrap(version).getLong()); } httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> { - future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); + if (response.getType() == SchemaType.KEY_VALUE) { + try { + SchemaData data = SchemaData + .builder() + .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(response.getData().getBytes(StandardCharsets.UTF_8))) + .type(response.getType()) + 
.props(response.getProperties()) + .build(); + future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data))); + } catch (IOException err) { + future.completeExceptionally(err); + } + } else { + future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); + } }).exceptionally(ex -> { if (ex.getCause() instanceof NotFoundException) { future.complete(Optional.empty());
