merlimat closed pull request #2215: Add option to retrieve schema for a topic 
from pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/2215
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its diff
after the merge, so the diff is reproduced below:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 741afbfefb..4bbfc725af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -290,7 +290,7 @@ public void start() throws Exception {
 
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
-        bootstrap.childHandler(new PulsarChannelInitializer(this, 
serviceConfig, false));
+        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
 
         // Bind and start to accept incoming connections.
         InetSocketAddress addr = new 
InetSocketAddress(pulsar.getBindAddress(), port);
@@ -303,7 +303,7 @@ public void start() throws Exception {
 
         if (serviceConfig.isTlsEnabled()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
-            tlsBootstrap.childHandler(new PulsarChannelInitializer(this, 
serviceConfig, true));
+            tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, 
true));
             tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), 
tlsPort)).sync();
             log.info("Started Pulsar Broker TLS service on port {} - TLS 
provider: {}", tlsPort,
                     SslContext.defaultServerProvider());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 8c16a55465..7858a11048 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -18,37 +18,38 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.ByteBufPair;
-import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslContext;
 
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.util.SecurityUtility;
+
 public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel> {
 
     public static final String TLS_HANDLER = "tls";
-    BrokerService brokerService;
-    ServiceConfiguration serviceConfig;
-    boolean enableTLS;
+
+    private final PulsarService pulsar;
+    private final boolean enableTLS;
 
     /**
      *
      * @param brokerService
      */
-    public PulsarChannelInitializer(BrokerService brokerService, 
ServiceConfiguration serviceConfig,
+    public PulsarChannelInitializer(PulsarService pulsar,
             boolean enableTLS) {
         super();
-        this.brokerService = brokerService;
-        this.serviceConfig = serviceConfig;
+        this.pulsar = pulsar;
         this.enableTLS = enableTLS;
     }
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
+        ServiceConfiguration serviceConfig = pulsar.getConfiguration();
         if (enableTLS) {
             SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
                     serviceConfig.isTlsAllowInsecureConnection(), 
serviceConfig.getTlsTrustCertsFilePath(),
@@ -60,6 +61,6 @@ protected void initChannel(SocketChannel ch) throws Exception 
{
 
         ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-        ch.pipeline().addLast("handler", new ServerCnx(brokerService));
+        ch.pipeline().addLast("handler", new ServerCnx(pulsar));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b45f72c16..1d191bc2a0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -50,11 +50,13 @@
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -72,6 +74,7 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -93,6 +96,7 @@
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -103,6 +107,7 @@
 
 public class ServerCnx extends PulsarHandler {
     private final BrokerService service;
+    private final SchemaRegistryService schemaService;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private State state;
@@ -127,9 +132,10 @@
         Start, Connected, Failed
     }
 
-    public ServerCnx(BrokerService service) {
-        super(service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
-        this.service = service;
+    public ServerCnx(PulsarService pulsar) {
+        super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), 
TimeUnit.SECONDS);
+        this.service = pulsar.getBrokerService();
+        this.schemaService = pulsar.getSchemaRegistryService();
         this.state = State.Start;
 
         // This maps are not heavily contended since most accesses are within 
the cnx thread
@@ -301,7 +307,7 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
         if (topicName == null) {
             return;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -1198,6 +1204,43 @@ protected void 
handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
         }
     }
 
+    @Override
+    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        if (log.isDebugEnabled()) {
+            log.debug("Received CommandGetSchema call from {}", remoteAddress);
+        }
+
+        long requestId = commandGetSchema.getRequestId();
+        SchemaVersion schemaVersion = SchemaVersion.Latest;
+        if (commandGetSchema.hasSchemaVersion()) {
+            schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion().toByteArray());
+        }
+
+        String schemaName;
+        try {
+            schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+        } catch (Throwable t) {
+            ctx.writeAndFlush(
+                    Commands.newGetSchemaResponseError(requestId, 
ServerError.InvalidTopicName, t.getMessage()));
+            return;
+        }
+
+        schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
+            if (schemaAndMetadata == null) {
+                
ctx.writeAndFlush(Commands.newGetSchemaResponseError(requestId, 
ServerError.TopicNotFound,
+                        "Topic not found or no-schema"));
+            } else {
+                ctx.writeAndFlush(Commands.newGetSchemaResponse(requestId,
+                        new SchemaInfo(schemaName, schemaAndMetadata.schema), 
schemaAndMetadata.version));
+            }
+        }).exceptionally(ex -> {
+            ctx.writeAndFlush(
+                    Commands.newGetSchemaResponseError(requestId, 
ServerError.UnknownError, ex.getMessage()));
+            return null;
+        });
+    }
+
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ccd32e1e52..bc8d4bfcdb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -157,14 +157,14 @@ public void setup() throws Exception {
             return null;
         }).when(channelCtx).writeAndFlush(any(), any());
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
         
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
         when(serverCnx.ctx()).thenReturn(channelCtx);
 
