This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f04eb51  Schema registry (2/N) (#1319)
f04eb51 is described below

commit f04eb517e40ea23779fc7c0264c3537dd7d6671f
Author: Dave Rusek <[email protected]>
AuthorDate: Thu Mar 8 15:35:09 2018 -0700

    Schema registry (2/N) (#1319)
    
    * Schema Registry proto changes
    
    * Infrastructure to store schemas
    
    * Renumber schema fields
    
    * Update Pulsar API with schema changes
    
    * Revert field number change
    
    * Fix merge conflict
    
    * Fix broken merge
    
    * Address issues in review
    
    * Add schema type back to proto definition
    
    * Address comments regarding lombok usage
    
    * Remove reserved future enum fields
    
    * regenerate code from protobuf
    
    * Remove unused code
    
    * Add schema version to producer success message
    
    * plumb schema through to producer
    
    * Revert "Add schema version to producer success message"
    
    This reverts commit e7e72f468cf46f1605524a7399520c22763583c9.
    
    * Revert "Revert "Add schema version to producer success message""
    
    This reverts commit 7b902f6bdb1cb054e26577747ff4dd8c326a6248.
    
    * Persist schema on producer connect
    
    * Add principal to schema on publish
    
    * Reformat function for readability
    
    * Remove unused protoc profile
    
    * Rename put on schema registry to putIfAbsent
    
    * wip: address review comments
    
    * switch underscore to slash in schema name
    
    * blah
    
    * Fix protobuf version incompatibility
    
    * Add appropriate license headers
---
 pom.xml                                            |   14 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   12 +-
 pulsar-broker/pom.xml                              |    6 +
 .../org/apache/pulsar/broker/PulsarService.java    |   21 +-
 .../org/apache/pulsar/broker/service/Producer.java |   23 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  122 +-
 .../org/apache/pulsar/broker/service/Topic.java    |    8 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   30 +-
 .../broker/service/persistent/PersistentTopic.java |   11 +
 .../schema/DefaultSchemaRegistryService.java       |   57 +
 .../broker/service/schema/SchemaRegistry.java      |   79 ++
 .../service/schema/SchemaRegistryService.java      |   46 +
 .../service/schema/SchemaRegistryServiceImpl.java  |  190 +++
 .../broker/service/schema/SchemaStorage.java       |   36 +
 .../service/schema/SchemaStorageFactory.java       |   27 +
 .../pulsar/broker/service/schema/StoredSchema.java |   68 +
 .../service/schema/proto/SchemaRegistryFormat.java | 1373 ++++++++++++++++++++
 .../src/main/proto/SchemaRegistryFormat.proto      |   45 +
 .../pulsar/broker/service/PersistentTopicTest.java |   15 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   19 +-
 .../apache/pulsar/client/api/ClientErrorsTest.java |   16 +-
 .../pulsar/client/api/MockBrokerService.java       |    3 +-
 .../org/apache/pulsar/common/api/Commands.java     |   14 +-
 .../org/apache/pulsar/common/naming/TopicName.java |    6 +
 .../apache/pulsar/common/schema/EmptyVersion.java  |   28 +
 .../apache/pulsar/common/schema/LatestVersion.java |   28 +
 .../apache/pulsar/common/schema/SchemaData.java    |   34 +
 .../apache/pulsar/common/schema/SchemaType.java    |   23 +
 .../apache/pulsar/common/schema/SchemaVersion.java |   26 +
 29 files changed, 2281 insertions(+), 99 deletions(-)

diff --git a/pom.xml b/pom.xml
index 081fcdb..7137195 100644
--- a/pom.xml
+++ b/pom.xml
@@ -408,6 +408,20 @@ flexible messaging model and an intuitive client 
API.</description>
         <type>test-jar</type>
         <version>${log4j2.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-core</artifactId>
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bd47970..cc2a409 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -415,6 +415,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(dynamic = true)
     private boolean preferLaterVersions = false;
 
+    private String schemaRegistryStorageClassName = 
"org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+
     /**** --- WebSocket --- ****/
     // Number of IO threads in Pulsar Client used in WebSocket proxy
     private int webSocketNumIoThreads = 
Runtime.getRuntime().availableProcessors();
@@ -1449,7 +1451,15 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     public void setExposeTopicLevelMetricsInPrometheus(boolean 
exposeTopicLevelMetricsInPrometheus) {
         this.exposeTopicLevelMetricsInPrometheus = 
exposeTopicLevelMetricsInPrometheus;
     }
-    
+
+    public String getSchemaRegistryStorageClassName() {
+       return schemaRegistryStorageClassName;
+    }
+
+    public void setSchemaRegistryStorageClassName(String className) {
+        schemaRegistryStorageClassName = className;
+    }
+
     public boolean authenticateOriginalAuthData() {
         return authenticateOriginalAuthData;
     }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index abead55..8681cad 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -55,6 +55,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf2.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-common</artifactId>
       <version>${project.version}</version>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6bbc1e6..51014a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.broker;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.net.URL;
 import java.util.List;
@@ -34,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -52,6 +54,7 @@ import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
@@ -80,11 +83,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Main class for Pulsar broker service
  */
@@ -123,6 +121,7 @@ public class PulsarService implements AutoCloseable {
     private final String brokerServiceUrl;
     private final String brokerServiceUrlTls;
     private final String brokerVersion;
+    private SchemaRegistryService schemaRegistryService = null;
     private final Optional<WorkerService> functionWorkerService;
 
     private final MessagingServiceShutdownHook shutdownService;
@@ -233,6 +232,10 @@ public class PulsarService implements AutoCloseable {
                 loadManager.stop();
             }
 
+            if (schemaRegistryService != null) {
+                schemaRegistryService.close();
+            }
+
             state = State.Closed;
 
         } catch (Exception e) {
@@ -359,6 +362,8 @@ public class PulsarService implements AutoCloseable {
 
             this.metricsGenerator = new MetricsGenerator(this);
 
+            schemaRegistryService = SchemaRegistryService.create(this);
+
             state = State.Started;
 
             acquireSLANamespace();
@@ -701,4 +706,8 @@ public class PulsarService implements AutoCloseable {
     public String getBrokerVersion() {
         return brokerVersion;
     }
+
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 9268ac7..35bc315 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -23,12 +23,16 @@ import static 
com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
@@ -42,17 +46,11 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
 /**
  * Represents a currently connected producer
  */
@@ -82,8 +80,10 @@ public class Producer {
 
     private final Map<String, String> metadata;
 
+    private final SchemaVersion schemaVersion;
+
     public Producer(Topic topic, ServerCnx cnx, long producerId, String 
producerName, String appId,
-        boolean isEncrypted, Map<String, String> metadata) {
+        boolean isEncrypted, Map<String, String> metadata, SchemaVersion 
schemaVersion) {
         this.topic = topic;
         this.cnx = cnx;
         this.producerId = producerId;
@@ -110,6 +110,7 @@ public class Producer {
         this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;
 
         this.isEncrypted = isEncrypted;
+        this.schemaVersion = schemaVersion;
     }
 
     @Override
@@ -492,6 +493,10 @@ public class Producer {
         }
     }
 
+    public SchemaVersion getSchemaVersion() {
+        return schemaVersion;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Producer.class);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a386ece..e0ff275 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,12 @@ import static 
org.apache.pulsar.broker.lookup.TopicLookup.lookupTopicAsync;
 import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
+import com.google.protobuf.GeneratedMessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslHandler;
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import org.apache.bookkeeper.mledger.Position;
@@ -52,6 +59,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.CommandUtils;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
@@ -74,24 +82,18 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-import com.google.protobuf.GeneratedMessageLite;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOption;
-import io.netty.handler.ssl.SslHandler;
-
 public class ServerCnx extends PulsarHandler {
     private final BrokerService service;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
@@ -697,6 +699,36 @@ public class ServerCnx extends PulsarHandler {
         });
     }
 
+    private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
+        switch (protocolType) {
+            case Json:
+                return SchemaType.JSON;
+            case Avro:
+                return SchemaType.AVRO;
+            case Thrift:
+                return SchemaType.THRIFT;
+            case Protobuf:
+                return SchemaType.PROTOBUF;
+            default:
+                return SchemaType.NONE;
+        }
+    }
+
+    private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
+        return SchemaData.builder()
+            .data(protocolSchema.getSchemaData().toByteArray())
+            .isDeleted(false)
+            .timestamp(System.currentTimeMillis())
+            .user(originalPrincipal)
+            .type(getType(protocolSchema.getType()))
+            .props(protocolSchema.getPropertiesList().stream().collect(
+                Collectors.toMap(
+                    PulsarApi.KeyValue::getKey,
+                    PulsarApi.KeyValue::getValue
+                )
+            )).build();
+    }
+
     @Override
     protected void handleProducer(final CommandProducer cmdProducer) {
         checkArgument(state == State.Connected);
@@ -752,7 +784,8 @@ public class ServerCnx extends PulsarHandler {
                                 Producer producer = 
existingProducerFuture.getNow(null);
                                 log.info("[{}] Producer with the same id is 
already created: {}", remoteAddress,
                                         producer);
-                                
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, 
producer.getProducerName()));
+                                
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, 
producer.getProducerName(),
+                                    producer.getSchemaVersion()));
                                 return null;
                             } else {
                                 // There was an early request to create a 
producer with
@@ -805,41 +838,56 @@ public class ServerCnx extends PulsarHandler {
 
                             disableTcpNoDelayIfNeeded(topicName.toString(), 
producerName);
 
-                            Producer producer = new Producer(topic, 
ServerCnx.this, producerId, producerName, authRole,
-                                    isEncrypted, metadata);
+                            CompletableFuture<SchemaVersion> 
schemaVersionFuture;
+                            if (cmdProducer.hasSchema()) {
+                                schemaVersionFuture = 
topic.addSchema(getSchema(cmdProducer.getSchema()));
+                            } else {
+                                schemaVersionFuture = 
CompletableFuture.completedFuture(SchemaVersion.Empty);
+                            }
+
+                            schemaVersionFuture.exceptionally(exception -> {
+                                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.UnknownError, exception.getMessage()));
+                                producers.remove(producerId, producerFuture);
+                                return null;
+                            });
+
+                            schemaVersionFuture.thenAccept(schemaVersion -> {
+                                Producer producer = new Producer(topic, 
ServerCnx.this, producerId, producerName, authRole,
+                                    isEncrypted, metadata, schemaVersion);
 
-                            try {
-                                topic.addProducer(producer);
+                                try {
+                                    topic.addProducer(producer);
 
-                                if (isActive()) {
-                                    if (producerFuture.complete(producer)) {
-                                        log.info("[{}] Created new producer: 
{}", remoteAddress, producer);
-                                        
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
-                                                producer.getLastSequenceId()));
-                                        return;
+                                    if (isActive()) {
+                                        if (producerFuture.complete(producer)) 
{
+                                            log.info("[{}] Created new 
producer: {}", remoteAddress, producer);
+                                            
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+                                                producer.getLastSequenceId(), 
producer.getSchemaVersion()));
+                                            return;
+                                        } else {
+                                            // The producer's future was 
completed before by
+                                            // a close command
+                                            producer.closeNow();
+                                            log.info("[{}] Cleared producer 
created after timeout on client side {}",
+                                                remoteAddress, producer);
+                                        }
                                     } else {
-                                        // The producer's future was completed 
before by
-                                        // a close command
                                         producer.closeNow();
-                                        log.info("[{}] Cleared producer 
created after timeout on client side {}",
-                                                remoteAddress, producer);
-                                    }
-                                } else {
-                                    producer.closeNow();
-                                    log.info("[{}] Cleared producer created 
after connection was closed: {}",
+                                        log.info("[{}] Cleared producer 
created after connection was closed: {}",
                                             remoteAddress, producer);
-                                    producerFuture.completeExceptionally(
+                                        producerFuture.completeExceptionally(
                                             new 
IllegalStateException("Producer created after connection was closed"));
-                                }
-                            } catch (BrokerServiceException ise) {
-                                log.error("[{}] Failed to add producer to 
topic {}: {}", remoteAddress, topicName,
+                                    }
+                                } catch (BrokerServiceException ise) {
+                                    log.error("[{}] Failed to add producer to 
topic {}: {}", remoteAddress, topicName,
                                         ise.getMessage());
-                                ctx.writeAndFlush(Commands.newError(requestId,
+                                    
ctx.writeAndFlush(Commands.newError(requestId,
                                         
BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
-                                producerFuture.completeExceptionally(ise);
-                            }
+                                    producerFuture.completeExceptionally(ise);
+                                }
 
-                            producers.remove(producerId, producerFuture);
+                                producers.remove(producerId, producerFuture);
+                            });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
                             if (!(cause instanceof 
ServiceUnitNotReadyException)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 80aed77..be60116 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.broker.service;
 
+import io.netty.buffer.ByteBuf;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -30,13 +30,13 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 
-import io.netty.buffer.ByteBuf;
-
 public interface Topic {
 
     public interface PublishContext {
@@ -125,4 +125,6 @@ public interface Topic {
     PersistentTopicInternalStats getInternalStats();
 
     Position getLastMessageId();
+
+    CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ef355f0..4a6de05 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -22,6 +22,13 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -70,6 +76,8 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -79,15 +87,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.concurrent.FastThreadLocal;
-
 public class NonPersistentTopic implements Topic {
     private final String topic;
 
@@ -973,7 +972,14 @@ public class NonPersistentTopic implements Topic {
         this.hasBatchMessagePublished = true;
     }
 
-
-
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .putSchemaIfAbsent(id, schema);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 62017f0..cdca1cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -92,6 +92,8 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1601,4 +1603,13 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .putSchemaIfAbsent(id, schema);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
new file mode 100644
index 0000000..db3b9f7
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class DefaultSchemaRegistryService implements SchemaRegistryService {
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, 
SchemaVersion version) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
new file mode 100644
index 0000000..4dfbd6d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaRegistry extends AutoCloseable {
+
+    CompletableFuture<SchemaAndMetadata> getSchema(String schemaId);
+
+    CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, 
SchemaVersion version);
+
+    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema);
+
+    CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String 
user);
+
+    SchemaVersion versionFromBytes(byte[] version);
+
+    class SchemaAndMetadata {
+        public final String id;
+        public final SchemaData schema;
+        public final SchemaVersion version;
+
+        SchemaAndMetadata(String id, SchemaData schema, SchemaVersion version) 
{
+            this.id = id;
+            this.schema = schema;
+            this.version = version;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SchemaAndMetadata that = (SchemaAndMetadata) o;
+            return version == that.version &&
+                Objects.equals(id, that.id) &&
+                Objects.equals(schema, that.schema);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, schema, version);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("id", id)
+                .add("schema", schema)
+                .add("version", version)
+                .toString();
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
new file mode 100644
index 0000000..69e7364
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.lang.reflect.Method;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface SchemaRegistryService extends SchemaRegistry {
+    String CreateMethodName = "create";
+    Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
+
+    static SchemaRegistryService create(PulsarService pulsar) {
+        try {
+            ServiceConfiguration config = pulsar.getConfiguration();
+            final Class<?> storageClass = 
Class.forName(config.getSchemaRegistryStorageClassName());
+            Object factoryInstance = storageClass.newInstance();
+            Method createMethod = storageClass.getMethod(CreateMethodName, 
PulsarService.class);
+            SchemaStorage schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, pulsar);
+            return new SchemaRegistryServiceImpl(schemaStorage);
+        } catch (Exception e) {
+            log.warn("Error when trying to create scehema registry storage: 
{}", e);
+        }
+        return new DefaultSchemaRegistryService();
+    }
+
+    void close() throws Exception;
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
new file mode 100644
index 0000000..1aec039
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.Objects.isNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class SchemaRegistryServiceImpl implements SchemaRegistryService {
+    private final SchemaStorage schemaStorage;
+    private final Clock clock;
+
+    @VisibleForTesting
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) {
+        this.schemaStorage = schemaStorage;
+        this.clock = clock;
+    }
+
+    @VisibleForTesting
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage) {
+        this(schemaStorage, Clock.systemUTC());
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+        return getSchema(schemaId, SchemaVersion.Latest);
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, 
SchemaVersion version) {
+        return schemaStorage.get(schemaId, version).thenCompose(stored -> {
+                if (isNull(stored)) {
+                    return completedFuture(null);
+                } else {
+                    return Functions.bytesToSchemaInfo(stored.data)
+                        .thenApply(Functions::schemaInfoToSchema)
+                        .thenApply(schema -> new SchemaAndMetadata(schemaId, 
schema, stored.version));
+                }
+            }
+        );
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema) {
+        SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
+            .setType(Functions.convertFromDomainType(schema.getType()))
+            .setSchema(ByteString.copyFrom(schema.getData()))
+            .setSchemaId(schemaId)
+            .setUser(schema.getUser())
+            .setDeleted(false)
+            .setTimestamp(clock.millis())
+            .addAllProps(toPairs(schema.getProps()))
+            .build();
+        return schemaStorage.put(schemaId, info.toByteArray());
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
+        byte[] deletedEntry = deleted(schemaId, user).toByteArray();
+        return schemaStorage.put(schemaId, deletedEntry);
+    }
+
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        return schemaStorage.versionFromBytes(version);
+    }
+
+    @Override
+    public void close() throws Exception {
+        schemaStorage.close();
+    }
+
+    private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String 
user) {
+        return SchemaRegistryFormat.SchemaInfo.newBuilder()
+            .setSchemaId(schemaId)
+            .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
+            .setSchema(ByteString.EMPTY)
+            .setUser(user)
+            .setDeleted(true)
+            .setTimestamp(clock.millis())
+            .build();
+    }
+
+    interface Functions {
+        static SchemaType 
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
+            switch (type) {
+                case AVRO:
+                    return SchemaType.AVRO;
+                case JSON:
+                    return SchemaType.JSON;
+                case PROTO:
+                    return SchemaType.PROTOBUF;
+                case THRIFT:
+                    return SchemaType.THRIFT;
+                default:
+                    return SchemaType.NONE;
+            }
+        }
+
+        static SchemaRegistryFormat.SchemaInfo.SchemaType 
convertFromDomainType(SchemaType type) {
+            switch (type) {
+                case AVRO:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO;
+                case JSON:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON;
+                case THRIFT:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.THRIFT;
+                case PROTOBUF:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTO;
+                default:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+            }
+        }
+
+        static Map<String, String> 
toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
+            Map<String, String> map = new HashMap<>();
+            for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
+                map.put(pair.getKey(), pair.getValue());
+            }
+            return map;
+        }
+
+        static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
toPairs(Map<String, String> map) {
+            List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new 
ArrayList<>(map.size());
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
+                    SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+                
pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build());
+            }
+            return pairs;
+        }
+
+        static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo 
info) {
+            return SchemaData.builder()
+                .user(info.getUser())
+                .type(convertToDomainType(info.getType()))
+                .data(info.getSchema().toByteArray())
+                .isDeleted(info.getDeleted())
+                .props(toMap(info.getPropsList()))
+                .build();
+        }
+
+        static CompletableFuture<SchemaRegistryFormat.SchemaInfo> 
bytesToSchemaInfo(byte[] bytes) {
+            CompletableFuture<SchemaRegistryFormat.SchemaInfo> future;
+            try {
+                future = 
completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
+            } catch (InvalidProtocolBufferException e) {
+                future = new CompletableFuture<>();
+                future.completeExceptionally(e);
+            }
+            return future;
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
new file mode 100644
index 0000000..4d0d8af
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaStorage {
+
+    CompletableFuture<SchemaVersion> put(String key, byte[] value);
+
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    SchemaVersion versionFromBytes(byte[] version);
+
+    void close() throws Exception;
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
new file mode 100644
index 0000000..c4cff34
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.PulsarService;
+
+public interface SchemaStorageFactory {
+    @NotNull
+    SchemaStorage create(PulsarService pulsar) throws Exception;
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
new file mode 100644
index 0000000..f28a707
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class StoredSchema {
+    public final byte[] data;
+    public final SchemaVersion version;
+    public final Map<String, String> metadata;
+
+    public StoredSchema(byte[] data, SchemaVersion version, Map<String, 
String> metadata) {
+        this.data = data;
+        this.version = version;
+        this.metadata = metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StoredSchema that = (StoredSchema) o;
+        return Arrays.equals(data, that.data) &&
+            Objects.equals(version, that.version) &&
+            Objects.equals(metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+
+        int result = Objects.hash(version, metadata);
+        result = 31 * result + Arrays.hashCode(data);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("data", data)
+            .add("version", version)
+            .add("metadata", metadata)
+            .toString();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
new file mode 100644
index 0000000..3922731
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
@@ -0,0 +1,1373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/main/proto/SchemaRegistryFormat.proto
+
+package org.apache.pulsar.broker.service.schema.proto;
+
+public final class SchemaRegistryFormat {
+  private SchemaRegistryFormat() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistryLite registry) {
+  }
+  public interface SchemaInfoOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required string schema_id = 1;
+    boolean hasSchemaId();
+    String getSchemaId();
+    
+    // required string user = 2;
+    boolean hasUser();
+    String getUser();
+    
+    // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+    boolean hasType();
+    SchemaRegistryFormat.SchemaInfo.SchemaType getType();
+    
+    // required bytes schema = 4;
+    boolean hasSchema();
+    com.google.protobuf.ByteString getSchema();
+    
+    // required int64 timestamp = 5;
+    boolean hasTimestamp();
+    long getTimestamp();
+    
+    // required bool deleted = 6;
+    boolean hasDeleted();
+    boolean getDeleted();
+    
+    // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+    java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
+        getPropsList();
+    SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index);
+    int getPropsCount();
+  }
+  public static final class SchemaInfo extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaInfoOrBuilder {
+    // Use SchemaInfo.newBuilder() to construct.
+    private SchemaInfo(Builder builder) {
+      super(builder);
+    }
+    private SchemaInfo(boolean noInit) {}
+    
+    private static final SchemaInfo defaultInstance;
+    public static SchemaInfo getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public SchemaInfo getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public enum SchemaType
+        implements com.google.protobuf.Internal.EnumLite {
+      NONE(0, 1),
+      THRIFT(1, 2),
+      AVRO(2, 3),
+      JSON(3, 4),
+      PROTO(4, 5),
+      ;
+      
+      public static final int NONE_VALUE = 1;
+      public static final int THRIFT_VALUE = 2;
+      public static final int AVRO_VALUE = 3;
+      public static final int JSON_VALUE = 4;
+      public static final int PROTO_VALUE = 5;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static SchemaType valueOf(int value) {
+        switch (value) {
+          case 1: return NONE;
+          case 2: return THRIFT;
+          case 3: return AVRO;
+          case 4: return JSON;
+          case 5: return PROTO;
+          default: return null;
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<SchemaType>() {
+              public SchemaType findValueByNumber(int number) {
+                return SchemaType.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private SchemaType(int index, int value) {
+        this.value = value;
+      }
+      
+      // 
@@protoc_insertion_point(enum_scope:pulsar.schema.SchemaInfo.SchemaType)
+    }
+    
+    public interface KeyValuePairOrBuilder
+        extends com.google.protobuf.MessageLiteOrBuilder {
+      
+      // required string key = 1;
+      boolean hasKey();
+      String getKey();
+      
+      // required string value = 2;
+      boolean hasValue();
+      String getValue();
+    }
+    public static final class KeyValuePair extends
+        com.google.protobuf.GeneratedMessageLite
+        implements KeyValuePairOrBuilder {
+      // Use KeyValuePair.newBuilder() to construct.
+      private KeyValuePair(Builder builder) {
+        super(builder);
+      }
+      private KeyValuePair(boolean noInit) {}
+      
+      private static final KeyValuePair defaultInstance;
+      public static KeyValuePair getDefaultInstance() {
+        return defaultInstance;
+      }
+      
+      public KeyValuePair getDefaultInstanceForType() {
+        return defaultInstance;
+      }
+      
+      private int bitField0_;
+      // required string key = 1;
+      public static final int KEY_FIELD_NUMBER = 1;
+      private java.lang.Object key_;
+      public boolean hasKey() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getKey() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          return (String) ref;
+        } else {
+          com.google.protobuf.ByteString bs = 
+              (com.google.protobuf.ByteString) ref;
+          String s = bs.toStringUtf8();
+          if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+            key_ = s;
+          }
+          return s;
+        }
+      }
+      private com.google.protobuf.ByteString getKeyBytes() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+          key_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      
+      // required string value = 2;
+      public static final int VALUE_FIELD_NUMBER = 2;
+      private java.lang.Object value_;
+      public boolean hasValue() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getValue() {
+        java.lang.Object ref = value_;
+        if (ref instanceof String) {
+          return (String) ref;
+        } else {
+          com.google.protobuf.ByteString bs = 
+              (com.google.protobuf.ByteString) ref;
+          String s = bs.toStringUtf8();
+          if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+            value_ = s;
+          }
+          return s;
+        }
+      }
+      private com.google.protobuf.ByteString getValueBytes() {
+        java.lang.Object ref = value_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+          value_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      
+      private void initFields() {
+        key_ = "";
+        value_ = "";
+      }
+      private byte memoizedIsInitialized = -1;
+      public final boolean isInitialized() {
+        byte isInitialized = memoizedIsInitialized;
+        if (isInitialized != -1) return isInitialized == 1;
+        
+        if (!hasKey()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+        if (!hasValue()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+        memoizedIsInitialized = 1;
+        return true;
+      }
+      
+      public void writeTo(com.google.protobuf.CodedOutputStream output)
+                          throws java.io.IOException {
+        getSerializedSize();
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          output.writeBytes(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          output.writeBytes(2, getValueBytes());
+        }
+      }
+      
+      private int memoizedSerializedSize = -1;
+      public int getSerializedSize() {
+        int size = memoizedSerializedSize;
+        if (size != -1) return size;
+      
+        size = 0;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(2, getValueBytes());
+        }
+        memoizedSerializedSize = size;
+        return size;
+      }
+      
+      private static final long serialVersionUID = 0L;
+      @java.lang.Override
+      protected java.lang.Object writeReplace()
+          throws java.io.ObjectStreamException {
+        return super.writeReplace();
+      }
+      
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.ByteString data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.ByteString data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair 
parseFrom(byte[] data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          byte[] data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair 
parseFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair 
parseDelimitedFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair 
parseDelimitedFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.CodedInputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      
+      public static Builder newBuilder() { return Builder.create(); }
+      public Builder newBuilderForType() { return newBuilder(); }
+      public static Builder 
newBuilder(SchemaRegistryFormat.SchemaInfo.KeyValuePair prototype) {
+        return newBuilder().mergeFrom(prototype);
+      }
+      public Builder toBuilder() { return newBuilder(this); }
+      
+      public static final class Builder extends
+          com.google.protobuf.GeneratedMessageLite.Builder<
+            SchemaRegistryFormat.SchemaInfo.KeyValuePair, Builder>
+          implements SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder {
+        // Construct using 
org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder()
+        private Builder() {
+          maybeForceBuilderInitialization();
+        }
+        
+        private void maybeForceBuilderInitialization() {
+        }
+        private static Builder create() {
+          return new Builder();
+        }
+        
+        public Builder clear() {
+          super.clear();
+          key_ = "";
+          bitField0_ = (bitField0_ & ~0x00000001);
+          value_ = "";
+          bitField0_ = (bitField0_ & ~0x00000002);
+          return this;
+        }
+        
+        public Builder clone() {
+          return create().mergeFrom(buildPartial());
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair 
getDefaultInstanceForType() {
+          return 
SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance();
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair build() {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(result);
+          }
+          return result;
+        }
+        
+        private SchemaRegistryFormat.SchemaInfo.KeyValuePair buildParsed()
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(
+              result).asInvalidProtocolBufferException();
+          }
+          return result;
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair buildPartial() {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = new 
SchemaRegistryFormat.SchemaInfo.KeyValuePair(this);
+          int from_bitField0_ = bitField0_;
+          int to_bitField0_ = 0;
+          if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+            to_bitField0_ |= 0x00000001;
+          }
+          result.key_ = key_;
+          if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+            to_bitField0_ |= 0x00000002;
+          }
+          result.value_ = value_;
+          result.bitField0_ = to_bitField0_;
+          return result;
+        }
+        
+        public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo.KeyValuePair 
other) {
+          if (other == 
SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance()) return this;
+          if (other.hasKey()) {
+            setKey(other.getKey());
+          }
+          if (other.hasValue()) {
+            setValue(other.getValue());
+          }
+          return this;
+        }
+        
+        public final boolean isInitialized() {
+          if (!hasKey()) {
+            
+            return false;
+          }
+          if (!hasValue()) {
+            
+            return false;
+          }
+          return true;
+        }
+        
+        public Builder mergeFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          while (true) {
+            int tag = input.readTag();
+            switch (tag) {
+              case 0:
+                
+                return this;
+              default: {
+                if (!parseUnknownField(input, extensionRegistry, tag)) {
+                  
+                  return this;
+                }
+                break;
+              }
+              case 10: {
+                bitField0_ |= 0x00000001;
+                key_ = input.readBytes();
+                break;
+              }
+              case 18: {
+                bitField0_ |= 0x00000002;
+                value_ = input.readBytes();
+                break;
+              }
+            }
+          }
+        }
+        
+        private int bitField0_;
+        
+        // required string key = 1;
+        private java.lang.Object key_ = "";
+        public boolean hasKey() {
+          return ((bitField0_ & 0x00000001) == 0x00000001);
+        }
+        public String getKey() {
+          java.lang.Object ref = key_;
+          if (!(ref instanceof String)) {
+            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+            key_ = s;
+            return s;
+          } else {
+            return (String) ref;
+          }
+        }
+        public Builder setKey(String value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+          key_ = value;
+          
+          return this;
+        }
+        public Builder clearKey() {
+          bitField0_ = (bitField0_ & ~0x00000001);
+          key_ = getDefaultInstance().getKey();
+          
+          return this;
+        }
+        void setKey(com.google.protobuf.ByteString value) {
+          bitField0_ |= 0x00000001;
+          key_ = value;
+          
+        }
+        
+        // required string value = 2;
+        private java.lang.Object value_ = "";
+        public boolean hasValue() {
+          return ((bitField0_ & 0x00000002) == 0x00000002);
+        }
+        public String getValue() {
+          java.lang.Object ref = value_;
+          if (!(ref instanceof String)) {
+            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+            value_ = s;
+            return s;
+          } else {
+            return (String) ref;
+          }
+        }
+        public Builder setValue(String value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+          value_ = value;
+          
+          return this;
+        }
+        public Builder clearValue() {
+          bitField0_ = (bitField0_ & ~0x00000002);
+          value_ = getDefaultInstance().getValue();
+          
+          return this;
+        }
+        void setValue(com.google.protobuf.ByteString value) {
+          bitField0_ |= 0x00000002;
+          value_ = value;
+          
+        }
+        
+        // 
@@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+      }
+      
+      static {
+        defaultInstance = new KeyValuePair(true);
+        defaultInstance.initFields();
+      }
+      
+      // 
@@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+    }
+    
+    private int bitField0_;
+    // required string schema_id = 1;
+    public static final int SCHEMA_ID_FIELD_NUMBER = 1;
+    private java.lang.Object schemaId_;
+    public boolean hasSchemaId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getSchemaId() {
+      java.lang.Object ref = schemaId_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          schemaId_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getSchemaIdBytes() {
+      java.lang.Object ref = schemaId_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        schemaId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required string user = 2;
+    public static final int USER_FIELD_NUMBER = 2;
+    private java.lang.Object user_;
+    public boolean hasUser() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getUser() {
+      java.lang.Object ref = user_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          user_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getUserBytes() {
+      java.lang.Object ref = user_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        user_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+    public static final int TYPE_FIELD_NUMBER = 3;
+    private SchemaRegistryFormat.SchemaInfo.SchemaType type_;
+    public boolean hasType() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+      return type_;
+    }
+    
+    // required bytes schema = 4;
+    public static final int SCHEMA_FIELD_NUMBER = 4;
+    private com.google.protobuf.ByteString schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public com.google.protobuf.ByteString getSchema() {
+      return schema_;
+    }
+    
+    // required int64 timestamp = 5;
+    public static final int TIMESTAMP_FIELD_NUMBER = 5;
+    private long timestamp_;
+    public boolean hasTimestamp() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public long getTimestamp() {
+      return timestamp_;
+    }
+    
+    // required bool deleted = 6;
+    public static final int DELETED_FIELD_NUMBER = 6;
+    private boolean deleted_;
+    public boolean hasDeleted() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public boolean getDeleted() {
+      return deleted_;
+    }
+    
+    // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+    public static final int PROPS_FIELD_NUMBER = 7;
+    private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
props_;
+    public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
getPropsList() {
+      return props_;
+    }
+    public java.util.List<? extends 
SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder>
+        getPropsOrBuilderList() {
+      return props_;
+    }
+    public int getPropsCount() {
+      return props_.size();
+    }
+    public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+      return props_.get(index);
+    }
+    public SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder 
getPropsOrBuilder(
+        int index) {
+      return props_.get(index);
+    }
+    
+    private void initFields() {
+      schemaId_ = "";
+      user_ = "";
+      type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+      schema_ = com.google.protobuf.ByteString.EMPTY;
+      timestamp_ = 0L;
+      deleted_ = false;
+      props_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasSchemaId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasUser()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasType()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasSchema()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTimestamp()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasDeleted()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      for (int i = 0; i < getPropsCount(); i++) {
+        if (!getProps(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getSchemaIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getUserBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeEnum(3, type_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(5, timestamp_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(6, deleted_);
+      }
+      for (int i = 0; i < props_.size(); i++) {
+        output.writeMessage(7, props_.get(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getSchemaIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getUserBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(3, type_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(5, timestamp_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(6, deleted_);
+      }
+      for (int i = 0; i < props_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(7, props_.get(i));
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo 
prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          SchemaRegistryFormat.SchemaInfo, Builder>
+        implements SchemaRegistryFormat.SchemaInfoOrBuilder {
+      // Construct using 
org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        schemaId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        user_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schema_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        timestamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        deleted_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        props_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo getDefaultInstanceForType() {
+        return SchemaRegistryFormat.SchemaInfo.getDefaultInstance();
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo build() {
+        SchemaRegistryFormat.SchemaInfo result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private SchemaRegistryFormat.SchemaInfo buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        SchemaRegistryFormat.SchemaInfo result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo buildPartial() {
+        SchemaRegistryFormat.SchemaInfo result = new 
SchemaRegistryFormat.SchemaInfo(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.schemaId_ = schemaId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.user_ = user_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.type_ = type_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.timestamp_ = timestamp_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.deleted_ = deleted_;
+        if (((bitField0_ & 0x00000040) == 0x00000040)) {
+          props_ = java.util.Collections.unmodifiableList(props_);
+          bitField0_ = (bitField0_ & ~0x00000040);
+        }
+        result.props_ = props_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo other) {
+        if (other == SchemaRegistryFormat.SchemaInfo.getDefaultInstance()) 
return this;
+        if (other.hasSchemaId()) {
+          setSchemaId(other.getSchemaId());
+        }
+        if (other.hasUser()) {
+          setUser(other.getUser());
+        }
+        if (other.hasType()) {
+          setType(other.getType());
+        }
+        if (other.hasSchema()) {
+          setSchema(other.getSchema());
+        }
+        if (other.hasTimestamp()) {
+          setTimestamp(other.getTimestamp());
+        }
+        if (other.hasDeleted()) {
+          setDeleted(other.getDeleted());
+        }
+        if (!other.props_.isEmpty()) {
+          if (props_.isEmpty()) {
+            props_ = other.props_;
+            bitField0_ = (bitField0_ & ~0x00000040);
+          } else {
+            ensurePropsIsMutable();
+            props_.addAll(other.props_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasSchemaId()) {
+          
+          return false;
+        }
+        if (!hasUser()) {
+          
+          return false;
+        }
+        if (!hasType()) {
+          
+          return false;
+        }
+        if (!hasSchema()) {
+          
+          return false;
+        }
+        if (!hasTimestamp()) {
+          
+          return false;
+        }
+        if (!hasDeleted()) {
+          
+          return false;
+        }
+        for (int i = 0; i < getPropsCount(); i++) {
+          if (!getProps(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              schemaId_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              user_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              int rawValue = input.readEnum();
+              SchemaRegistryFormat.SchemaInfo.SchemaType value = 
SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000004;
+                type_ = value;
+              }
+              break;
+            }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              schema_ = input.readBytes();
+              break;
+            }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              timestamp_ = input.readInt64();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              deleted_ = input.readBool();
+              break;
+            }
+            case 58: {
+              SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder subBuilder 
= SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addProps(subBuilder.buildPartial());
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required string schema_id = 1;
+      private java.lang.Object schemaId_ = "";
+      public boolean hasSchemaId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getSchemaId() {
+        java.lang.Object ref = schemaId_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          schemaId_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setSchemaId(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        schemaId_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        schemaId_ = getDefaultInstance().getSchemaId();
+        
+        return this;
+      }
+      void setSchemaId(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        schemaId_ = value;
+        
+      }
+      
+      // required string user = 2;
+      private java.lang.Object user_ = "";
+      public boolean hasUser() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getUser() {
+        java.lang.Object ref = user_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          user_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setUser(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        user_ = value;
+        
+        return this;
+      }
+      public Builder clearUser() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        user_ = getDefaultInstance().getUser();
+        
+        return this;
+      }
+      void setUser(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000002;
+        user_ = value;
+        
+      }
+      
+      // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+      private SchemaRegistryFormat.SchemaInfo.SchemaType type_ = 
SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+      public boolean hasType() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+        return type_;
+      }
+      public Builder setType(SchemaRegistryFormat.SchemaInfo.SchemaType value) 
{
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000004;
+        type_ = value;
+        
+        return this;
+      }
+      public Builder clearType() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+        
+        return this;
+      }
+      
+      // required bytes schema = 4;
+      private com.google.protobuf.ByteString schema_ = 
com.google.protobuf.ByteString.EMPTY;
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public com.google.protobuf.ByteString getSchema() {
+        return schema_;
+      }
+      public Builder setSchema(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        schema_ = value;
+        
+        return this;
+      }
+      public Builder clearSchema() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        schema_ = getDefaultInstance().getSchema();
+        
+        return this;
+      }
+      
+      // required int64 timestamp = 5;
+      private long timestamp_ ;
+      public boolean hasTimestamp() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public long getTimestamp() {
+        return timestamp_;
+      }
+      public Builder setTimestamp(long value) {
+        bitField0_ |= 0x00000010;
+        timestamp_ = value;
+        
+        return this;
+      }
+      public Builder clearTimestamp() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        timestamp_ = 0L;
+        
+        return this;
+      }
+      
+      // required bool deleted = 6;
+      private boolean deleted_ ;
+      public boolean hasDeleted() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public boolean getDeleted() {
+        return deleted_;
+      }
+      public Builder setDeleted(boolean value) {
+        bitField0_ |= 0x00000020;
+        deleted_ = value;
+        
+        return this;
+      }
+      public Builder clearDeleted() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        deleted_ = false;
+        
+        return this;
+      }
+      
+      // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+      private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
props_ =
+        java.util.Collections.emptyList();
+      private void ensurePropsIsMutable() {
+        if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+          props_ = new 
java.util.ArrayList<SchemaRegistryFormat.SchemaInfo.KeyValuePair>(props_);
+          bitField0_ |= 0x00000040;
+         }
+      }
+      
+      public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
getPropsList() {
+        return java.util.Collections.unmodifiableList(props_);
+      }
+      public int getPropsCount() {
+        return props_.size();
+      }
+      public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+        return props_.get(index);
+      }
+      public Builder setProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.set(index, value);
+        
+        return this;
+      }
+      public Builder setProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder 
builderForValue) {
+        ensurePropsIsMutable();
+        props_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProps(SchemaRegistryFormat.SchemaInfo.KeyValuePair 
value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.add(value);
+        
+        return this;
+      }
+      public Builder addProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.add(index, value);
+        
+        return this;
+      }
+      public Builder addProps(
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder 
builderForValue) {
+        ensurePropsIsMutable();
+        props_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder 
builderForValue) {
+        ensurePropsIsMutable();
+        props_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllProps(
+          java.lang.Iterable<? extends 
SchemaRegistryFormat.SchemaInfo.KeyValuePair> values) {
+        ensurePropsIsMutable();
+        super.addAll(values, props_);
+        
+        return this;
+      }
+      public Builder clearProps() {
+        props_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        
+        return this;
+      }
+      public Builder removeProps(int index) {
+        ensurePropsIsMutable();
+        props_.remove(index);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo)
+    }
+    
+    static {
+      defaultInstance = new SchemaInfo(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto 
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
new file mode 100644
index 0000000..e497eaf
--- /dev/null
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+option optimize_for = LITE_RUNTIME;
+
+message SchemaInfo {
+    enum SchemaType {
+        NONE = 1;
+        THRIFT = 2;
+        AVRO = 3;
+        JSON = 4;
+        PROTO = 5;
+    }
+    message KeyValuePair {
+        required string key = 1;
+        required string value = 2;
+    }
+    required string schema_id = 1;
+    required string user = 2;
+    required SchemaType type = 3;
+    required bytes schema = 4;
+    required int64 timestamp = 5;
+    required bool deleted = 6;
+
+    repeated KeyValuePair props = 7;
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d6756b4..ce053fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -96,6 +96,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.Compactor;
@@ -335,7 +336,7 @@ public class PersistentTopicTest {
         String role = "appid1";
         // 1. simple add producer
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
@@ -351,7 +352,7 @@ public class PersistentTopicTest {
         // 3. add producer for a different topic
         PersistentTopic failTopic = new PersistentTopic(failTopicName, 
ledgerMock, brokerService);
         Producer failProducer = new Producer(failTopic, serverCnx, 2 /* 
producer id */, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         try {
             topic.addProducer(failProducer);
             fail("should have failed");
@@ -371,18 +372,18 @@ public class PersistentTopicTest {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         String role = "appid1";
         // 1. add producer1
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name1", role, false, null);
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name1", role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
         // 2. add producer2
-        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id 
*/, "prod-name2", role, false, null);
+        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id 
*/, "prod-name2", role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer2);
         assertEquals(topic.getProducers().size(), 2);
 
         // 3. add producer3 but reached maxProducersPerTopic
         try {
-            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer 
id */, "prod-name3", role, false, null);
+            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer 
id */, "prod-name3", role, false, null, SchemaVersion.Latest);
             topic.addProducer(producer3);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -721,7 +722,7 @@ public class PersistentTopicTest {
         // 2. delete topic with producer
         topic = (PersistentTopic) 
brokerService.getTopic(successTopicName).get();
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -877,7 +878,7 @@ public class PersistentTopicTest {
             String role = "appid1";
             Thread.sleep(10); /* delay to ensure that the delete gets executed 
first */
             Producer producer = new Producer(topic, serverCnx, 1 /* producer 
id */, "prod-name",
-                    role, false, null);
+                    role, false, null, SchemaVersion.Latest);
             topic.addProducer(producer);
             fail("Should have failed");
         } catch (BrokerServiceException e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ded2f00..9fb02ea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -35,6 +35,12 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -43,9 +49,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -70,6 +74,7 @@ import 
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
@@ -103,14 +108,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
 /**
  */
 @Test
@@ -143,6 +140,8 @@ public class ServerCnxTest {
     public void setup() throws Exception {
         svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
+        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
+
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
         doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 5d2b922..a0f007a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -23,12 +23,12 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.api.PulsarClientException.LookupException;
 import org.apache.pulsar.client.impl.ConsumerBase;
@@ -36,12 +36,11 @@ import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.common.api.Commands;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import io.netty.channel.ChannelHandlerContext;
-
 /**
  */
 public class ClientErrorsTest {
@@ -143,7 +142,7 @@ public class ClientErrorsTest {
 
         mockBrokerService.setHandleProducer((ctx, producer) -> {
             if (counter.incrementAndGet() == 2) {
-                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer"));
+                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
                 return;
             }
             ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.ServiceNotReady, "msg"));
@@ -217,7 +216,8 @@ public class ClientErrorsTest {
                 ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.AuthenticationError, "msg"));
                 return;
             }
-            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer"));
+            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
+
         });
 
         ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) 
client.newProducer().topic(topic1).create();
@@ -255,7 +255,7 @@ public class ClientErrorsTest {
             int i = counter.incrementAndGet();
             if (i == 1 || i == 5) {
                 // succeed on 1st and 5th attempts
-                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer"));
+                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
                 return;
             }
             ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.PersistenceError, "msg"));
@@ -479,7 +479,7 @@ public class ClientErrorsTest {
                 ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.AuthorizationError, "msg"));
                 return;
             }
-            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer"));
+            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
         });
 
         mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
@@ -583,7 +583,7 @@ public class ClientErrorsTest {
         });
 
         mockBrokerService.setHandleProducer((ctx, produce) -> {
-            
ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), 
"default-producer"));
+            
ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), 
"default-producer", SchemaVersion.Empty));
         });
 
         mockBrokerService.setHandleSend((ctx, sendCmd, headersAndPayload) -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index bdc8e32..91e5bbe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -47,6 +47,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetad
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
@@ -174,7 +175,7 @@ public class MockBrokerService {
                 return;
             }
             // default
-            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer"));
+            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
         }
 
         @Override
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b51d29e..093f944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.api;
 
+import static com.google.protobuf.ByteString.copyFromUtf8;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
 
@@ -37,11 +38,11 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
-import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
@@ -72,6 +73,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 
@@ -115,7 +117,7 @@ public class Commands {
         }
 
         if (authData != null) {
-            connectBuilder.setAuthData(ByteString.copyFromUtf8(authData));
+            connectBuilder.setAuthData(copyFromUtf8(authData));
         }
 
         if (originalPrincipal != null) {
@@ -165,15 +167,16 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newProducerSuccess(long requestId, String 
producerName) {
-        return newProducerSuccess(requestId, producerName, -1);
+    public static ByteBuf newProducerSuccess(long requestId, String 
producerName, SchemaVersion schemaVersion) {
+        return newProducerSuccess(requestId, producerName, -1, schemaVersion);
     }
 
-    public static ByteBuf newProducerSuccess(long requestId, String 
producerName, long lastSequenceId) {
+    public static ByteBuf newProducerSuccess(long requestId, String 
producerName, long lastSequenceId, SchemaVersion schemaVersion) {
         CommandProducerSuccess.Builder producerSuccessBuilder = 
CommandProducerSuccess.newBuilder();
         producerSuccessBuilder.setRequestId(requestId);
         producerSuccessBuilder.setProducerName(producerName);
         producerSuccessBuilder.setLastSequenceId(lastSequenceId);
+        
producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
         CommandProducerSuccess producerSuccess = 
producerSuccessBuilder.build();
         ByteBuf res = serializeWithSize(
                 
BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess));
@@ -980,4 +983,5 @@ public class Commands {
     public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
         return peerVersion >= ProtocolVersion.v12.getNumber();
     }
+
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 61339e1..c34f46c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -288,6 +288,12 @@ public class TopicName implements ServiceUnitId {
         return cluster == null || 
Constants.GLOBAL_CLUSTER.equalsIgnoreCase(cluster);
     }
 
+    public String getSchemaName() {
+        return getProperty()
+            + "/" + getNamespacePortion()
+            + "/" + getLocalName();
+    }
+
     @Override
     public String toString() {
         return completeTopicName;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
new file mode 100644
index 0000000..0aaefb3
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final public class EmptyVersion implements SchemaVersion {
+    private static final byte[] EMPTY = new byte[]{};
+
+    @Override
+    public byte[] bytes() {
+        return EMPTY;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
new file mode 100644
index 0000000..b26231c
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final class LatestVersion implements SchemaVersion {
+    private static final byte[] EMPTY = new byte[]{};
+
+    @Override
+    public byte[] bytes() {
+        return EMPTY;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
new file mode 100644
index 0000000..5a5012c
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Builder
+@Data
+public class SchemaData {
+    private final SchemaType type;
+    private final boolean isDeleted;
+    private final long timestamp;
+    private final String user;
+    private final byte[] data;
+    private final Map<String, String> props;
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
new file mode 100644
index 0000000..e9a01f0
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public enum SchemaType {
+    AVRO, PROTOBUF, THRIFT, JSON, NONE
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
new file mode 100644
index 0000000..e31e45d
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public interface SchemaVersion {
+    SchemaVersion Latest = new LatestVersion();
+    SchemaVersion Empty = new EmptyVersion();
+
+    byte[] bytes();
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to