This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 90908e9  Add option to retrieve schema for a topic from pulsar client 
(#2215)
90908e9 is described below

commit 90908e9d950a747820c9ba8b4160372937062f90
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jul 23 00:08:15 2018 -0700

    Add option to retrieve schema for a topic from pulsar client (#2215)
---
 .../pulsar/broker/service/BrokerService.java       |    4 +-
 .../broker/service/PulsarChannelInitializer.java   |   25 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   51 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |    4 +-
 .../service/PersistentTopicConcurrentTest.java     |    2 +-
 .../pulsar/broker/service/PersistentTopicTest.java |    2 +-
 .../pulsar/broker/service/ServerCnxTest.java       |    2 +-
 .../broker/service/schema/ClientGetSchemaTest.java |   97 ++
 .../schema/JsonSchemaCompatibilityCheckTest.java   |   13 +-
 .../pulsar/client/api/PulsarClientException.java   |   10 +
 .../client/impl/BinaryProtoLookupService.java      |   21 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   57 +-
 .../org/apache/pulsar/client/impl/HttpClient.java  |   24 +-
 .../pulsar/client/impl/HttpLookupService.java      |   32 +-
 .../apache/pulsar/client/impl/LookupService.java   |    4 +
 .../pulsar/client/impl/PulsarClientImpl.java       |   23 +-
 .../org/apache/pulsar/common/api/Commands.java     |   60 +
 .../apache/pulsar/common/api/PulsarDecoder.java    |   29 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 1330 ++++++++++++++++++++
 .../apache/pulsar/common/schema/SchemaInfo.java    |   38 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   21 +
 21 files changed, 1800 insertions(+), 49 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 741afbf..4bbfc72 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -290,7 +290,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
-        bootstrap.childHandler(new PulsarChannelInitializer(this, 
serviceConfig, false));
+        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
 
         // Bind and start to accept incoming connections.
         InetSocketAddress addr = new 
InetSocketAddress(pulsar.getBindAddress(), port);
@@ -303,7 +303,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
         if (serviceConfig.isTlsEnabled()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
-            tlsBootstrap.childHandler(new PulsarChannelInitializer(this, 
serviceConfig, true));
+            tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, 
true));
             tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), 
tlsPort)).sync();
             log.info("Started Pulsar Broker TLS service on port {} - TLS 
provider: {}", tlsPort,
                     SslContext.defaultServerProvider());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 8c16a55..7858a11 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -18,37 +18,38 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.ByteBufPair;