-        serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
+        serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnxWithOldVersion).isActive();
         doReturn(true).when(serverCnxWithOldVersion).isWritable();
         doReturn(new InetSocketAddress("localhost", 1234))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 8bfd6ef00c..7c5ca2077d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -98,7 +98,7 @@ public void setup(Method m) throws Exception {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index db35e74845..f8ac40a945 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -171,7 +171,7 @@ public void setup() throws Exception {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
-        serverCnx = spy(new ServerCnx(brokerService));
+        serverCnx = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2b324ab3aa..f746571f8c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1379,7 +1379,7 @@ protected void resetChannel() throws Exception {
             serverCnx.close();
             channel.close().get();
         }
-        serverCnx = new ServerCnx(brokerService);
+        serverCnx = new ServerCnx(pulsar);
         serverCnx.authRole = "";
         channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
new file mode 100644
index 0000000000..70554926c9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ClientGetSchemaTest extends ProducerConsumerBase {
+
+    private static final String topicBytes = "my-property/my-ns/topic-bytes";
+    private static final String topicString = "my-property/my-ns/topic-string";
+    private static final String topicJson = "my-property/my-ns/topic-json";
+    private static final String topicAvro = "my-property/my-ns/topic-avro";
+
+    List<Producer<?>> producers = new ArrayList<>();
+
+    private static class MyClass {
+        public String name;
+        public int age;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        // Create few topics with different types
+        
producers.add(pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create());
+        
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
+        
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
+        
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        producers.forEach(t -> {
+            try {
+                t.close();
+            } catch (PulsarClientException e) { // ignored: a failed close does not stop the loop or the teardown below
+            }
+        });
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "serviceUrl") // supplies one pulsar:// URL and one http:// URL to testGetSchema
+    public String[] serviceUrls() {
+        return new String[] { "pulsar://" + pulsar.getAdvertisedAddress() + 
":" + BROKER_PORT,
+                "http://" + pulsar.getAdvertisedAddress() + ":" + 
BROKER_WEBSERVICE_PORT };
+    }
+
+    @Test(dataProvider = "serviceUrl")
+    public void testGetSchema(String serviceUrl) throws Exception {
+        @Cleanup
+        PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+
+        assertEquals(client.getSchema("non-existing-topic").join(), 
Optional.empty());
+        assertEquals(client.getSchema(topicBytes).join(), Optional.empty());
+        assertEquals(client.getSchema(topicString).join(), 
Optional.of(Schema.STRING.getSchemaInfo()));
+        assertEquals(client.getSchema(topicJson).join(), 
Optional.of(Schema.JSON(MyClass.class).getSchemaInfo()));
+        assertEquals(client.getSchema(topicAvro).join(), 
Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index d7e7c2c3a0..341487f695 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -22,9 +22,18 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -34,10 +43,6 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
 public class JsonSchemaCompatibilityCheckTest extends 
BaseAvroSchemaCompatibilityTest{
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2bcd1425f6..2618066c13 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -51,6 +51,16 @@ public InvalidConfigurationException(Throwable t) {
         }
     }
 
+    public static class NotFoundException extends PulsarClientException {
+        public NotFoundException(String msg) {
+            super(msg);
+        }
+
+        public NotFoundException(Throwable t) {
+            super(t);
+        }
+    }
+
     public static class TimeoutException extends PulsarClientException {
         public TimeoutException(String msg) {
             super(msg);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 6d8f67850c..29ae1f40f1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -21,29 +21,32 @@
 import static java.lang.String.format;
 
 import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
@@ -181,6 +184,16 @@ public BinaryProtoLookupService(PulsarClientImpl client, 
String serviceUrl, bool
         return partitionFuture;
     }
 
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
+        return 
client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+            long requestId = client.newRequestId();
+            ByteBuf request = Commands.newGetSchema(requestId, 
topicName.toString(), Optional.empty());
+
+            return clientCnx.sendGetSchema(request, requestId);
+        });
+    }
+
     public String getServiceUrl() {
         return serviceAddress.toString();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6a8ab747ca..887a890984 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -23,6 +23,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
@@ -31,16 +32,20 @@
 import io.netty.channel.unix.Errors.NativeIoException;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
@@ -56,6 +61,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
@@ -67,6 +73,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +94,9 @@
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests =
         new ConcurrentLongHashMap<>(16, 1);
 
+    private final 
ConcurrentLongHashMap<CompletableFuture<Optional<SchemaInfo>>> 
pendingGetSchemaRequests = new ConcurrentLongHashMap<>(
+            16, 1);
+
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new 
ConcurrentLongHashMap<>(16, 1);
 
@@ -151,7 +161,7 @@ public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
                     }
                 });
     }
-    
+
     protected ByteBuf newConnectCommand() throws PulsarClientException {
         String authData = "";
         if (authentication.getAuthData().hasDataFromCommand()) {
@@ -178,6 +188,7 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
         waitingLookupRequests.forEach(pair -> 
pair.getRight().getRight().completeExceptionally(e));
         pendingGetLastMessageIdRequests.forEach((key, future) -> 
future.completeExceptionally(e));
         pendingGetTopicsRequests.forEach((key, future) -> 
future.completeExceptionally(e));
+        pendingGetSchemaRequests.forEach((key, future) -> 
future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to 
reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -595,6 +606,31 @@ protected void 
handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResp
         }
     }
 
+    @Override
+    protected void handleGetSchemaResponse(CommandGetSchemaResponse 
commandGetSchemaResponse) {
+        checkArgument(state == State.Ready);
+
+        long requestId = commandGetSchemaResponse.getRequestId();
+
+        CompletableFuture<Optional<SchemaInfo>> future = 
pendingGetSchemaRequests.remove(requestId);
+        if (future == null) {
+            log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), requestId);
+            return;
+        }
+
+        if (commandGetSchemaResponse.hasErrorCode()) {
+            // Request has failed
+            ServerError rc = commandGetSchemaResponse.getErrorCode();
+            if (rc == ServerError.TopicNotFound) {
+                future.complete(Optional.empty());
+            } else {
+                future.completeExceptionally(getPulsarClientException(rc, 
commandGetSchemaResponse.getErrorMessage()));
+            }
+        } else {
+            future.complete(Optional.of(new 
SchemaInfo(commandGetSchemaResponse.getSchema())));
+        }
+    }
+
     Promise<Void> newPromise() {
         return ctx.newPromise();
     }
