This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e097cb  [pulsar-proxy]Add the LookupProxyHandler handle getSchema request and test (#4083)
1e097cb is described below

commit 1e097cbced91789d8515744cb20d1fecc17b3b41
Author: congbo <[email protected]>
AuthorDate: Sun Apr 21 22:06:34 2019 +0800

    [pulsar-proxy]Add the LookupProxyHandler handle getSchema request and test (#4083)
    
    ### Motivation
    This change is needed to support #3742 and #3876.
    Currently, the proxy does not handle GetSchema commands while it is in the ProxyLookupRequests state.
    
    ### Modifications
    Add a handleGetSchema implementation to LookupProxyHandler, and override handleGetSchema in ProxyConnection to delegate to it.
    
    ### Verifying this change
    Add a new test, testGetSchema(), to ProxyTest.
    
    ### Dependencies (does it add or upgrade a dependency): (yes / no)
    The public API: (no)
    The schema: (no)
    The default values of configurations: (no)
    The wire protocol: (no)
    The rest endpoints: (yes)
    The admin cli options: (no)
    Anything that affects deployment: (no)
    ### Documentation
    Does this pull request introduce a new feature? (no)
    If yes, how is the feature documented? (no)
    If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation.
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 125 ++++++++++++++++-----
 .../pulsar/proxy/server/ProxyConnection.java       |   7 ++
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  44 ++++++++
 3 files changed, 151 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 0072dc5..548c522 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -24,14 +24,19 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Optional;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.Commands;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +65,11 @@ public class LookupProxyHandler {
             .create()
             .register();
 
+    private static final Counter getSchemaRequests = Counter
+            .build("pulsar_proxy_get_schema_requests", "Counter of schema 
requests")
+            .create()
+            .register();
+
     static final Counter rejectedLookupRequests = 
Counter.build("pulsar_proxy_rejected_lookup_requests",
             "Counter of topic lookup requests rejected due to 
throttling").create().register();
 
@@ -280,26 +290,12 @@ public class LookupProxyHandler {
         }
     }
 
-
     private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace,
                                             long clientRequestId) {
-        String serviceUrl;
-        if (isBlank(brokerServiceURL)) {
-            ServiceLookupData availableBroker;
-            try {
-                availableBroker = service.getDiscoveryProvider().nextBroker();
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
-                proxyConnection.ctx().writeAndFlush(Commands.newError(
-                    clientRequestId, ServerError.ServiceNotReady, 
e.getMessage()
-                ));
-                return;
-            }
-            serviceUrl = this.connectWithTLS ?
-                availableBroker.getPulsarServiceUrlTls() : 
availableBroker.getPulsarServiceUrl();
-        } else {
-            serviceUrl = this.connectWithTLS ?
-                service.getConfiguration().getBrokerServiceURLTLS() : 
service.getConfiguration().getBrokerServiceURL();
+        String serviceUrl = getServiceUrl(clientRequestId);
+
+        if(!StringUtils.isNotBlank(serviceUrl)) {
+            return;
         }
         performGetTopicsOfNamespace(clientRequestId, 
commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10,
             commandGetTopicsOfNamespace.getMode());
@@ -316,16 +312,12 @@ public class LookupProxyHandler {
             return;
         }
 
-        URI brokerURI;
-        try {
-            brokerURI = new URI(brokerServiceUrl);
-        } catch (URISyntaxException e) {
-            proxyConnection.ctx().writeAndFlush(
-                    Commands.newError(clientRequestId, 
ServerError.MetadataError, e.getMessage()));
+        InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
+
+        if(addr == null){
             return;
         }
 
-        InetSocketAddress addr = 
InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
         if (log.isDebugEnabled()) {
             log.debug("Getting connections to '{}' for getting 
TopicsOfNamespace '{}' with clientReq Id '{}'",
                 addr, namespaceName, clientRequestId);
@@ -352,5 +344,88 @@ public class LookupProxyHandler {
         });
     }
 
+    public void handleGetSchema(CommandGetSchema commandGetSchema) {
+        getSchemaRequests.inc();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received GetSchema", clientAddress);
+        }
+
+        final long clientRequestId = commandGetSchema.getRequestId();
+        String serviceUrl = getServiceUrl(clientRequestId);
+
+        if(!StringUtils.isNotBlank(serviceUrl)) {
+            return;
+        }
+        InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
+
+        if(addr == null){
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Getting connections to '{}' for getting schema of topic 
'{}' with clientReq Id '{}'",
+                    addr, commandGetSchema.getTopic(), clientRequestId);
+        }
+
+        
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
+            // Connected to backend broker
+            long requestId = proxyConnection.newRequestId();
+            ByteBuf command;
+            byte[] schemaVersion = 
commandGetSchema.getSchemaVersion().toByteArray();
+            command = Commands.newGetSchema(requestId, 
commandGetSchema.getTopic(),
+                    Optional.ofNullable(BytesSchemaVersion.of(schemaVersion)));
+            clientCnx.sendGetSchema(command, 
requestId).thenAccept(optionalSchemaInfo -> {
+                        SchemaInfo schemaInfo = optionalSchemaInfo.get();
+                        proxyConnection.ctx().writeAndFlush(
+                                Commands.newGetSchemaResponse(clientRequestId,
+                                        schemaInfo,
+                                        BytesSchemaVersion.of(schemaVersion)));
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to get schema {}: {}", clientAddress, 
commandGetSchema.getTopic(), ex.getMessage());
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newError(clientRequestId, 
ServerError.ServiceNotReady, ex.getMessage()));
+                return null;
+            });
+        }).exceptionally(ex -> {
+            // Failed to connect to backend broker
+            proxyConnection.ctx().writeAndFlush(
+                    Commands.newError(clientRequestId, 
ServerError.ServiceNotReady, ex.getMessage()));
+            return null;
+        });
+
+    }
+
+    private String getServiceUrl(long clientRequestId) {
+        if (isBlank(brokerServiceURL)) {
+            ServiceLookupData availableBroker;
+            try {
+                availableBroker = service.getDiscoveryProvider().nextBroker();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
+                proxyConnection.ctx().writeAndFlush(Commands.newError(
+                        clientRequestId, ServerError.ServiceNotReady, 
e.getMessage()
+                ));
+                return null;
+            }
+            return this.connectWithTLS ?
+                    availableBroker.getPulsarServiceUrlTls() : 
availableBroker.getPulsarServiceUrl();
+        } else {
+            return this.connectWithTLS ?
+                    service.getConfiguration().getBrokerServiceURLTLS() : 
service.getConfiguration().getBrokerServiceURL();
+        }
+
+    }
+
+    private InetSocketAddress getAddr(String brokerServiceUrl, long 
clientRequestId) {
+        URI brokerURI;
+        try {
+            brokerURI = new URI(brokerServiceUrl);
+        } catch (URISyntaxException e) {
+            proxyConnection.ctx().writeAndFlush(
+                    Commands.newError(clientRequestId, 
ServerError.MetadataError, e.getMessage()));
+            return null;
+        }
+        return InetSocketAddress.createUnresolved(brokerURI.getHost(), 
brokerURI.getPort());
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(LookupProxyHandler.class);
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index a1147f6..18e4e5d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -47,6 +47,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.slf4j.Logger;
@@ -371,6 +372,12 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
 
         
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
     }