-import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslContext;
 
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.util.SecurityUtility;
+
 public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel> {
 
     public static final String TLS_HANDLER = "tls";
-    BrokerService brokerService;
-    ServiceConfiguration serviceConfig;
-    boolean enableTLS;
+
+    private final PulsarService pulsar;
+    private final boolean enableTLS;
 
     /**
      *
      * @param brokerService
      */
-    public PulsarChannelInitializer(BrokerService brokerService, 
ServiceConfiguration serviceConfig,
+    public PulsarChannelInitializer(PulsarService pulsar,
             boolean enableTLS) {
         super();
-        this.brokerService = brokerService;
-        this.serviceConfig = serviceConfig;
+        this.pulsar = pulsar;
         this.enableTLS = enableTLS;
     }
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
+        ServiceConfiguration serviceConfig = pulsar.getConfiguration();
         if (enableTLS) {
             SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
                     serviceConfig.isTlsAllowInsecureConnection(), 
serviceConfig.getTlsTrustCertsFilePath(),
@@ -60,6 +61,6 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
         ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-        ch.pipeline().addLast("handler", new ServerCnx(brokerService));
+        ch.pipeline().addLast("handler", new ServerCnx(pulsar));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b45f72..1d191bc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -50,11 +50,13 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -72,6 +74,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -93,6 +96,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -103,6 +107,7 @@ import org.slf4j.LoggerFactory;
 
 public class ServerCnx extends PulsarHandler {
     private final BrokerService service;
+    private final SchemaRegistryService schemaService;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private State state;
@@ -127,9 +132,10 @@ public class ServerCnx extends PulsarHandler {
         Start, Connected, Failed
     }
 
-    public ServerCnx(BrokerService service) {
-        super(service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
-        this.service = service;
+    public ServerCnx(PulsarService pulsar) {
+        super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), 
TimeUnit.SECONDS);
+        this.service = pulsar.getBrokerService();
+        this.schemaService = pulsar.getSchemaRegistryService();
         this.state = State.Start;
 
         // This maps are not heavily contended since most accesses are within 
the cnx thread
@@ -301,7 +307,7 @@ public class ServerCnx extends PulsarHandler {
         if (topicName == null) {
             return;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -1199,6 +1205,43 @@ public class ServerCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        if (log.isDebugEnabled()) {
+            log.debug("Received CommandGetSchema call from {}", remoteAddress);
+        }
+
+        long requestId = commandGetSchema.getRequestId();
+        SchemaVersion schemaVersion = SchemaVersion.Latest;
+        if (commandGetSchema.hasSchemaVersion()) {
+            schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion().toByteArray());
+        }
+
+        String schemaName;
+        try {
+            schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+        } catch (Throwable t) {
+            ctx.writeAndFlush(
+                    Commands.newGetSchemaResponseError(requestId, 
ServerError.InvalidTopicName, t.getMessage()));
+            return;
+        }
+
+        schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
+            if (schemaAndMetadata == null) {
+                
ctx.writeAndFlush(Commands.newGetSchemaResponseError(requestId, 
ServerError.TopicNotFound,
+                        "Topic not found or no-schema"));
+            } else {
+                ctx.writeAndFlush(Commands.newGetSchemaResponse(requestId,
+                        new SchemaInfo(schemaName, schemaAndMetadata.schema), 
schemaAndMetadata.version));
+            }
+        }).exceptionally(ex -> {
+            ctx.writeAndFlush(
+                    Commands.newGetSchemaResponseError(requestId, 
ServerError.UnknownError, ex.getMessage()));
+            return null;
+        });
+    }
+
+
+    @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ccd32e1..bc8d4bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -157,14 +157,14 @@ public class PersistentDispatcherFailoverConsumerTest {
             return null;
         }).when(channelCtx).writeAndFlush(any(), any());
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
         
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
         when(serverCnx.ctx()).thenReturn(channelCtx);
 
-        serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
+        serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnxWithOldVersion).isActive();
         doReturn(true).when(serverCnxWithOldVersion).isWritable();
         doReturn(new InetSocketAddress("localhost", 1234))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 8bfd6ef..7c5ca20 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -98,7 +98,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index db35e74..f8ac40a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -171,7 +171,7 @@ public class PersistentTopicTest {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2b324ab..f746571 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1379,7 +1379,7 @@ public class ServerCnxTest {
             serverCnx.close();
             channel.close().get();
         }
-        serverCnx = new ServerCnx(brokerService);
+        serverCnx = new ServerCnx(pulsar);
         serverCnx.authRole = "";
         channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
new file mode 100644
index 0000000..7055492
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ClientGetSchemaTest extends ProducerConsumerBase {
+
+    private static final String topicBytes = "my-property/my-ns/topic-bytes";
+    private static final String topicString = "my-property/my-ns/topic-string";
+    private static final String topicJson = "my-property/my-ns/topic-json";
+    private static final String topicAvro = "my-property/my-ns/topic-avro";
+
+    List<Producer<?>> producers = new ArrayList<>();
+
+    private static class MyClass {
+        public String name;
+        public int age;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        // Create few topics with different types
+        
producers.add(pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create());
+        
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
+        
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
+        
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        producers.forEach(t -> {
+            try {
+                t.close();
+            } catch (PulsarClientException e) {
+            }
+        });
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "serviceUrl")
+    public String[] serviceUrls() {
+        return new String[] { "pulsar://" + pulsar.getAdvertisedAddress() + 
":" + BROKER_PORT,
+                "http://" + pulsar.getAdvertisedAddress() + ":" + 
BROKER_WEBSERVICE_PORT };
+    }
+
+    @Test(dataProvider = "serviceUrl")
+    public void testGetSchema(String serviceUrl) throws Exception {
+        @Cleanup
+        PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+
+        assertEquals(client.getSchema("non-existing-topic").join(), 
Optional.empty());
+        assertEquals(client.getSchema(topicBytes).join(), Optional.empty());
+        assertEquals(client.getSchema(topicString).join(), 
Optional.of(Schema.STRING.getSchemaInfo()));
+        assertEquals(client.getSchema(topicJson).join(), 
Optional.of(Schema.JSON(MyClass.class).getSchemaInfo()));
+        assertEquals(client.getSchema(topicAvro).join(), 
Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index d7e7c2c..341487f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -22,9 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -34,10 +43,6 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
 public class JsonSchemaCompatibilityCheckTest extends 
BaseAvroSchemaCompatibilityTest{
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2bcd142..2618066 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -51,6 +51,16 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public static class NotFoundException extends PulsarClientException {
+        public NotFoundException(String msg) {
+            super(msg);
+        }
+
+        public NotFoundException(Throwable t) {
+            super(t);
+        }
+    }
+
     public static class TimeoutException extends PulsarClientException {
         public TimeoutException(String msg) {
             super(msg);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 6d8f678..29ae1f4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -21,29 +21,32 @@ package org.apache.pulsar.client.impl;
 import static java.lang.String.format;
 
 import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
@@ -181,6 +184,16 @@ public class BinaryProtoLookupService implements 
LookupService {
         return partitionFuture;
     }
 
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
+        return 
client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+            long requestId = client.newRequestId();
+            ByteBuf request = Commands.newGetSchema(requestId, 
topicName.toString(), Optional.empty());
+
+            return clientCnx.sendGetSchema(request, requestId);
+        });
+    }
+
     public String getServiceUrl() {
         return serviceAddress.toString();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6a8ab74..887a890 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -23,6 +23,7 @@ import static 
org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
@@ -31,16 +32,20 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
@@ -56,6 +61,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
@@ -67,6 +73,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +94,9 @@ public class ClientCnx extends PulsarHandler {
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests =
         new ConcurrentLongHashMap<>(16, 1);
 
+    private final 
ConcurrentLongHashMap<CompletableFuture<Optional<SchemaInfo>>> 
pendingGetSchemaRequests = new ConcurrentLongHashMap<>(
+            16, 1);
+
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new 
ConcurrentLongHashMap<>(16, 1);
 
@@ -151,7 +161,7 @@ public class ClientCnx extends PulsarHandler {
                     }
                 });
     }
-    
+
     protected ByteBuf newConnectCommand() throws PulsarClientException {
         String authData = "";
         if (authentication.getAuthData().hasDataFromCommand()) {
@@ -178,6 +188,7 @@ public class ClientCnx extends PulsarHandler {
         waitingLookupRequests.forEach(pair -> 
pair.getRight().getRight().completeExceptionally(e));
         pendingGetLastMessageIdRequests.forEach((key, future) -> 
future.completeExceptionally(e));
         pendingGetTopicsRequests.forEach((key, future) -> 
future.completeExceptionally(e));
+        pendingGetSchemaRequests.forEach((key, future) -> 
future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to 
reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -595,6 +606,31 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
+    @Override
+    protected void handleGetSchemaResponse(CommandGetSchemaResponse 
commandGetSchemaResponse) {
+        checkArgument(state == State.Ready);
+
+        long requestId = commandGetSchemaResponse.getRequestId();
+
+        CompletableFuture<Optional<SchemaInfo>> future = 
pendingGetSchemaRequests.remove(requestId);
+        if (future == null) {
+            log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), requestId);
+            return;
+        }
+
+        if (commandGetSchemaResponse.hasErrorCode()) {
+            // Request has failed
+            ServerError rc = commandGetSchemaResponse.getErrorCode();
+            if (rc == ServerError.TopicNotFound) {
+                future.complete(Optional.empty());
+            } else {
+                future.completeExceptionally(getPulsarClientException(rc, 
commandGetSchemaResponse.getErrorMessage()));
+            }
+        } else {
+            future.complete(Optional.of(new 
SchemaInfo(commandGetSchemaResponse.getSchema())));
+        }
+    }
+
     Promise<Void> newPromise() {
         return ctx.newPromise();
     }
@@ -644,6 +680,23 @@ public class ClientCnx extends PulsarHandler {
         return future;
     }
 
+    public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf 
request, long requestId) {
+        CompletableFuture<Optional<SchemaInfo>> future = new 
CompletableFuture<>();
+
+        pendingGetSchemaRequests.put(requestId, future);
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send GetSchema request to broker: {}", 
ctx.channel(),
+                        writeFuture.cause().getMessage());
+                pendingGetSchemaRequests.remove(requestId); // drop entry
+                future.completeExceptionally(writeFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
+
     /**
      * check serverError and take appropriate action
      * <ul>
@@ -760,7 +813,7 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException(errorMsg);
         }
     }
-    
+
     @VisibleForTesting
     public void close() {
        if (ctx != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index eb67262..e5ef4c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.asynchttpclient.AsyncCompletionHandler;
@@ -46,13 +54,6 @@ import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.ssl.SslContext;
-
 public class HttpClient implements Closeable {
 
     protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -158,8 +159,13 @@ public class HttpClient implements Closeable {
                     Response response = responseFuture.get();
                     if (response.getStatusCode() != HttpURLConnection.HTTP_OK) 
{
                         log.warn("[{}] HTTP get request failed: {}", 
requestUrl, response.getStatusText());
-                        future.completeExceptionally(
-                                new PulsarClientException("HTTP get request 
failed: " + response.getStatusText()));
+                        Exception e;
+                        if (response.getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+                            e = new NotFoundException("Not found: " + 
response.getStatusText());
+                        } else {
+                            e = new PulsarClientException("HTTP get request 
failed: " + response.getStatusText());
+                        }
+                        future.completeExceptionally(e);
                         return;
                     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 185412c..a427fbd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,25 +19,30 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
+
+import io.netty.channel.EventLoopGroup;
+
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.lookup.data.LookupData;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.EventLoopGroup;
-
 class HttpLookupService implements LookupService {
 
     private final HttpClient httpClient;
@@ -122,6 +127,27 @@ class HttpLookupService implements LookupService {
     }
 
     @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
+        // Queries the admin REST endpoint for the topic's schema and adapts the
+        // outcome: a NotFoundException becomes an empty Optional, any other
+        // failure is propagated to the caller.
+        final String schemaName = topicName.getSchemaName();
+        final String path = String.format("admin/v2/schemas/%s/schema", schemaName);
+        final CompletableFuture<Optional<SchemaInfo>> result = new CompletableFuture<>();
+
+        httpClient.get(path, GetSchemaResponse.class)
+                .thenAccept(response -> result.complete(Optional.of(new SchemaInfo(schemaName, response))))
+                .exceptionally(ex -> {
+                    final Throwable cause = ex.getCause();
+                    if (!(cause instanceof NotFoundException)) {
+                        log.warn("Failed to get schema for topic {} : {}", topicName, cause.getClass());
+                        result.completeExceptionally(ex);
+                    } else {
+                        // Nothing registered for this topic: report absence, not an error
+                        result.complete(Optional.empty());
+                    }
+                    return null;
+                });
+        return result;
+    }
+
+    @Override
     public void close() throws Exception {
         httpClient.close();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index aa00e54..769422b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * Provides lookup service to find broker which serves given topic. It helps to
@@ -58,6 +60,8 @@ interface LookupService extends AutoCloseable {
         */
        public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName);
 
+       /**
+        * Retrieves the schema information associated with a topic.
+        *
+        * @param topicName
+        *            topic whose schema is requested
+        * @return future with the schema info wrapped in an Optional.
+        *         NOTE(review): implementations appear to yield an empty Optional
+        *         when no schema is found; confirm for every implementation.
+        */
+       public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName);
+
        /**
         * Returns broker-service lookup api url.
         *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 84dcfc1..e156e10 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -23,12 +23,15 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,9 +39,9 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -63,6 +66,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -559,6 +563,23 @@ public class PulsarClientImpl implements PulsarClient {
         return readerFuture;
     }
 
+    /**
+     * Look up the schema information of a topic.
+     *
+     * @param topic
+     *            name of the topic whose schema is requested
+     * @return a future with the schema information; it is expected to hold an empty value when the
+     *         topic does not exist or has no schema associated. The future is failed with an
+     *         {@code InvalidTopicNameException} when the topic name cannot be parsed
+     */
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(String topic) {
+        final TopicName parsedTopic;
+        try {
+            parsedTopic = TopicName.get(topic);
+        } catch (Throwable t) {
+            // A malformed name is reported through the returned future, not thrown
+            final String message = "Invalid topic name: " + topic;
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException(message));
+        }
+
+        return lookup.getSchema(parsedTopic);
+    }
+
     @Override
     public void close() throws PulsarClientException {
         try {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 3c1ac99..b5e2406 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -462,6 +463,24 @@ public class Commands {
         }
     }
 
+    /**
+     * Translates the wire-level schema type into the client-facing {@link SchemaType}.
+     *
+     * @param type schema type as carried in the protobuf Schema message
+     * @return the matching SchemaType; values without a dedicated mapping yield NONE
+     */
+    public static SchemaType getSchemaType(PulsarApi.Schema.Type type) {
+        final SchemaType mapped;
+        switch (type) {
+            case String:
+                mapped = SchemaType.STRING;
+                break;
+            case Json:
+                mapped = SchemaType.JSON;
+                break;
+            case Protobuf:
+                mapped = SchemaType.PROTOBUF;
+                break;
+            case Avro:
+                mapped = SchemaType.AVRO;
+                break;
+            case None:
+            default:
+                // Explicit None and anything unrecognized both map to NONE
+                mapped = SchemaType.NONE;
+                break;
+        }
+        return mapped;
+    }
+
+
     private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
         PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder()
             .setName(schemaInfo.getName())
@@ -797,6 +816,47 @@ public class Commands {
         return res;
     }
 
+    /**
+     * Builds the serialized GET_SCHEMA command for a topic.
+     *
+     * @param requestId id used to correlate the response with this request
+     * @param topic     topic whose schema is requested
+     * @param version   schema version to ask for; left unset in the command when empty
+     * @return buffer produced by {@code serializeWithSize} for the command
+     */
+    public static ByteBuf newGetSchema(long requestId, String topic, Optional<SchemaVersion> version) {
+        PulsarApi.CommandGetSchema.Builder getSchemaBuilder = PulsarApi.CommandGetSchema.newBuilder()
+            .setRequestId(requestId)
+            .setTopic(topic);
+        version.ifPresent(v -> getSchemaBuilder.setSchemaVersion(ByteString.copyFrom(v.bytes())));
+
+        BaseCommand.Builder command = BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA)
+            .setGetSchema(getSchemaBuilder.build());
+        ByteBuf serialized = serializeWithSize(command);
+        // The builder is only handed back to its recycler after serialization
+        getSchemaBuilder.recycle();
+        return serialized;
+    }
+
+    /**
+     * Builds the serialized successful GET_SCHEMA_RESPONSE command.
+     *
+     * @param requestId id of the request being answered
+     * @param schema    schema information to return
+     * @param version   version of the returned schema
+     * @return buffer produced by {@code serializeWithSize} for the command
+     */
+    public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) {
+        PulsarApi.CommandGetSchemaResponse.Builder responseBuilder =
+            PulsarApi.CommandGetSchemaResponse.newBuilder();
+        responseBuilder.setRequestId(requestId);
+        responseBuilder.setSchemaVersion(ByteString.copyFrom(version.bytes()));
+        responseBuilder.setSchema(getSchema(schema));
+
+        BaseCommand.Builder command = BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA_RESPONSE)
+            .setGetSchemaResponse(responseBuilder.build());
+        ByteBuf serialized = serializeWithSize(command);
+        // The builder is only handed back to its recycler after serialization
+        responseBuilder.recycle();
+        return serialized;
+    }
+
+    /**
+     * Builds the serialized GET_SCHEMA_RESPONSE command that reports a failure.
+     *
+     * @param requestId    id of the request being answered
+     * @param error        error code to report
+     * @param errorMessage human-readable description of the failure
+     * @return buffer produced by {@code serializeWithSize} for the command
+     */
+    public static ByteBuf newGetSchemaResponseError(long requestId, ServerError error, String errorMessage) {
+        PulsarApi.CommandGetSchemaResponse.Builder errorBuilder =
+            PulsarApi.CommandGetSchemaResponse.newBuilder();
+        errorBuilder.setRequestId(requestId);
+        errorBuilder.setErrorCode(error);
+        errorBuilder.setErrorMessage(errorMessage);
+
+        BaseCommand.Builder command = BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA_RESPONSE)
+            .setGetSchemaResponse(errorBuilder.build());
+        ByteBuf serialized = serializeWithSize(command);
+        // The builder is only handed back to its recycler after serialization
+        errorBuilder.recycle();
+        return serialized;
+    }
+
     @VisibleForTesting
     public static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
         // / Wire format
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 5d2f1b0..f8c2d61 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.common.api;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -32,6 +36,8 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
@@ -55,9 +61,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
@@ -288,6 +291,18 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 
handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
                 cmd.getGetTopicsOfNamespaceResponse().recycle();
                 break;