@@ -644,6 +680,23 @@ SocketAddress serverAddrees() {
         return future;
     }
 
+    public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf 
request, long requestId) {
+        CompletableFuture<Optional<SchemaInfo>> future = new 
CompletableFuture<>();
+
+        pendingGetSchemaRequests.put(requestId, future); // registered first; handleGetSchemaResponse looks it up by requestId
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send GetSchema request to broker: {}", 
ctx.channel(),
+                        writeFuture.cause().getMessage());
+                pendingGetSchemaRequests.remove(requestId); // un-register from the same map the future was put into
+                future.completeExceptionally(writeFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
     /**
      * check serverError and take appropriate action
      * <ul>
@@ -760,7 +813,7 @@ private PulsarClientException 
getPulsarClientException(ServerError error, String
             return new PulsarClientException(errorMsg);
         }
     }
-    
+
     @VisibleForTesting
     public void close() {
        if (ctx != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index eb672624dc..e5ef4c5abc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +38,7 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.asynchttpclient.AsyncCompletionHandler;
@@ -46,13 +54,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.ssl.SslContext;
-
 public class HttpClient implements Closeable {
 
     protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -158,8 +159,13 @@ public void onThrowable(Throwable t) {
                     Response response = responseFuture.get();
                     if (response.getStatusCode() != HttpURLConnection.HTTP_OK) 
{
                         log.warn("[{}] HTTP get request failed: {}", 
requestUrl, response.getStatusText());
-                        future.completeExceptionally(
-                                new PulsarClientException("HTTP get request 
failed: " + response.getStatusText()));
+                        Exception e;
+                        if (response.getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+                            e = new NotFoundException("Not found: " + 
response.getStatusText());
+                        } else {
+                            e = new PulsarClientException("HTTP get request 
failed: " + response.getStatusText());
+                        }
+                        future.completeExceptionally(e);
                         return;
                     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 185412cb73..a427fbdf38 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,25 +19,30 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
+
+import io.netty.channel.EventLoopGroup;
+
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.lookup.data.LookupData;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.EventLoopGroup;
-
 class HttpLookupService implements LookupService {
 
     private final HttpClient httpClient;
@@ -121,6 +126,27 @@ public String getServiceUrl() {
         return future;
     }
 
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
+        CompletableFuture<Optional<SchemaInfo>> future = new 
CompletableFuture<>();
+
+        String schemaName = topicName.getSchemaName();
+        String path = String.format("admin/v2/schemas/%s/schema", schemaName);
+
+        httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
+            future.complete(Optional.of(new SchemaInfo(schemaName, response)));
+        }).exceptionally(ex -> {
+            if (ex.getCause() instanceof NotFoundException) {
+                future.complete(Optional.empty());
+            } else {
+                log.warn("Failed to get schema for topic {} : {}", topicName, 
ex.getCause().getClass());
+                future.completeExceptionally(ex);
+            }
+            return null;
+        });
+        return future;
+    }
+
     @Override
     public void close() throws Exception {
         httpClient.close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index aa00e54b3c..769422bc9f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -20,12 +20,14 @@
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * Provides lookup service to find broker which serves given topic. It helps to
@@ -58,6 +60,8 @@
         */
        public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName);
 