+    @Override
+    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        checkArgument(state == State.ProxyLookupRequests);
+
+        lookupProxyHandler.handleGetSchema(commandGetSchema);
+    }
 
     /**
      * handles discovery request from client ands sends next active broker 
address
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index be47d9e..94cb83e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,12 +24,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -47,11 +52,14 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -65,6 +73,18 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        private int field3;
+    }
+
     @Override
     @BeforeClass
     protected void setup() throws Exception {
@@ -206,6 +226,29 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
     }
 
     @Test
+    private void testGetSchema() throws Exception {
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort().get())
+                .build();
+        Producer<Foo> producer;
+        Schema schema = Schema.AVRO(Foo.class);
+        try {
+            producer = 
client.newProducer(schema).topic("persistent://sample/test/local/get-schema")
+                    .create();
+        } catch (Exception ex) {
+            Assert.fail("Should not have failed since can acquire 
LookupRequestSemaphore");
+        }
+        byte[] schemaVersion = new byte[8];
+        byte b = new Long(0l).byteValue();
+        for (int i = 0; i<8; i++){
+            schemaVersion[i] = b;
+        }
+        SchemaInfo schemaInfo = ((PulsarClientImpl)client).getLookup()
+                
.getSchema(TopicName.get("persistent://sample/test/local/get-schema"), 
schemaVersion).get().orElse(null);
+        Assert.assertEquals(new String(schemaInfo.getSchema()), new 
String(schema.getSchemaInfo().getSchema()));
+        client.close();
+    }
+
+    @Test
     private void testProtocolVersionAdvertisement() throws Exception {
         final String url = "pulsar://localhost:" + 
proxyConfig.getServicePort().get();
         final String topic = 
"persistent://sample/test/local/protocol-version-advertisement";
@@ -234,6 +277,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         client.close();
     }
 
+
     private static PulsarClient 
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
             throws Exception {
         ThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

Reply via email to