+
+            case GET_SCHEMA:
+                checkArgument(cmd.hasGetSchema());
+                handleGetSchema(cmd.getGetSchema());
+                cmd.getGetSchema().recycle();
+                break;
+
+            case GET_SCHEMA_RESPONSE:
+                checkArgument(cmd.hasGetSchemaResponse());
+                handleGetSchemaResponse(cmd.getGetSchemaResponse());
+                cmd.getGetSchemaResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -431,5 +446,13 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Handler invoked when a GET_SCHEMA command is decoded.
+     *
+     * This default implementation always throws; subclasses that accept the
+     * command are expected to override it. The decoder recycles the command
+     * right after this method returns, so NOTE(review): overriding code
+     * presumably must not keep a reference to it past the call.
+     *
+     * @param commandGetSchema the decoded GetSchema command
+     * @throws UnsupportedOperationException always, in this default implementation
+     */
+    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Handler invoked when a GET_SCHEMA_RESPONSE command is decoded.
+     *
+     * This default implementation always throws; subclasses that accept the
+     * command are expected to override it. The decoder recycles the command
+     * right after this method returns, so NOTE(review): overriding code
+     * presumably must not keep a reference to it past the call.
+     *
+     * @param commandGetSchemaResponse the decoded GetSchema response command
+     * @throws UnsupportedOperationException always, in this default implementation
+     */
+    protected void handleGetSchemaResponse(CommandGetSchemaResponse 
commandGetSchemaResponse) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index c161913..edc7e9b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -23482,6 +23482,1138 @@ public final class PulsarApi {
     // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
   }
   