+       public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName);
+
        /**
         * Returns broker-service lookup api url.
         *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 84dcfc1781..e156e10820 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -23,12 +23,15 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,9 +39,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -63,6 +66,7 @@
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -559,6 +563,23 @@ public ClientConfigurationData getConfiguration() {
         return readerFuture;
     }
 
+    /**
+     * Read the schema information for a given topic.
+     *
+     * If the topic does not exist or it has no schema associated, it will 
return an empty response
+     */
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(String topic) {
+        TopicName topicName;
+        try {
+            topicName = TopicName.get(topic);
+        } catch (Throwable t) {
+            return FutureUtil
+                    .failedFuture(new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: " + 
topic));
+        }
+
+        return lookup.getSchema(topicName);
+    }
+
     @Override
     public void close() throws PulsarClientException {
         try {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 3c1ac991a9..b5e2406c9e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -33,6 +33,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -462,6 +463,24 @@ public static ByteBuf newProducer(String topic, long 
producerId, long requestId,
         }
     }
 
+    public static SchemaType getSchemaType(PulsarApi.Schema.Type type) {
+        switch (type) {
+            case None:
+                return SchemaType.NONE;
+            case String:
+                return SchemaType.STRING;
+            case Json:
+                return SchemaType.JSON;
+            case Protobuf:
+                return SchemaType.PROTOBUF;
+            case Avro:
+                return SchemaType.AVRO;
+            default:
+                return SchemaType.NONE;
+        }
+    }
+
+
     private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
         PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder()
             .setName(schemaInfo.getName())
@@ -797,6 +816,47 @@ public static ByteBuf newGetLastMessageIdResponse(long 
requestId, MessageIdData
         return res;
     }
 
+    public static ByteBuf newGetSchema(long requestId, String topic, 
Optional<SchemaVersion> version) {
+        PulsarApi.CommandGetSchema.Builder schema = 
PulsarApi.CommandGetSchema.newBuilder()
+            .setRequestId(requestId);
+        schema.setTopic(topic);
+        if (version.isPresent()) {
+            
schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes()));
+        }
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA)
+            .setGetSchema(schema.build()));
+        schema.recycle();
+        return res;
+    }
+
+    public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo 
schema, SchemaVersion version) {
+        PulsarApi.CommandGetSchemaResponse.Builder schemaResponse = 
PulsarApi.CommandGetSchemaResponse.newBuilder()
+            .setRequestId(requestId)
+            .setSchemaVersion(ByteString.copyFrom(version.bytes()))
+            .setSchema(getSchema(schema));
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA_RESPONSE)
+            .setGetSchemaResponse(schemaResponse.build()));
+        schemaResponse.recycle();
+        return res;
+    }
+
+    public static ByteBuf newGetSchemaResponseError(long requestId, 
ServerError error, String errorMessage) {
+        PulsarApi.CommandGetSchemaResponse.Builder schemaResponse = 
PulsarApi.CommandGetSchemaResponse.newBuilder()
+            .setRequestId(requestId)
+            .setErrorCode(error)
+            .setErrorMessage(errorMessage);
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_SCHEMA_RESPONSE)
+            .setGetSchemaResponse(schemaResponse.build()));
+        schemaResponse.recycle();
+        return res;
+    }
+
     @VisibleForTesting
     public static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
         // / Wire format
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 5d2f1b0b49..f8c2d61e3c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -20,6 +20,10 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -32,6 +36,8 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
@@ -55,9 +61,6 @@
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
@@ -288,6 +291,18 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                 
handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
                 cmd.getGetTopicsOfNamespaceResponse().recycle();
                 break;
+
+            case GET_SCHEMA:
+                checkArgument(cmd.hasGetSchema());
+                handleGetSchema(cmd.getGetSchema());
+                cmd.getGetSchema().recycle();
+                break;
+
+            case GET_SCHEMA_RESPONSE:
+                checkArgument(cmd.hasGetSchemaResponse());
+                handleGetSchemaResponse(cmd.getGetSchemaResponse());
+                cmd.getGetSchemaResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -431,5 +446,13 @@ protected void 
handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResp
         throw new UnsupportedOperationException();
     }
 
