This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b9d0ac71f78bfc2c845eefeda7b850f91b246ae7
Author: lipenghui <[email protected]>
AuthorDate: Fri Feb 7 08:43:01 2020 +0800

    Fix get schema version in HttpLookupService. (#6193)
    
    ### Motivation
    
    Fix get schema version in HttpLookupService. The 
com.yahoo.sketches.Util.bytesToLong method needs the byte[] to be flipped 
first; otherwise, it returns a wrong long value. So use ByteBuffer to convert 
the byte[] version to a long.
    
    This issue happens when users use the HTTP protocol client with multiple 
schema versions.
    
    ### Verifying this change
    
    New tests were added for HttpLookupService and BinaryProtoLookupService.
---
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 33 ++++++++++++++++++++++
 .../pulsar/client/impl/HttpLookupService.java      |  7 ++---
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index aa0f0e0..5548089 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -31,10 +31,16 @@ import static org.testng.Assert.assertTrue;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema.Parser;
 import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import 
org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.HttpLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -47,8 +53,10 @@ import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public class SimpleSchemaTest extends ProducerConsumerBase {
@@ -603,4 +611,29 @@ public class SimpleSchemaTest extends ProducerConsumerBase 
{
         }
     }
 
+    @Test
+    public void testGetSchemaByVersion() throws PulsarClientException, 
PulsarAdminException, ExecutionException, InterruptedException {
+        final String topic = 
"persistent://my-property/my-ns/testGetSchemaByVersion";
+
+        PulsarClientImpl httpProtocolClient = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
+        PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) 
pulsarClient;
+
+        pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+            .topic(topic)
+            .create();
+
+        pulsarClient.newProducer(Schema.AVRO(V2Data.class))
+            .topic(topic)
+            .create();
+
+        LookupService httpLookupService = httpProtocolClient.getLookup();
+        LookupService binaryLookupService = binaryProtocolClient.getLookup();
+        Assert.assertTrue(httpLookupService instanceof HttpLookupService);
+        Assert.assertTrue(binaryLookupService instanceof 
BinaryProtoLookupService);
+        Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+        Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), 
ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+        Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), 
ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
+        Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), 
ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+        Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), 
ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 6a0746c..5bc3bcd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -24,6 +24,7 @@ import io.netty.channel.EventLoopGroup;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.List;
@@ -46,9 +47,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.yahoo.sketches.Util.bytesToLong;
-
-class HttpLookupService implements LookupService {
+public class HttpLookupService implements LookupService {
 
     private final HttpClient httpClient;
     private final boolean useTls;
@@ -152,7 +151,7 @@ class HttpLookupService implements LookupService {
         if (version != null) {
             path = String.format("admin/v2/schemas/%s/schema/%s",
                     schemaName,
-                    bytesToLong(version));
+                    ByteBuffer.wrap(version).getLong());
         }
         httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
             
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, 
response)));

Reply via email to