+  /**
+   * Read-only accessors for the fields of the CommandGetSchema message
+   * (request_id, topic and the optional schema_version). Implemented by both
+   * CommandGetSchema and its Builder.
+   * NOTE(review): this looks like protoc-generated code; prefer regenerating
+   * from PulsarApi.proto over editing by hand.
+   */
+  public interface CommandGetSchemaOrBuilder
+      extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // required string topic = 2;
+    boolean hasTopic();
+    String getTopic();
+    
+    // optional bytes schema_version = 3;
+    boolean hasSchemaVersion();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion();
+  }
+  public static final class CommandGetSchema extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandGetSchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetSchema.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private CommandGetSchema(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetSchema> RECYCLER = 
new io.netty.util.Recycler<CommandGetSchema>() {
+            protected CommandGetSchema newObject(Handle handle) {
+              return new CommandGetSchema(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private CommandGetSchema(boolean noInit) {}
+    
+    private static final CommandGetSchema defaultInstance;
+    public static CommandGetSchema getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetSchema getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // required string topic = 2;
+    public static final int TOPIC_FIELD_NUMBER = 2;
+    private java.lang.Object topic_;
+    public boolean hasTopic() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getTopic() {
+      java.lang.Object ref = topic_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if 
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          topic_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getTopicBytes() {
+      java.lang.Object ref = topic_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
 ref);
+        topic_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref;
+      }
+    }
+    
+    // optional bytes schema_version = 3;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 3;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+      return schemaVersion_;
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      topic_ = "";
+      schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTopic()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void 
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream 
output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getTopicBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, schemaVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(2, getTopicBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(3, schemaVersion_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(byte[] 
data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        byte[] data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseDelimitedFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        topic_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
buildParsed()
+          throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.topic_ = topic_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.schemaVersion_ = schemaVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
 return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasTopic()) {
+          setTopic(other.getTopic());
+        }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!hasTopic()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+                              
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom( // reads tagged fields from input into this builder; extensionRegistry is not used here
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              // tag 0 terminates the parse loop
+              return this;
+            default: { // any tag without a dedicated case below is skipped
+              if (!input.skipField(tag)) {
+                // skipField returned false: stop parsing here
+                return this;
+              }
+              break;
+            }
+            case 8: { // tag 8 = field 1 (request_id), wire type 0 (varint)
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: { // tag 18 = field 2 (topic), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000002;
+              topic_ = input.readBytes(); // stored as raw bytes; getTopic() decodes on first access
+              break;
+            }
+            case 26: { // tag 26 = field 3 (schema_version), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000004;
+              schemaVersion_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 request_id, 0x2 topic, 0x4 schema_version
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() { // returns the stored value whether or not the presence bit is set
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() { // drops the presence bit and restores the default 0
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // required string topic = 2;
+      private java.lang.Object topic_ = ""; // holds either a String or a ByteString (see getTopic)
+      public boolean hasTopic() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getTopic() {
+        java.lang.Object ref = topic_;
+        if (!(ref instanceof String)) {
+          String s = 
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref).toStringUtf8();
+          topic_ = s; // cache the decoded String so later calls skip the conversion
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setTopic(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        topic_ = value;
+        
+        return this;
+      }
+      public Builder clearTopic() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        topic_ = getDefaultInstance().getTopic(); // restore the value held by the default instance
+        
+        return this;
+      }
+      void 
setTopic(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) { // package-private raw-bytes setter; performs no null check
+        bitField0_ |= 0x00000002;
+        topic_ = value;
+        
+      }
+      
+      // optional bytes schema_version = 3;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder 
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() { // drops the presence bit and restores the default instance value
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schemaVersion_ = getDefaultInstance().getSchemaVersion();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchema)
+    }
+    
+    static { // creates the shared default instance once, when the class is initialized
+      defaultInstance = new CommandGetSchema(true);
+      defaultInstance.initFields(); // populate it with the per-field default values
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchema)
+  }
+  
+  public interface CommandGetSchemaResponseOrBuilder
+      extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    // Read accessors implemented by both CommandGetSchemaResponse and its Builder.
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // optional .pulsar.proto.ServerError error_code = 2;
+    boolean hasErrorCode();
+    org.apache.pulsar.common.api.proto.PulsarApi.ServerError getErrorCode();
+    
+    // optional string error_message = 3;
+    boolean hasErrorMessage();
+    String getErrorMessage();
+    
+    // optional .pulsar.proto.Schema schema = 4;
+    boolean hasSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+    
+    // optional bytes schema_version = 5;
+    boolean hasSchemaVersion();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion();
+  }
+  public static final class CommandGetSchemaResponse extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandGetSchemaResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetSchemaResponse.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle; // pool handle; stays null for instances created by the boolean constructor
+    private CommandGetSchemaResponse(io.netty.util.Recycler.Handle handle) { // called from RECYCLER.newObject
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetSchemaResponse> 
RECYCLER = new io.netty.util.Recycler<CommandGetSchemaResponse>() { // object pool that Builder.buildPartial() draws message instances from
+            protected CommandGetSchemaResponse newObject(Handle handle) { // creates an instance bound to the given pool handle
+              return new CommandGetSchemaResponse(handle);
+            }
+          };
+        
+        public void recycle() { // resets this message to its default state and hands it back to RECYCLER
+            this.initFields();
+            this.memoizedIsInitialized = -1; // -1 marks the cached isInitialized() answer as not computed
+            this.bitField0_ = 0; // no field is marked present any more
+            this.memoizedSerializedSize = -1; // -1 marks the cached size as not computed
+            if (handle != null) { RECYCLER.recycle(this, handle); } // instances without a handle are reset but not pooled
+        }
+         
+    private CommandGetSchemaResponse(boolean noInit) {} // leaves handle null and assigns no fields; the static initializer uses it for the default instance
+    
+    private static final CommandGetSchemaResponse defaultInstance; // assigned in the static initializer of this class
+    public static CommandGetSchemaResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetSchemaResponse getDefaultInstanceForType() { // instance-level accessor returning the same shared default instance
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // presence bits: 0x01 request_id, 0x02 error_code, 0x04 error_message, 0x08 schema, 0x10 schema_version
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() { // returns the stored value whether or not the presence bit is set
+      return requestId_;
+    }
+    
+    // optional .pulsar.proto.ServerError error_code = 2;
+    public static final int ERROR_CODE_FIELD_NUMBER = 2;
+    private org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
errorCode_;
+    public boolean hasErrorCode() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
getErrorCode() {
+      return errorCode_; // initFields() sets this to ServerError.UnknownError when nothing else was assigned
+    }
+    
+    // optional string error_message = 3;
+    public static final int ERROR_MESSAGE_FIELD_NUMBER = 3;
+    private java.lang.Object errorMessage_; // holds either a String or a ByteString; the two getters below convert on demand
+    public boolean hasErrorMessage() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getErrorMessage() {
+      java.lang.Object ref = errorMessage_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if 
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          errorMessage_ = s; // cache the String only when the bytes are valid UTF-8; otherwise keep the original bytes
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getErrorMessageBytes() { // byte form of error_message, used by writeTo and getSerializedSize
+      java.lang.Object ref = errorMessage_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
 ref);
+        errorMessage_ = b; // cache the encoded bytes in place of the String
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref;
+      }
+    }
+    
+    // optional .pulsar.proto.Schema schema = 4;
+    public static final int SCHEMA_FIELD_NUMBER = 4;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+      return schema_; // initFields() sets this to Schema.getDefaultInstance() when nothing else was assigned
+    }
+    
+    // optional bytes schema_version = 5;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 5;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+      return schemaVersion_; // initFields() sets this to ByteString.EMPTY when nothing else was assigned
+    }
+    
+    private void initFields() { // assigns every field its default value; called from recycle() and for the default instance
+      requestId_ = 0L;
+      errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+      errorMessage_ = "";
+      schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1; // -1 not computed, 0 false, 1 true
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      // first call: compute the answer and memoize it
+      if (!hasRequestId()) { // request_id is the only required field of this message
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (hasSchema()) { // a nested schema, when present, must itself be initialized
+        if (!getSchema().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void 
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream 
output)
+                        throws java.io.IOException { // unconditionally fails; only the ByteBufCodedOutputStream overload serializes
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException { // writes each field whose presence bit is set, in field-number order
+      getSerializedSize(); // result ignored here; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeEnum(2, errorCode_.getNumber());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getErrorMessageBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, schemaVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 means not yet computed; recycle() resets it to -1
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    // first call: sum the encoded size of every field whose presence bit is set
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeEnumSize(2, errorCode_.getNumber());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(3, getErrorMessageBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(5, schemaVersion_);
+      }
+      memoizedSerializedSize = size; // cache for later calls
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException { // adds no behavior of its own; delegates straight to the superclass
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled"); // ByteString parsing is switched off in this generated variant
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled"); // same as the single-argument overload: always throws
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseFrom(byte[] data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data).buildParsed(); // NOTE(review): relies on an inherited mergeFrom(byte[]); if that routes to the CodedInputStream overload it cannot succeed - confirm
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        byte[] data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed(); // NOTE(review): inherited InputStream merge; same caveat as the byte[] overload - confirm
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder(); // taken from the Builder pool; not recycled by this method
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom returned false, so there is no message to return
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseDelimitedFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null; // same contract as the single-argument overload
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed(); // NOTE(review): presumably ends in the disabled CodedInputStream merge of the Builder - confirm
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry) // this Builder overload always throws IOException, so the call never returns a message
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); } // Builder.create() takes the builder from the Builder RECYCLER pool
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 prototype) {
+      return newBuilder().mergeFrom(prototype); // new builder pre-filled with the fields set on prototype
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle handle; // pool handle this builder was created with
+      private Builder(io.netty.util.Recycler.Handle handle) { // called from RECYCLER.newObject
+        this.handle = handle;
+        maybeForceBuilderInitialization(); // currently an empty method
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() { // object pool that create() draws builders from
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) { // creates a builder bound to the given pool handle
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() { // clears all builder state, then hands the builder back to RECYCLER
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);} // skipped when the builder has no pool handle
+            }
+      
+      private void maybeForceBuilderInitialization() { // intentionally empty in this generated class
+      }
+      private static Builder create() { // the only place builders are obtained: always through the pool
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() { // resets every field to its default and clears each presence bit
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        errorMessage_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000008);
+        schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      
+      public Builder clone() { // copies state through a temporary message from buildPartial(); that message is not recycled here
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getDefaultInstanceForType() { // returns the shared default message instance
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse build() { // builds the message and rejects it when a required field is missing
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result); // the pooled result instance is not recycled on this path
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
buildParsed()
+          throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 { // same check as build(), but failure is reported as InvalidProtocolBufferException; used by the static parse methods
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
buildPartial() { // copies builder state into a pooled message without checking required fields
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0; // presence bits are rebuilt one field at a time
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_; // values are copied unconditionally; only the bit records presence
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.errorCode_ = errorCode_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.errorMessage_ = errorMessage_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.schemaVersion_ = schemaVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
other) { // copies every field that is set on other into this builder
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
 return this; // identity check against the shared default instance: nothing is merged
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasErrorCode()) {
+          setErrorCode(other.getErrorCode());
+        }
+        if (other.hasErrorMessage()) {
+          setErrorMessage(other.getErrorMessage());
+        }
+        if (other.hasSchema()) {
+          mergeSchema(other.getSchema()); // nested message is merged rather than replaced (see mergeSchema)
+        }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() { // same rule as the message class, but computed on every call (no memoization)
+        if (!hasRequestId()) {
+          // request_id is the only required field
+          return false;
+        }
+        if (hasSchema()) {
+          if (!getSchema().isInitialized()) {
+            // a nested schema that is present must itself be initialized
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+                              
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException { // unconditionally throws; parsing is implemented only by the ByteBufCodedInputStream overload
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom( // reads tagged fields from input into this builder until the parse loop ends
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              // tag 0 terminates the parse loop
+              return this;
+            default: { // any tag without a dedicated case below is skipped
+              if (!input.skipField(tag)) {
+                // skipField returned false: stop parsing here
+                return this;
+              }
+              break;
+            }
+            case 8: { // tag 8 = field 1 (request_id), wire type 0 (varint)
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 16: { // tag 16 = field 2 (error_code), wire type 0 (varint)
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.ServerError value = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.valueOf(rawValue);
+              if (value != null) { // a number that valueOf cannot map is dropped and the field stays unset
+                bitField0_ |= 0x00000002;
+                errorCode_ = value;
+              }
+              break;
+            }
+            case 26: { // tag 26 = field 3 (error_message), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000004;
+              errorMessage_ = input.readBytes(); // kept as raw bytes; getErrorMessage() decodes on first access
+              break;
+            }
+            case 34: { // tag 34 = field 4 (schema), wire type 2 (length-delimited nested message)
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+              if (hasSchema()) { // start from the schema already held so the parsed fields merge into it
+                subBuilder.mergeFrom(getSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSchema(subBuilder.buildPartial());
+              subBuilder.recycle(); // the temporary builder is no longer referenced after this point
+              break;
+            }
+            case 42: { // tag 42 = field 5 (schema_version), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000010;
+              schemaVersion_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x01 request_id, 0x02 error_code, 0x04 error_message, 0x08 schema, 0x10 schema_version
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() { // returns the stored value whether or not the presence bit is set
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() { // drops the presence bit and restores the default 0
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.ServerError error_code = 2;
+      private org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+      public boolean hasErrorCode() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
getErrorCode() {
+        return errorCode_;
+      }
+      public Builder 
setErrorCode(org.apache.pulsar.common.api.proto.PulsarApi.ServerError value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000002;
+        errorCode_ = value;
+        
+        return this;
+      }
+      public Builder clearErrorCode() { // drops the presence bit and restores the default UnknownError
+        bitField0_ = (bitField0_ & ~0x00000002);
+        errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+        
+        return this;
+      }
+      
+      // optional string error_message = 3;
+      private java.lang.Object errorMessage_ = ""; // holds either a String or a ByteString (see getErrorMessage)
+      public boolean hasErrorMessage() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getErrorMessage() {
+        java.lang.Object ref = errorMessage_;
+        if (!(ref instanceof String)) {
+          String s = 
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref).toStringUtf8();
+          errorMessage_ = s; // cache the decoded String; unlike the message class there is no UTF-8 validity check here
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setErrorMessage(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        errorMessage_ = value;
+        
+        return this;
+      }
+      public Builder clearErrorMessage() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        errorMessage_ = getDefaultInstance().getErrorMessage(); // restore the value held by the default instance
+        
+        return this;
+      }
+      void 
setErrorMessage(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) { // package-private raw-bytes setter; performs no null check
+        bitField0_ |= 0x00000004;
+        errorMessage_ = value;
+        
+      }
+      
+      // optional .pulsar.proto.Schema schema = 4;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+        return schema_;
+      }
+      public Builder 
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schema_ = value;
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder setSchema(
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
builderForValue) {
+        schema_ = builderForValue.build(); // NOTE(review): presumably throws when the nested builder lacks required fields - confirm against Schema.Builder
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder 
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (((bitField0_ & 0x00000008) == 0x00000008) &&
+            schema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+          schema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+        } else {
+          schema_ = value; // nothing meaningful held yet, so adopt value as-is
+        }
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder clearSchema() { // restores the default Schema instance and drops the presence bit
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000008);
+        return this;
+      }
+      
+      // optional bytes schema_version = 5;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder 
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() { // drops the presence bit and restores the default instance value
+        bitField0_ = (bitField0_ & ~0x00000010);
+        schemaVersion_ = getDefaultInstance().getSchemaVersion();
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchemaResponse)
+    }
+    
+    static { // creates the shared default instance once, when the class is initialized
+      defaultInstance = new CommandGetSchemaResponse(true); // boolean constructor: no pool handle, so recycle() never pools it
+      defaultInstance.initFields(); // populate it with the per-field default values
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchemaResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
     
@@ -23616,6 +24748,14 @@ public final class PulsarApi {
     // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse 
getTopicsOfNamespaceResponse = 33;
     boolean hasGetTopicsOfNamespaceResponse();
     
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse
 getGetTopicsOfNamespaceResponse();
+    
+    // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+    boolean hasGetSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema();
+    
+    // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+    boolean hasGetSchemaResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse();
   }
   public static final class BaseCommand extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -23686,6 +24826,8 @@ public final class PulsarApi {
       ACTIVE_CONSUMER_CHANGE(29, 31),
       GET_TOPICS_OF_NAMESPACE(30, 32),
       GET_TOPICS_OF_NAMESPACE_RESPONSE(31, 33),
+      GET_SCHEMA(32, 34),
+      GET_SCHEMA_RESPONSE(33, 35),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -23720,6 +24862,8 @@ public final class PulsarApi {
       public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
       public static final int GET_TOPICS_OF_NAMESPACE_VALUE = 32;
       public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 33;
+      public static final int GET_SCHEMA_VALUE = 34;
+      public static final int GET_SCHEMA_RESPONSE_VALUE = 35;
       
       
       public final int getNumber() { return value; }
@@ -23758,6 +24902,8 @@ public final class PulsarApi {
           case 31: return ACTIVE_CONSUMER_CHANGE;
           case 32: return GET_TOPICS_OF_NAMESPACE;
           case 33: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
+          case 34: return GET_SCHEMA;
+          case 35: return GET_SCHEMA_RESPONSE;
           default: return null;
         }
       }
@@ -24115,6 +25261,26 @@ public final class PulsarApi {
       return getTopicsOfNamespaceResponse_;
     }
     
+    // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+    public static final int GETSCHEMA_FIELD_NUMBER = 34; // protobuf field number of getSchema
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getSchema_; // backing value; initFields() assigns the CommandGetSchema default instance
+    public boolean hasGetSchema() { // presence check: tests bit 0x2 of bitField1_
+      return ((bitField1_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema() { // returns the stored message as-is, whether or not the presence bit is set
+      return getSchema_;
+    }
+    
+    // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+    public static final int GETSCHEMARESPONSE_FIELD_NUMBER = 35; // protobuf field number of getSchemaResponse
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getSchemaResponse_; // backing value; initFields() assigns the CommandGetSchemaResponse default instance
+    public boolean hasGetSchemaResponse() { // presence check: tests bit 0x4 of bitField1_
+      return ((bitField1_ & 0x00000004) == 0x00000004);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse() { // returns the stored message as-is, whether or not the presence bit is set
+      return getSchemaResponse_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -24149,6 +25315,8 @@ public final class PulsarApi {
       activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
       getTopicsOfNamespace_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
       getTopicsOfNamespaceResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -24339,6 +25507,18 @@ public final class PulsarApi {
           return false;
         }
       }
+      if (hasGetSchema()) {
+        if (!getGetSchema().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetSchemaResponse()) {
+        if (!getGetSchemaResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -24450,6 +25630,12 @@ public final class PulsarApi {
       if (((bitField1_ & 0x00000001) == 0x00000001)) {
         output.writeMessage(33, getTopicsOfNamespaceResponse_);
       }
+      if (((bitField1_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(34, getSchema_);
+      }
+      if (((bitField1_ & 0x00000004) == 0x00000004)) {
+        output.writeMessage(35, getSchemaResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -24590,6 +25776,14 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(33, getTopicsOfNamespaceResponse_);
       }
+      if (((bitField1_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(34, getSchema_);
+      }
+      if (((bitField1_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(35, getSchemaResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -24769,6 +25963,10 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x80000000);
         getTopicsOfNamespaceResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
         bitField1_ = (bitField1_ & ~0x00000001);
+        getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000002);
+        getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000004);
         return this;
       }
       
@@ -24936,6 +26134,14 @@ public final class PulsarApi {
           to_bitField1_ |= 0x00000001;
         }
         result.getTopicsOfNamespaceResponse_ = getTopicsOfNamespaceResponse_;
+        if (((from_bitField1_ & 0x00000002) == 0x00000002)) {
+          to_bitField1_ |= 0x00000002;
+        }
+        result.getSchema_ = getSchema_;
+        if (((from_bitField1_ & 0x00000004) == 0x00000004)) {
+          to_bitField1_ |= 0x00000004;
+        }
+        result.getSchemaResponse_ = getSchemaResponse_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         return result;
@@ -25042,6 +26248,12 @@ public final class PulsarApi {
         if (other.hasGetTopicsOfNamespaceResponse()) {
           
mergeGetTopicsOfNamespaceResponse(other.getGetTopicsOfNamespaceResponse());
         }
+        if (other.hasGetSchema()) {
+          mergeGetSchema(other.getGetSchema());
+        }
+        if (other.hasGetSchemaResponse()) {
+          mergeGetSchemaResponse(other.getGetSchemaResponse());
+        }
         return this;
       }
       
@@ -25230,6 +26442,18 @@ public final class PulsarApi {
             return false;
           }
         }
+        if (hasGetSchema()) {
+          if (!getGetSchema().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetSchemaResponse()) {
+          if (!getGetSchemaResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -25584,6 +26808,26 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 274: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder();
+              if (hasGetSchema()) {
+                subBuilder.mergeFrom(getGetSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetSchema(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 282: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder();
+              if (hasGetSchemaResponse()) {
+                subBuilder.mergeFrom(getGetSchemaResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetSchemaResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -26991,6 +28235,92 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      public boolean hasGetSchema() { // presence check: tests bit 0x2 of the builder's bitField1_
+        return ((bitField1_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema() { // current value; the default instance until set or merged
+        return getSchema_;
+      }
+      public Builder 
setGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
value) { // replaces the field with value and marks it present
+        if (value == null) { // null is rejected before any state is touched
+          throw new NullPointerException();
+        }
+        getSchema_ = value;
+        
+        bitField1_ |= 0x00000002; // mark getSchema as present
+        return this;
+      }
+      public Builder setGetSchema(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder 
builderForValue) { // overload: builds the message from the given builder, then stores it
+        getSchema_ = builderForValue.build();
+        
+        bitField1_ |= 0x00000002; // mark getSchema as present
+        return this;
+      }
+      public Builder 
mergeGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
value) { // merges value into an existing non-default field, otherwise adopts value directly
+        if (((bitField1_ & 0x00000002) == 0x00000002) && // field already present ...
+            getSchema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
 { // ... and not the shared default instance (identity comparison)
+          getSchema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder(getSchema_).mergeFrom(value).buildPartial();
+        } else { // nothing to merge with: take value as the new field content
+          getSchema_ = value;
+        }
+        
+        bitField1_ |= 0x00000002; // present after either branch
+        return this;
+      }
+      public Builder clearGetSchema() { // resets the field to the default instance and drops the presence bit
+        getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+        
+        bitField1_ = (bitField1_ & ~0x00000002);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 
35;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+      public boolean hasGetSchemaResponse() { // presence check: tests bit 0x4 of the builder's bitField1_
+        return ((bitField1_ & 0x00000004) == 0x00000004);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse() { // current value; the default instance until set or merged
+        return getSchemaResponse_;
+      }
+      public Builder 
setGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 value) { // replaces the field with value and marks it present
+        if (value == null) { // null is rejected before any state is touched
+          throw new NullPointerException();
+        }
+        getSchemaResponse_ = value;
+        
+        bitField1_ |= 0x00000004; // mark getSchemaResponse as present
+        return this;
+      }
+      public Builder setGetSchemaResponse(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder 
builderForValue) { // overload: builds the message from the given builder, then stores it
+        getSchemaResponse_ = builderForValue.build();
+        
+        bitField1_ |= 0x00000004; // mark getSchemaResponse as present
+        return this;
+      }
+      public Builder 
mergeGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 value) { // merges value into an existing non-default field, otherwise adopts value directly
+        if (((bitField1_ & 0x00000004) == 0x00000004) && // field already present ...
+            getSchemaResponse_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
 { // ... and not the shared default instance (identity comparison)
+          getSchemaResponse_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder(getSchemaResponse_).mergeFrom(value).buildPartial();
+        } else { // nothing to merge with: take value as the new field content
+          getSchemaResponse_ = value;
+        }
+        
+        bitField1_ |= 0x00000004; // present after either branch
+        return this;
+      }
+      public Builder clearGetSchemaResponse() { // resets the field to the default instance and drops the presence bit
+        getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+        
+        bitField1_ = (bitField1_ & ~0x00000004);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 117019e..ca7b93f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -20,17 +20,55 @@ package org.apache.pulsar.common.schema;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
+
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 public class SchemaInfo {
+
+    @EqualsAndHashCode.Exclude // name is left out of the Lombok-generated equals()/hashCode()
     private String name;
+
     private byte[] schema;
     private SchemaType type;
     private Map<String, String> properties = Collections.emptyMap();
+
+    public SchemaInfo(String name, SchemaData data) { // takes payload, type and props from data; name is supplied by the caller
+        this.name = name;
+        this.schema = data.getData();
+        this.type = data.getType();
+        this.properties = data.getProps();
+    }
+
+    public SchemaInfo(Schema schema) { // converts the wire-protocol Schema message into a SchemaInfo
+        this.name = schema.getName();
+        this.schema = schema.getSchemaData().toByteArray();
+        this.type = Commands.getSchemaType(schema.getType());
+        if (schema.getPropertiesCount() == 0) { // no entries: use the shared empty map instead of allocating
+            this.properties = Collections.emptyMap();
+        } else {
+            this.properties = new TreeMap<>(); // TreeMap keeps the copied properties sorted by key
+            for (int i = 0; i < schema.getPropertiesCount(); i++) {
+                KeyValue kv = schema.getProperties(i);
+                properties.put(kv.getKey(), kv.getValue());
+            }
+        }
+    }
+
+    public SchemaInfo(String name, GetSchemaResponse schema) { // encodes the response data as UTF-8; a bare getBytes() would use the platform default charset
+        this.name = name;
+        this.schema = schema.getData().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        this.type = schema.getType();
+        this.properties = schema.getProperties();
+    }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 06eca29..779a27b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -512,6 +512,22 @@ message CommandGetTopicsOfNamespaceResponse {
        repeated string topics          = 2;
 }
 
+message CommandGetSchema { // Request for the schema of a topic
+       required uint64 request_id = 1; // presumably echoed in CommandGetSchemaResponse.request_id to pair reply with request -- TODO confirm
+       required string topic      = 2; // topic whose schema is requested
+
+       optional bytes schema_version = 3; // presumably selects one specific version; behavior when absent is not visible here -- TODO confirm
+}
+
+message CommandGetSchemaResponse { // Reply to CommandGetSchema
+       required uint64 request_id      = 1; // presumably the request_id of the originating CommandGetSchema -- TODO confirm
+       optional ServerError error_code = 2; // error fields are optional; presumably set only on failure -- TODO confirm
+       optional string error_message   = 3;
+
+       optional Schema schema          = 4; // the schema itself, when one is returned
+       optional bytes schema_version   = 5; // version identifier accompanying the returned schema
+}
+
 message BaseCommand {
        enum Type {
                CONNECT     = 2;
@@ -564,6 +580,9 @@ message BaseCommand {
 
                GET_TOPICS_OF_NAMESPACE                         = 32;
                GET_TOPICS_OF_NAMESPACE_RESPONSE        = 33;
+
+               GET_SCHEMA = 34;
+               GET_SCHEMA_RESPONSE = 35;
        }
 
 
@@ -614,4 +633,6 @@ message BaseCommand {
        optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
        optional CommandGetTopicsOfNamespaceResponse 
getTopicsOfNamespaceResponse = 33;
 
+       optional CommandGetSchema getSchema = 34;
+       optional CommandGetSchemaResponse getSchemaResponse = 35;
 }

Reply via email to