+    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleGetSchemaResponse(CommandGetSchemaResponse 
commandGetSchemaResponse) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index c161913c02..edc7e9b118 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -23482,6 +23482,1138 @@ void 
addTopics(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString valu
     // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
   }
   
+  public interface CommandGetSchemaOrBuilder
+      extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // required string topic = 2;
+    boolean hasTopic();
+    String getTopic();
+    
+    // optional bytes schema_version = 3;
+    boolean hasSchemaVersion();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion();
+  }
+  public static final class CommandGetSchema extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandGetSchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetSchema.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private CommandGetSchema(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetSchema> RECYCLER = 
new io.netty.util.Recycler<CommandGetSchema>() {
+            protected CommandGetSchema newObject(Handle handle) {
+              return new CommandGetSchema(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private CommandGetSchema(boolean noInit) {}
+    
+    private static final CommandGetSchema defaultInstance;
+    public static CommandGetSchema getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetSchema getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // required string topic = 2;
+    public static final int TOPIC_FIELD_NUMBER = 2;
+    private java.lang.Object topic_;
+    public boolean hasTopic() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getTopic() {
+      java.lang.Object ref = topic_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if 
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          topic_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getTopicBytes() {
+      java.lang.Object ref = topic_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
 ref);
+        topic_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref;
+      }
+    }
+    
+    // optional bytes schema_version = 3;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 3;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+      return schemaVersion_;
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      topic_ = "";
+      schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTopic()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void 
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream 
output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getTopicBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, schemaVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(2, getTopicBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(3, schemaVersion_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(byte[] 
data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        byte[] data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
parseDelimitedFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        topic_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
buildParsed()
+          throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.topic_ = topic_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.schemaVersion_ = schemaVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
 return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasTopic()) {
+          setTopic(other.getTopic());
+        }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!hasTopic()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+                              
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              topic_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              schemaVersion_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // required string topic = 2;
+      private java.lang.Object topic_ = "";
+      public boolean hasTopic() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getTopic() {
+        java.lang.Object ref = topic_;
+        if (!(ref instanceof String)) {
+          String s = 
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref).toStringUtf8();
+          topic_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setTopic(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        topic_ = value;
+        
+        return this;
+      }
+      public Builder clearTopic() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        topic_ = getDefaultInstance().getTopic();
+        
+        return this;
+      }
+      void 
setTopic(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        bitField0_ |= 0x00000002;
+        topic_ = value;
+        
+      }
+      
+      // optional bytes schema_version = 3;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder 
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schemaVersion_ = getDefaultInstance().getSchemaVersion();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchema)
+    }
+    
+    static {
+      defaultInstance = new CommandGetSchema(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchema)
+  }
+  
+  public interface CommandGetSchemaResponseOrBuilder
+      extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // optional .pulsar.proto.ServerError error_code = 2;
+    boolean hasErrorCode();
+    org.apache.pulsar.common.api.proto.PulsarApi.ServerError getErrorCode();
+    
+    // optional string error_message = 3;
+    boolean hasErrorMessage();
+    String getErrorMessage();
+    
+    // optional .pulsar.proto.Schema schema = 4;
+    boolean hasSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+    
+    // optional bytes schema_version = 5;
+    boolean hasSchemaVersion();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion();
+  }
+  public static final class CommandGetSchemaResponse extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandGetSchemaResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetSchemaResponse.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private CommandGetSchemaResponse(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetSchemaResponse> 
RECYCLER = new io.netty.util.Recycler<CommandGetSchemaResponse>() {
+            protected CommandGetSchemaResponse newObject(Handle handle) {
+              return new CommandGetSchemaResponse(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private CommandGetSchemaResponse(boolean noInit) {}
+    
+    private static final CommandGetSchemaResponse defaultInstance;
+    public static CommandGetSchemaResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetSchemaResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // optional .pulsar.proto.ServerError error_code = 2;
+    public static final int ERROR_CODE_FIELD_NUMBER = 2;
+    private org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
errorCode_;
+    public boolean hasErrorCode() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
getErrorCode() {
+      return errorCode_;
+    }
+    
+    // optional string error_message = 3;
+    public static final int ERROR_MESSAGE_FIELD_NUMBER = 3;
+    private java.lang.Object errorMessage_;
+    public boolean hasErrorMessage() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getErrorMessage() {
+      java.lang.Object ref = errorMessage_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if 
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          errorMessage_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getErrorMessageBytes() {
+      java.lang.Object ref = errorMessage_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
 ref);
+        errorMessage_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref;
+      }
+    }
+    
+    // optional .pulsar.proto.Schema schema = 4;
+    public static final int SCHEMA_FIELD_NUMBER = 4;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+      return schema_;
+    }
+    
+    // optional bytes schema_version = 5;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 5;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+      return schemaVersion_;
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+      errorMessage_ = "";
+      schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (hasSchema()) {
+        if (!getSchema().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void 
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream 
output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeEnum(2, errorCode_.getNumber());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getErrorMessageBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, schemaVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeEnumSize(2, errorCode_.getNumber());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(3, getErrorMessageBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(5, schemaVersion_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseFrom(byte[] data)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        byte[] data,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
parseDelimitedFrom(
+        java.io.InputStream input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        errorMessage_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000008);
+        schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
buildParsed()
+          throws 
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
 {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.errorCode_ = errorCode_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.errorMessage_ = errorMessage_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.schemaVersion_ = schemaVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
 return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasErrorCode()) {
+          setErrorCode(other.getErrorCode());
+        }
+        if (other.hasErrorMessage()) {
+          setErrorMessage(other.getErrorMessage());
+        }
+        if (other.hasSchema()) {
+          mergeSchema(other.getSchema());
+        }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (hasSchema()) {
+          if (!getSchema().isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream 
input,
+                              
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite 
extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.ServerError value = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000002;
+                errorCode_ = value;
+              }
+              break;
+            }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              errorMessage_ = input.readBytes();
+              break;
+            }
+            case 34: {
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+              if (hasSchema()) {
+                subBuilder.mergeFrom(getSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSchema(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              schemaVersion_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.ServerError error_code = 2;
+      private org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+      public boolean hasErrorCode() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.ServerError 
getErrorCode() {
+        return errorCode_;
+      }
+      public Builder 
setErrorCode(org.apache.pulsar.common.api.proto.PulsarApi.ServerError value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000002;
+        errorCode_ = value;
+        
+        return this;
+      }
+      public Builder clearErrorCode() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        errorCode_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+        
+        return this;
+      }
+      
+      // optional string error_message = 3;
+      private java.lang.Object errorMessage_ = "";
+      public boolean hasErrorMessage() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getErrorMessage() {
+        java.lang.Object ref = errorMessage_;
+        if (!(ref instanceof String)) {
+          String s = 
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) 
ref).toStringUtf8();
+          errorMessage_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setErrorMessage(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        errorMessage_ = value;
+        
+        return this;
+      }
+      public Builder clearErrorMessage() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        errorMessage_ = getDefaultInstance().getErrorMessage();
+        
+        return this;
+      }
+      void 
setErrorMessage(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        bitField0_ |= 0x00000004;
+        errorMessage_ = value;
+        
+      }
+      
+      // optional .pulsar.proto.Schema schema = 4;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+        return schema_;
+      }
+      public Builder 
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schema_ = value;
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder setSchema(
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
builderForValue) {
+        schema_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder 
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (((bitField0_ & 0x00000008) == 0x00000008) &&
+            schema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+          schema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+        } else {
+          schema_ = value;
+        }
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder clearSchema() {
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000008);
+        return this;
+      }
+      
+      // optional bytes schema_version = 5;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder 
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        schemaVersion_ = getDefaultInstance().getSchemaVersion();
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchemaResponse)
+    }
+    
+    static {
+      defaultInstance = new CommandGetSchemaResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchemaResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends 
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
     
@@ -23616,6 +24748,14 @@ void 
addTopics(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString valu
     // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse 
getTopicsOfNamespaceResponse = 33;
     boolean hasGetTopicsOfNamespaceResponse();
     
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse
 getGetTopicsOfNamespaceResponse();
+    
+    // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+    boolean hasGetSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema();
+    
+    // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+    boolean hasGetSchemaResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse();
   }
   public static final class BaseCommand extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -23686,6 +24826,8 @@ public BaseCommand getDefaultInstanceForType() {
       ACTIVE_CONSUMER_CHANGE(29, 31),
       GET_TOPICS_OF_NAMESPACE(30, 32),
       GET_TOPICS_OF_NAMESPACE_RESPONSE(31, 33),
+      GET_SCHEMA(32, 34),
+      GET_SCHEMA_RESPONSE(33, 35),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -23720,6 +24862,8 @@ public BaseCommand getDefaultInstanceForType() {
       public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
       public static final int GET_TOPICS_OF_NAMESPACE_VALUE = 32;
       public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 33;
+      public static final int GET_SCHEMA_VALUE = 34;
+      public static final int GET_SCHEMA_RESPONSE_VALUE = 35;
       
       
       public final int getNumber() { return value; }
@@ -23758,6 +24902,8 @@ public static Type valueOf(int value) {
           case 31: return ACTIVE_CONSUMER_CHANGE;
           case 32: return GET_TOPICS_OF_NAMESPACE;
           case 33: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
+          case 34: return GET_SCHEMA;
+          case 35: return GET_SCHEMA_RESPONSE;
           default: return null;
         }
       }
@@ -24115,6 +25261,26 @@ public boolean hasGetTopicsOfNamespaceResponse() {
       return getTopicsOfNamespaceResponse_;
     }
     
+    // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+    public static final int GETSCHEMA_FIELD_NUMBER = 34;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getSchema_;
+    public boolean hasGetSchema() {
+      return ((bitField1_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema() {
+      return getSchema_;
+    }
+    
+    // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+    public static final int GETSCHEMARESPONSE_FIELD_NUMBER = 35;
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getSchemaResponse_;
+    public boolean hasGetSchemaResponse() {
+      return ((bitField1_ & 0x00000004) == 0x00000004);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse() {
+      return getSchemaResponse_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -24149,6 +25315,8 @@ private void initFields() {
       activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
       getTopicsOfNamespace_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
       getTopicsOfNamespaceResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -24339,6 +25507,18 @@ public final boolean isInitialized() {
           return false;
         }
       }
+      if (hasGetSchema()) {
+        if (!getGetSchema().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetSchemaResponse()) {
+        if (!getGetSchemaResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -24450,6 +25630,12 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField1_ & 0x00000001) == 0x00000001)) {
         output.writeMessage(33, getTopicsOfNamespaceResponse_);
       }
+      if (((bitField1_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(34, getSchema_);
+      }
+      if (((bitField1_ & 0x00000004) == 0x00000004)) {
+        output.writeMessage(35, getSchemaResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -24590,6 +25776,14 @@ public int getSerializedSize() {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(33, getTopicsOfNamespaceResponse_);
       }
+      if (((bitField1_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(34, getSchema_);
+      }
+      if (((bitField1_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(35, getSchemaResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -24769,6 +25963,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x80000000);
         getTopicsOfNamespaceResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
         bitField1_ = (bitField1_ & ~0x00000001);
+        getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000002);
+        getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000004);
         return this;
       }
       
@@ -24936,6 +26134,14 @@ public Builder clone() {
           to_bitField1_ |= 0x00000001;
         }
         result.getTopicsOfNamespaceResponse_ = getTopicsOfNamespaceResponse_;
+        if (((from_bitField1_ & 0x00000002) == 0x00000002)) {
+          to_bitField1_ |= 0x00000002;
+        }
+        result.getSchema_ = getSchema_;
+        if (((from_bitField1_ & 0x00000004) == 0x00000004)) {
+          to_bitField1_ |= 0x00000004;
+        }
+        result.getSchemaResponse_ = getSchemaResponse_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         return result;
@@ -25042,6 +26248,12 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman
         if (other.hasGetTopicsOfNamespaceResponse()) {
           
mergeGetTopicsOfNamespaceResponse(other.getGetTopicsOfNamespaceResponse());
         }
+        if (other.hasGetSchema()) {
+          mergeGetSchema(other.getGetSchema());
+        }
+        if (other.hasGetSchemaResponse()) {
+          mergeGetSchemaResponse(other.getGetSchemaResponse());
+        }
         return this;
       }
       
@@ -25230,6 +26442,18 @@ public final boolean isInitialized() {
             return false;
           }
         }
+        if (hasGetSchema()) {
+          if (!getGetSchema().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetSchemaResponse()) {
+          if (!getGetSchemaResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -25584,6 +26808,26 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 274: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder();
+              if (hasGetSchema()) {
+                subBuilder.mergeFrom(getGetSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetSchema(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 282: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder();
+              if (hasGetSchemaResponse()) {
+                subBuilder.mergeFrom(getGetSchemaResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetSchemaResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -26991,6 +28235,92 @@ public Builder clearGetTopicsOfNamespaceResponse() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+      public boolean hasGetSchema() {
+        return ((bitField1_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
getGetSchema() {
+        return getSchema_;
+      }
+      public Builder 
setGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getSchema_ = value;
+        
+        bitField1_ |= 0x00000002;
+        return this;
+      }
+      public Builder setGetSchema(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder 
builderForValue) {
+        getSchema_ = builderForValue.build();
+        
+        bitField1_ |= 0x00000002;
+        return this;
+      }
+      public Builder 
mergeGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema 
value) {
+        if (((bitField1_ & 0x00000002) == 0x00000002) &&
+            getSchema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
 {
+          getSchema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder(getSchema_).mergeFrom(value).buildPartial();
+        } else {
+          getSchema_ = value;
+        }
+        
+        bitField1_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearGetSchema() {
+        getSchema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+        
+        bitField1_ = (bitField1_ & ~0x00000002);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 
35;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+      public boolean hasGetSchemaResponse() {
+        return ((bitField1_ & 0x00000004) == 0x00000004);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse 
getGetSchemaResponse() {
+        return getSchemaResponse_;
+      }
+      public Builder 
setGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getSchemaResponse_ = value;
+        
+        bitField1_ |= 0x00000004;
+        return this;
+      }
+      public Builder setGetSchemaResponse(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder 
builderForValue) {
+        getSchemaResponse_ = builderForValue.build();
+        
+        bitField1_ |= 0x00000004;
+        return this;
+      }
+      public Builder 
mergeGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
 value) {
+        if (((bitField1_ & 0x00000004) == 0x00000004) &&
+            getSchemaResponse_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
 {
+          getSchemaResponse_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder(getSchemaResponse_).mergeFrom(value).buildPartial();
+        } else {
+          getSchemaResponse_ = value;
+        }
+        
+        bitField1_ |= 0x00000004;
+        return this;
+      }
+      public Builder clearGetSchemaResponse() {
+        getSchemaResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+        
+        bitField1_ = (bitField1_ & ~0x00000004);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 117019ecd4..ca7b93f174 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -20,17 +20,55 @@
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
+
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 public class SchemaInfo {
+    // SchemaInfo bundles a schema's name, raw definition bytes, type and string properties.
+    @EqualsAndHashCode.Exclude
     private String name;
+    // Unlike name, the remaining fields take part in the Lombok-generated equals/hashCode.
     private byte[] schema;
     private SchemaType type;
     private Map<String, String> properties = Collections.emptyMap();
+    /** Builds a SchemaInfo called {@code name} from the data, type and props of {@code data}. */
+    public SchemaInfo(String name, SchemaData data) {
+        this.name = name;
+        this.schema = data.getData();
+        this.type = data.getType();
+        this.properties = data.getProps();
+    }
+    /** Builds a SchemaInfo from a protobuf {@code Schema}; properties are copied into a TreeMap. */
+    public SchemaInfo(Schema schema) {
+        this.name = schema.getName();
+        this.schema = schema.getSchemaData().toByteArray();
+        this.type = Commands.getSchemaType(schema.getType());
+        if (schema.getPropertiesCount() == 0) {
+            this.properties = Collections.emptyMap();
+        } else {
+            this.properties = new TreeMap<>();
+            for (int i = 0; i < schema.getPropertiesCount(); i++) {
+                KeyValue kv = schema.getProperties(i);
+                properties.put(kv.getKey(), kv.getValue());
+            }
+        }
+    }
+    /** Builds from a GetSchemaResponse; its data is encoded as UTF-8, not the platform charset. */
+    public SchemaInfo(String name, GetSchemaResponse schema) {
+        this.name = name;
+        this.schema = schema.getData().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        this.type = schema.getType();
+        this.properties = schema.getProperties();
+    }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 06eca29ba0..779a27bc1f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -512,6 +512,22 @@ message CommandGetTopicsOfNamespaceResponse {
        repeated string topics          = 2;
 }
 
+message CommandGetSchema { // Request for the schema of a topic.
+       required uint64 request_id = 1; // Request identifier; CommandGetSchemaResponse carries a request_id too.
+       required string topic      = 2; // Topic whose schema is requested.
+
+       optional bytes schema_version = 3; // NOTE(review): presumably selects a specific version when set - confirm.
+}
+
+message CommandGetSchemaResponse { // Reply to CommandGetSchema.
+       required uint64 request_id      = 1; // Request identifier; CommandGetSchema carries a request_id too.
+       optional ServerError error_code = 2; // NOTE(review): presumably set only when the lookup failed - confirm.
+       optional string error_message   = 3; // Human-readable detail accompanying error_code.
+
+       optional Schema schema          = 4; // The schema, when one is returned.
+       optional bytes schema_version   = 5; // Version of the returned schema.
+}
+
 message BaseCommand {
        enum Type {
                CONNECT     = 2;
@@ -564,6 +580,9 @@ message BaseCommand {
 
                GET_TOPICS_OF_NAMESPACE                         = 32;
                GET_TOPICS_OF_NAMESPACE_RESPONSE        = 33;
+
+               GET_SCHEMA = 34;
+               GET_SCHEMA_RESPONSE = 35;
        }
 
 
@@ -614,4 +633,6 @@ message BaseCommand {
        optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
        optional CommandGetTopicsOfNamespaceResponse 
getTopicsOfNamespaceResponse = 33;
 
+       optional CommandGetSchema getSchema = 34;
+       optional CommandGetSchemaResponse getSchemaResponse = 35;
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to