This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 80dc0de  Schema registry 3/N (#1363)
80dc0de is described below

commit 80dc0deb86f2d8a58563094c24913968e24099d3
Author: Dave Rusek <dave.ru...@gmail.com>
AuthorDate: Wed Mar 14 00:16:57 2018 -0600

    Schema registry 3/N (#1363)
    
    * Schema Registry proto changes
    
    * Infrastructure to store schemas
    
    * A default schema registry implementation
    
    * Renumber schema fields
    
    * Update Pulsar API with schema changes
    
    * Revert field number change
    
    * Fix merge conflict
    
    * Fix broken merge
    
    * Address issues in review
    
    * Add schema type back to proto definition
    
    * Address comments regarding lombok usage
    
    * Remove reserved future enum fields
    
    * regenerate code from protobuf
    
    * Remove unused code
    
    * Add schema version to producer success message
    
    * plumb schema through to producer
    
    * Revert "Add schema version to producer success message"
    
    This reverts commit e7e72f468cf46f1605524a7399520c22763583c9.
    
    * Revert "Revert "Add schema version to producer success message""
    
    This reverts commit 7b902f6bdb1cb054e26577747ff4dd8c326a6248.
    
    * Persist schema on producer connect
    
    * Add principal to schema on publish
    
    * Reformat function for readability
    
    * Remove unused protoc profile
    
    * Rename put on schema registry to putIfAbsent
    
    * Reformat function for readability
    
    * Remove unused protoc profile
    
    * Rename put on schema registry to putIfAbsent
    
    * fix compile errors from parent branch changes
    
    * fix lombok tomfoolery on builder
    
    * plumb hash through and allow lookup by data
    
    * wip
    
    * run tests
    
    * wip: address review comments
    
    * switch underscore to slash in schema name
    
    * blah
    
    * Get duplicate schema detection to work
    
    * Fix protobuf version incompatibility
    
    * fix merge issues
    
    * Fix license headers
    
    * Address review
---
 .../service/schema/BookkeeperSchemaStorage.java    |  479 ++++++
 ...ge.java => BookkeeperSchemaStorageFactory.java} |   27 +-
 .../broker/service/schema/LongSchemaVersion.java   |   69 +
 .../service/schema/SchemaRegistryServiceImpl.java  |   12 +-
 .../broker/service/schema/SchemaStorage.java       |    2 +-
 .../broker/service/schema/SchemaStorageFormat.java | 1785 ++++++++++++++++++++
 .../SchemaStorageFormat.proto}                     |   34 +-
 .../service/BrokerServiceThrottlingTest.java       |    2 +
 .../broker/service/schema/SchemaServiceTest.java   |  255 +++
 .../apache/pulsar/common/schema/SchemaData.java    |    4 +-
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    |   21 +
 11 files changed, 2658 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
new file mode 100644
index 0000000..8ba8750
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.protobuf.ByteString.copyFrom;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+public class BookkeeperSchemaStorage implements SchemaStorage {
+    private static final String SchemaPath = "/schemas";
+    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    private final PulsarService pulsar;
+    private final ZooKeeper zooKeeper;
+    private final ZooKeeperCache localZkCache;
+    private BookKeeper bookKeeper;
+
+    @VisibleForTesting
+    BookkeeperSchemaStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.localZkCache = pulsar.getLocalZkCache();
+        this.zooKeeper = localZkCache.getZooKeeper();
+    }
+
+    /**
+     * Ensures the schema root z-node ({@code SchemaPath}) exists, creating it as a persistent
+     * node with empty data when the existence check finds nothing.
+     *
+     * @throws KeeperException      on any ZooKeeper error other than the node already existing
+     * @throws InterruptedException if the blocking ZooKeeper call is interrupted
+     */
+    @VisibleForTesting
+    public void init() throws KeeperException, InterruptedException {
+        try {
+            if (zooKeeper.exists(SchemaPath, false) == null) {
+                zooKeeper.create(SchemaPath, new byte[]{}, Acl, 
CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException.NodeExistsException error) {
+            // Another caller created the node between exists() and create(); the node is
+            // there either way, so the race on startup is ignored.
+        }
+    }
+
+    public void start() throws IOException {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+            pulsar.getConfiguration(),
+            pulsar.getZkClient()
+        );
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> put(String key, byte[] value, 
byte[] hash) {
+        return putSchemaIfAbsent(key, value, 
hash).thenApply(LongSchemaVersion::new);
+    }
+
+    /**
+     * Fetches a schema by key: the most recent one when {@code version} is
+     * {@link SchemaVersion#Latest}, otherwise the exact version requested.
+     */
+    @Override
+    public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
+        if (version != SchemaVersion.Latest) {
+            long requested = ((LongSchemaVersion) version).getVersion();
+            return getSchema(key, requested);
+        }
+        return getSchema(key);
+    }
+
+    /**
+     * Deletes the schema stored under {@code key} by delegating to {@code deleteSchema}.
+     *
+     * @param key schema id
+     * @return the version produced by the delete, or {@code null} when no schema exists for
+     *         {@code key}
+     */
+    @Override
+    public CompletableFuture<SchemaVersion> delete(String key) {
+        // deleteSchema completes with null when there is nothing to delete. Passing that null
+        // to LongSchemaVersion(long) would unbox it and fail the future with a
+        // NullPointerException, so map it through explicitly.
+        return deleteSchema(key).thenApply(version -> isNull(version) ? null : new LongSchemaVersion(version));
+    }
+
+    @NotNull
+    private CompletableFuture<StoredSchema> getSchema(String schemaId) {
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator 
-> {
+
+            if (!locator.isPresent()) {
+                return completedFuture(null);
+            }
+
+            SchemaStorageFormat.SchemaLocator schemaLocator = 
locator.get().locator;
+            return readSchemaEntry(schemaLocator.getInfo().getPosition())
+                .thenApply(entry ->
+                    new StoredSchema(
+                        entry.getSchemaData().toByteArray(),
+                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion()),
+                        emptyMap()
+                    )
+                );
+        });
+    }
+
+    /**
+     * Decodes a schema version from the leading eight bytes of {@code version}.
+     */
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        long decoded = ByteBuffer.wrap(version).getLong();
+        return new LongSchemaVersion(decoded);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (nonNull(bookKeeper)) {
+            bookKeeper.close();
+        }
+    }
+
+    /**
+     * Reads one specific version of the schema stored under {@code schemaId}.
+     *
+     * @param schemaId schema id, resolved to a z-node path via {@code getSchemaPath}
+     * @param version  version to look up
+     * @return the stored schema, or {@code null} when the locator is missing, the requested
+     *         version is newer than the latest one recorded in the locator, or no index entry
+     *         matches the version
+     */
+    @NotNull
+    private CompletableFuture<StoredSchema> getSchema(String schemaId, long version) {
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+
+            if (!locator.isPresent()) {
+                return completedFuture(null);
+            }
+
+            SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
+            if (version > schemaLocator.getInfo().getVersion()) {
+                return completedFuture(null);
+            }
+
+            return findSchemaEntryByVersion(schemaLocator.getIndexList(), version)
+                .thenApply(entry -> {
+                    // findSchemaEntryByVersion completes with null when the version is not
+                    // indexed; report "not found" like the branches above instead of
+                    // dereferencing it.
+                    if (isNull(entry)) {
+                        return null;
+                    }
+                    return new StoredSchema(
+                        entry.getSchemaData().toByteArray(),
+                        new LongSchemaVersion(version),
+                        emptyMap()
+                    );
+                });
+        });
+    }
+
+    @NotNull
+    private CompletableFuture<Long> putSchema(String schemaId, byte[] data, 
byte[] hash) {
+        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry ->
+            addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
+                updateSchemaLocator(schemaId, locatorEntry, position, hash)
+            )
+        );
+    }
+
+    /**
+     * Stores {@code data} as a new schema version unless a version with the same {@code hash}
+     * is already recorded.
+     *
+     * @param schemaId schema id, resolved to a z-node path via {@code getSchemaPath}
+     * @param data     serialized schema payload to append
+     * @param hash     hash used to detect duplicates
+     * @return the version that already carries {@code hash}, or the newly assigned version
+     */
+    @NotNull
+    private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] 
data, byte[] hash) {
+        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry -> {
+            byte[] storedHash = 
locatorEntry.locator.getInfo().getHash().toByteArray();
+            // Fast path: the latest version already has this hash. An empty stored hash (the
+            // placeholder locator, or an entry written with an empty hash) never matches.
+            if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
+                return 
completedFuture(locatorEntry.locator.getInfo().getVersion());
+            }
+            // Otherwise search the index for the hash; a null result means no version has it.
+            return findSchemaEntryByHash(locatorEntry.locator.getIndexList(), 
hash).thenCompose(version -> {
+                if (isNull(version)) {
+                    // Not found: append the entry to a ledger, then publish it in the locator.
+                    return 
addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
+                        updateSchemaLocator(schemaId, locatorEntry, position, 
hash)
+                    );
+                } else {
+                    return completedFuture(version);
+                }
+            });
+        });
+    }
+
+    @NotNull
+    private CompletableFuture<Long> deleteSchema(String schemaId) {
+        return getSchema(schemaId).thenCompose(schemaAndVersion -> {
+            if (isNull(schemaAndVersion)) {
+                return completedFuture(null);
+            } else {
+                return putSchema(schemaId, new byte[]{}, new byte[]{});
+            }
+        });
+    }
+
+    @NotNull
+    private String getSchemaPath(String schemaId) {
+        return SchemaPath + "/" + schemaId;
+    }
+
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.PositionInfo> 
addNewSchemaEntryToStore(
+        List<SchemaStorageFormat.IndexEntry> index,
+        byte[] data
+    ) {
+        SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, 
data);
+        return createLedger().thenCompose(ledgerHandle ->
+            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
+                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
+            )
+        );
+    }
+
+    /**
+     * Publishes a newly written schema entry in the locator z-node.
+     *
+     * <p>The new version is the locator's current version plus one. The rewritten locator uses
+     * the new index entry as its {@code info} and appends it to the existing index list.
+     *
+     * @param schemaId     schema id, resolved to a z-node path via {@code getSchemaPath}
+     * @param locatorEntry locator as previously read, with the z-node version it was read at
+     * @param position     ledger position of the entry that was just written
+     * @param hash         hash recorded for the new version
+     * @return the version number assigned to the new entry
+     */
+    @NotNull
+    private CompletableFuture<Long> updateSchemaLocator(
+        String schemaId,
+        LocatorEntry locatorEntry,
+        SchemaStorageFormat.PositionInfo position,
+        byte[] hash
+    ) {
+        long nextVersion = locatorEntry.locator.getInfo().getVersion() + 1;
+        SchemaStorageFormat.SchemaLocator locator = locatorEntry.locator;
+        SchemaStorageFormat.IndexEntry info =
+            SchemaStorageFormat.IndexEntry.newBuilder()
+                .setVersion(nextVersion)
+                .setPosition(position)
+                .setHash(copyFrom(hash))
+                .build();
+        // The z-node version read earlier is passed along so the write is conditional on it.
+        return updateSchemaLocator(getSchemaPath(schemaId),
+            SchemaStorageFormat.SchemaLocator.newBuilder()
+                .setInfo(info)
+                .addAllIndex(
+                    concat(locator.getIndexList(), newArrayList(info))
+                ).build(), locatorEntry.zkZnodeVersion
+        ).thenApply(ignore -> nextVersion);
+    }
+
+    /**
+     * Locates the schema entry for {@code version}, starting from {@code index}.
+     *
+     * @param index   index entries to search; the loop stops at the first entry whose version
+     *                exceeds the target
+     * @param version version to find
+     * @return the matching schema entry, or {@code null} when the index is empty or holds no
+     *         entry for {@code version}
+     */
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.SchemaEntry> 
findSchemaEntryByVersion(
+        List<SchemaStorageFormat.IndexEntry> index,
+        long version
+    ) {
+
+        if (index.isEmpty()) {
+            return completedFuture(null);
+        }
+
+        // The target is older than everything in this index: load the first indexed entry and
+        // continue the search in the index stored inside that entry.
+        SchemaStorageFormat.IndexEntry lowest = index.get(0);
+        if (version < lowest.getVersion()) {
+            return readSchemaEntry(lowest.getPosition())
+                .thenCompose(entry -> 
findSchemaEntryByVersion(entry.getIndexList(), version));
+        }
+
+        for (SchemaStorageFormat.IndexEntry entry : index) {
+            if (entry.getVersion() == version) {
+                return readSchemaEntry(entry.getPosition());
+            } else if (entry.getVersion() > version) {
+                // Passed the target without a match; stop scanning.
+                break;
+            }
+        }
+
+        return completedFuture(null);
+    }
+
+    /**
+     * Searches for a version whose recorded hash equals {@code hash}.
+     *
+     * @param index index entries to search first
+     * @param hash  hash to look for
+     * @return the version of the first matching index entry, or {@code null} once an empty
+     *         index is reached without a match
+     */
+    @NotNull
+    private CompletableFuture<Long> findSchemaEntryByHash(
+        List<SchemaStorageFormat.IndexEntry> index,
+        byte[] hash
+    ) {
+
+        if (index.isEmpty()) {
+            return completedFuture(null);
+        }
+
+        for (SchemaStorageFormat.IndexEntry entry : index) {
+            if (Arrays.equals(entry.getHash().toByteArray(), hash)) {
+                return completedFuture(entry.getVersion());
+            }
+        }
+
+        // No match in this index: read the first indexed entry and repeat the search on the
+        // index stored inside it.
+        return readSchemaEntry(index.get(0).getPosition())
+            .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), 
hash));
+
+    }
+
+    /**
+     * Opens the ledger named by {@code position}, reads the single entry it points at, closes
+     * the ledger again and parses the entry payload.
+     *
+     * @param position ledger id and entry id of the schema entry
+     * @return the parsed schema entry; completes exceptionally if opening, reading, closing or
+     *         parsing fails
+     */
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
+        SchemaStorageFormat.PositionInfo position
+    ) {
+        return openLedger(position.getLedgerId())
+            .thenCompose((ledger) -> {
+                CompletableFuture<LedgerEntry> result = new CompletableFuture<>();
+                Functions.getLedgerEntry(ledger, position.getEntryId())
+                    .whenComplete((entry, readError) ->
+                        // Close the handle on the failure path too; chaining the close with
+                        // thenCompose skipped it whenever the read failed and leaked the handle.
+                        closeLedger(ledger).whenComplete((ignore, closeError) -> {
+                            if (nonNull(readError)) {
+                                // The read failure is the root cause; report it first.
+                                result.completeExceptionally(readError);
+                            } else if (nonNull(closeError)) {
+                                result.completeExceptionally(closeError);
+                            } else {
+                                result.complete(entry);
+                            }
+                        })
+                    );
+                return result;
+            }).thenCompose(Functions::parseSchemaEntry);
+    }
+
+    @NotNull
+    private CompletableFuture<Void> updateSchemaLocator(String id, 
SchemaStorageFormat.SchemaLocator schema, int version) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, 
stat) -> {
+            Code code = Code.get(rc);
+            if (code != Code.OK) {
+                future.completeExceptionally(KeeperException.create(code));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    @NotNull
+    private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String 
schema) {
+        return localZkCache.getEntryAsync(schema, new 
SchemaLocatorDeserializer()).thenApply(optional ->
+            optional.map(entry -> new LocatorEntry(entry.getKey(), 
entry.getValue().getVersion()))
+        );
+    }
+
+    /**
+     * Loads the locator z-node at {@code schema}, creating it with a placeholder locator
+     * (version, ledger id and entry id all {@code -1}, empty hash, no index entries) when it
+     * is absent.
+     *
+     * @param schema full z-node path of the schema locator
+     * @return the locator together with the z-node version to use for the next conditional
+     *         update; completes exceptionally with a {@link KeeperException} if the create
+     *         fails
+     */
+    @NotNull
+    private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String schema) {
+        return getSchemaLocator(schema).thenCompose(schemaLocatorStatEntry -> {
+            if (schemaLocatorStatEntry.isPresent()) {
+                return completedFuture(schemaLocatorStatEntry.get());
+            } else {
+                SchemaStorageFormat.SchemaLocator locator = SchemaStorageFormat.SchemaLocator.newBuilder()
+                    .setInfo(SchemaStorageFormat.IndexEntry.newBuilder()
+                        .setVersion(-1L)
+                        .setHash(ByteString.EMPTY)
+                        .setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
+                            .setEntryId(-1L)
+                            .setLedgerId(-1L)
+                        )
+                    ).build();
+
+                CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
+
+                ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, schema, locator.toByteArray(), Acl, CreateMode.PERSISTENT,
+                    (rc, path, ctx, name) -> {
+                        Code code = Code.get(rc);
+                        if (code != Code.OK) {
+                            future.completeExceptionally(KeeperException.create(code));
+                        } else {
+                            // A freshly created z-node has data version 0. Reporting -1 would
+                            // make the following setData match any version, so the first
+                            // locator update would no longer be a compare-and-set.
+                            future.complete(new LocatorEntry(locator, 0));
+                        }
+                    }, null);
+
+                return future;
+            }
+        });
+    }
+
+    @NotNull
+    private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, 
SchemaStorageFormat.SchemaEntry entry) {
+        final CompletableFuture<Long> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(entry.toByteArray(),
+            (rc, handle, entryId, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    future.completeExceptionally(BKException.create(rc));
+                } else {
+                    future.complete(entryId);
+                }
+            }, null
+        );
+        return future;
+    }
+
+    /**
+     * Asynchronously creates a new ledger and adapts the callback result to a future.
+     *
+     * @return the handle of the new ledger; completes exceptionally with a {@link BKException}
+     *         when the create callback reports a non-OK return code
+     */
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
+        // NOTE(review): the first two arguments (ensemble size and quorum size in the
+        // BookKeeper client API) are both 0 -- confirm this is intended rather than a
+        // placeholder for values that should come from the broker configuration.
+        bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{},
+            (rc, handle, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    future.completeExceptionally(BKException.create(rc));
+                } else {
+                    future.complete(handle);
+                }
+            }, null
+        );
+        return future;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{},
+            (rc, handle, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    future.completeExceptionally(BKException.create(rc));
+                } else {
+                    future.complete(handle);
+                }
+            }, null
+        );
+        return future;
+    }
+
+    @NotNull
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                future.completeExceptionally(BKException.create(rc));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    interface Functions {
+        static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle 
ledger, long entry) {
+            final CompletableFuture<LedgerEntry> future = new 
CompletableFuture<>();
+            ledger.asyncReadEntries(entry, entry,
+                (rc, handle, entries, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(BKException.create(rc));
+                    } else {
+                        future.complete(entries.nextElement());
+                    }
+                }, null
+            );
+            return future;
+        }
+
+        static CompletableFuture<SchemaStorageFormat.SchemaEntry> 
parseSchemaEntry(LedgerEntry ledgerEntry) {
+            CompletableFuture<SchemaStorageFormat.SchemaEntry> result = new 
CompletableFuture<>();
+            try {
+                
result.complete(SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry()));
+            } catch (IOException e) {
+                result.completeExceptionally(e);
+            }
+            return result;
+        }
+
+        static SchemaStorageFormat.SchemaEntry newSchemaEntry(
+            List<SchemaStorageFormat.IndexEntry> index,
+            byte[] data
+        ) {
+            return SchemaStorageFormat.SchemaEntry.newBuilder()
+                .setSchemaData(copyFrom(data))
+                .addAllIndex(index)
+                .build();
+        }
+
+        static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, 
long entryId) {
+            return SchemaStorageFormat.PositionInfo.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .build();
+        }
+    }
+
+    static class SchemaLocatorDeserializer implements 
ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
+        @Override
+        public SchemaStorageFormat.SchemaLocator deserialize(String key, 
byte[] content) throws Exception {
+            return SchemaStorageFormat.SchemaLocator.parseFrom(content);
+        }
+    }
+
+    static class LocatorEntry {
+        final SchemaStorageFormat.SchemaLocator locator;
+        final Integer zkZnodeVersion;
+
+        LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer 
zkZnodeVersion) {
+            this.locator = locator;
+            this.zkZnodeVersion = zkZnodeVersion;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
similarity index 66%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
index 4d0d8af..4b25374 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
@@ -18,19 +18,16 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.common.schema.SchemaVersion;
-
-public interface SchemaStorage {
-
-    CompletableFuture<SchemaVersion> put(String key, byte[] value);
-
-    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
-
-    CompletableFuture<SchemaVersion> delete(String key);
-
-    SchemaVersion versionFromBytes(byte[] version);
-
-    void close() throws Exception;
-
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.PulsarService;
+
+@SuppressWarnings("unused")
+public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
+    @Override
+    @NotNull
+    public SchemaStorage create(PulsarService pulsar) throws Exception {
+        BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
+        service.init();
+        return service;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
new file mode 100644
index 0000000..a837cd4
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+class LongSchemaVersion implements SchemaVersion {
+    private final long version;
+
+    LongSchemaVersion(long version) {
+        this.version = version;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
+    /**
+     * Serializes the version as a big-endian long.
+     *
+     * @return a fresh array of exactly {@link Long#BYTES} bytes
+     */
+    @Override
+    public byte[] bytes() {
+        // Long.SIZE is the width in bits (64); allocating with it produced a 64-byte array
+        // with 56 trailing zero bytes. Long.BYTES (8) is the actual encoded length.
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(version);
+        // array() returns the whole backing array regardless of position, so no rewind needed.
+        return buffer.array();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LongSchemaVersion that = (LongSchemaVersion) o;
+        return version == that.version;
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(version);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 1aec039..d014f92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -23,10 +23,13 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +41,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 
 public class SchemaRegistryServiceImpl implements SchemaRegistryService {
+    private static HashFunction hashFunction = Hashing.sha256();
     private final SchemaStorage schemaStorage;
     private final Clock clock;
 
@@ -76,6 +80,7 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema) {
+        byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
         SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
             .setType(Functions.convertFromDomainType(schema.getType()))
             .setSchema(ByteString.copyFrom(schema.getData()))
@@ -85,14 +90,14 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
             .setTimestamp(clock.millis())
             .addAllProps(toPairs(schema.getProps()))
             .build();
-        return schemaStorage.put(schemaId, info.toByteArray());
+        return schemaStorage.put(schemaId, info.toByteArray(), context);
     }
 
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
         byte[] deletedEntry = deleted(schemaId, user).toByteArray();
-        return schemaStorage.put(schemaId, deletedEntry);
+        return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
     }
 
     @Override
@@ -156,6 +161,9 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         }
 
         static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
toPairs(Map<String, String> map) {
+            if (isNull(map)) {
+                return Collections.emptyList();
+            }
             List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new 
ArrayList<>(map.size());
             for (Map.Entry<String, String> entry : map.entrySet()) {
                 SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index 4d0d8af..b0c8075 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.common.schema.SchemaVersion;
 
 public interface SchemaStorage {
 
-    CompletableFuture<SchemaVersion> put(String key, byte[] value);
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] 
hash);
 
     CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
new file mode 100644
index 0000000..df95e69
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
@@ -0,0 +1,1785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/main/proto/SchemaStorageFormat.proto
+
+package org.apache.pulsar.broker.service.schema;
+
+public final class SchemaStorageFormat {
+  private SchemaStorageFormat() {} // not instantiable; this class only groups the nested message types
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistryLite registry) { // empty body: registry is left untouched
+  }
+  public interface PositionInfoOrBuilder // read accessors shared by PositionInfo and PositionInfo.Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required int64 ledgerId = 1;
+    boolean hasLedgerId(); // true once the field has been explicitly set or parsed
+    long getLedgerId();
+    
+    // required int64 entryId = 2;
+    boolean hasEntryId(); // true once the field has been explicitly set or parsed
+    long getEntryId();
+  }
+  public static final class PositionInfo extends
+      com.google.protobuf.GeneratedMessageLite
+      implements PositionInfoOrBuilder {
+    // Use PositionInfo.newBuilder() to construct; Builder.build() rejects a message missing ledgerId or entryId.
+    private PositionInfo(Builder builder) {
+      super(builder);
+    }
+    private PositionInfo(boolean noInit) {} // used by the static initializer at the end of this class
+    
+    private static final PositionInfo defaultInstance;
+    public static PositionInfo getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public PositionInfo getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // presence bits: 0x1 = ledgerId, 0x2 = entryId
+    // required int64 ledgerId = 1;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
+    public boolean hasLedgerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getLedgerId() {
+      return ledgerId_;
+    }
+    
+    // required int64 entryId = 2;
+    public static final int ENTRYID_FIELD_NUMBER = 2;
+    private long entryId_;
+    public boolean hasEntryId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getEntryId() {
+      return entryId_;
+    }
+    
+    private void initFields() {
+      ledgerId_ = 0L;
+      entryId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = false, 1 = true
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLedgerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasEntryId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // result unused; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeInt64(2, entryId_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not computed yet
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, entryId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom reported false, so there is no message to return
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom reported false, so there is no message to return
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo, 
Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfoOrBuilder
 {
+      // Construct using 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        ledgerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        entryId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
build() {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
buildPartial() {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= new 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.ledgerId_ = ledgerId_; // value is copied whether or not its presence bit is set
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.entryId_ = entryId_; // value is copied whether or not its presence bit is set
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance())
 return this;
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
+        }
+        if (other.hasEntryId()) {
+          setEntryId(other.getEntryId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLedgerId()) {
+          
+          return false;
+        }
+        if (!hasEntryId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // tag 0: nothing further to read
+              
+              return this;
+            default: { // any tag other than 0, 8 and 16
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: { // tag 8 = (field 1 << 3) | varint wire type: ledgerId
+              bitField0_ |= 0x00000001;
+              ledgerId_ = input.readInt64();
+              break;
+            }
+            case 16: { // tag 16 = (field 2 << 3) | varint wire type: entryId
+              bitField0_ |= 0x00000002;
+              entryId_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 = ledgerId, 0x2 = entryId
+      
+      // required int64 ledgerId = 1;
+      private long ledgerId_ ;
+      public boolean hasLedgerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getLedgerId() {
+        return ledgerId_;
+      }
+      public Builder setLedgerId(long value) {
+        bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        
+        return this;
+      }
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required int64 entryId = 2;
+      private long entryId_ ;
+      public boolean hasEntryId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getEntryId() {
+        return entryId_;
+      }
+      public Builder setEntryId(long value) {
+        bitField0_ |= 0x00000002;
+        entryId_ = value;
+        
+        return this;
+      }
+      public Builder clearEntryId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        entryId_ = 0L;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.PositionInfo)
+    }
+    
+    static {
+      defaultInstance = new PositionInfo(true);
+      defaultInstance.initFields(); // both fields become 0L; no presence bit is set
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.PositionInfo)
+  }
+  
+  public interface IndexEntryOrBuilder // read accessors shared by IndexEntry and IndexEntry.Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required int64 version = 1;
+    boolean hasVersion();
+    long getVersion();
+    
+    // required .pulsar.schema.PositionInfo position = 2;
+    boolean hasPosition();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition();
+    
+    // required bytes hash = 3;
+    boolean hasHash();
+    com.google.protobuf.ByteString getHash();
+  }
+  public static final class IndexEntry extends
+      com.google.protobuf.GeneratedMessageLite
+      implements IndexEntryOrBuilder {
+    // Use IndexEntry.newBuilder() to construct; Builder.build() requires version, position and hash to be set.
+    private IndexEntry(Builder builder) {
+      super(builder);
+    }
+    private IndexEntry(boolean noInit) {} // used by the static initializer at the end of this class
+    
+    private static final IndexEntry defaultInstance;
+    public static IndexEntry getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public IndexEntry getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // presence bits: 0x1 = version, 0x2 = position, 0x4 = hash
+    // required int64 version = 1;
+    public static final int VERSION_FIELD_NUMBER = 1;
+    private long version_;
+    public boolean hasVersion() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getVersion() {
+      return version_;
+    }
+    
+    // required .pulsar.schema.PositionInfo position = 2;
+    public static final int POSITION_FIELD_NUMBER = 2;
+    private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
position_;
+    public boolean hasPosition() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition() {
+      return position_;
+    }
+    
+    // required bytes hash = 3;
+    public static final int HASH_FIELD_NUMBER = 3;
+    private com.google.protobuf.ByteString hash_;
+    public boolean hasHash() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public com.google.protobuf.ByteString getHash() {
+      return hash_;
+    }
+    
+    private void initFields() {
+      version_ = 0L;
+      position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      hash_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = false, 1 = true
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasPosition()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasHash()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!getPosition().isInitialized()) { // the nested message must have its own required fields set
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // result unused; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, version_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, position_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, hash_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not computed yet
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, version_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(2, position_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, hash_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom reported false, so there is no message to return
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom reported false, so there is no message to return
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry, Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder 
{
+      // Construct using 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        version_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        hash_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry build() {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
buildPartial() {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = new 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.version_ = version_; // value is copied whether or not its presence bit is set
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.position_ = position_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.hash_ = hash_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance())
 return this;
+        if (other.hasVersion()) {
+          setVersion(other.getVersion());
+        }
+        if (other.hasPosition()) {
+          mergePosition(other.getPosition()); // merged field-by-field rather than replaced outright
+        }
+        if (other.hasHash()) {
+          setHash(other.getHash());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasVersion()) {
+          
+          return false;
+        }
+        if (!hasPosition()) {
+          
+          return false;
+        }
+        if (!hasHash()) {
+          
+          return false;
+        }
+        if (!getPosition().isInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // tag 0: nothing further to read
+              
+              return this;
+            default: { // any tag other than 0, 8, 18 and 26
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: { // tag 8 = (field 1 << 3) | varint wire type: version
+              bitField0_ |= 0x00000001;
+              version_ = input.readInt64();
+              break;
+            }
+            case 18: { // tag 18 = (field 2 << 3) | length-delimited wire type: position
+              
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.Builder
 subBuilder = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder();
+              if (hasPosition()) { // a position already seen is merged with the newly read one
+                subBuilder.mergeFrom(getPosition());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setPosition(subBuilder.buildPartial()); // buildPartial: no required-field check at this point
+              break;
+            }
+            case 26: { // tag 26 = (field 3 << 3) | length-delimited wire type: hash
+              bitField0_ |= 0x00000004;
+              hash_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 = version, 0x2 = position, 0x4 = hash
+      
+      // required int64 version = 1;
+      private long version_ ;
+      public boolean hasVersion() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getVersion() {
+        return version_;
+      }
+      public Builder setVersion(long value) {
+        bitField0_ |= 0x00000001;
+        version_ = value;
+        
+        return this;
+      }
+      public Builder clearVersion() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        version_ = 0L;
+        
+        return this;
+      }
+      
+      // required .pulsar.schema.PositionInfo position = 2;
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      public boolean hasPosition() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition() {
+        return position_;
+      }
+      public Builder 
setPosition(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        position_ = value;
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setPosition(
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.Builder
 builderForValue) {
+        position_ = builderForValue.build(); // build() throws if ledgerId or entryId is unset
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder 
mergePosition(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 value) {
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&
+            position_ != 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance())
 {
+          position_ =
+            
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder(position_).mergeFrom(value).buildPartial();
+        } else {
+          position_ = value; // nothing meaningful stored yet: adopt the incoming message as-is
+        }
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearPosition() {
+        position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      // required bytes hash = 3;
+      private com.google.protobuf.ByteString hash_ = 
com.google.protobuf.ByteString.EMPTY;
+      public boolean hasHash() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public com.google.protobuf.ByteString getHash() {
+        return hash_;
+      }
+      public Builder setHash(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        hash_ = value;
+        
+        return this;
+      }
+      public Builder clearHash() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        hash_ = getDefaultInstance().getHash();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.IndexEntry)
+    }
+    
+    static {
+      defaultInstance = new IndexEntry(true);
+      defaultInstance.initFields(); // version 0, default PositionInfo, empty hash; no presence bit is set
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.IndexEntry)
+  }
+  
+  public interface SchemaEntryOrBuilder // read accessors for SchemaEntry: schema_data bytes plus a list of IndexEntry
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required bytes schema_data = 2;
+    boolean hasSchemaData();
+    com.google.protobuf.ByteString getSchemaData();
+    
+    // repeated .pulsar.schema.IndexEntry index = 5;
+    
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 
+        getIndexList();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getIndex(int index);
+    int getIndexCount(); // number of entries in the repeated index field
+  }
+  /**
+   * Lite protobuf message {@code pulsar.schema.SchemaEntry}: a required
+   * {@code schema_data} bytes field (number 2) and a repeated {@code index}
+   * field of {@link IndexEntry} (number 5). Instances come from
+   * {@link #newBuilder()} or one of the {@code parseFrom} overloads.
+   */
+  public static final class SchemaEntry extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaEntryOrBuilder {
+    // Use SchemaEntry.newBuilder() to construct.
+    private SchemaEntry(Builder builder) {
+      super(builder);
+    }
+    // No-op constructor; the static initializer at the bottom uses it for the default instance.
+    private SchemaEntry(boolean noInit) {}
+
+    private static final SchemaEntry defaultInstance;
+    /** Returns the shared default instance created by the static initializer. */
+    public static SchemaEntry getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    /** Instance-level accessor for the same shared default instance. */
+    public SchemaEntry getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    // Presence bits; 0x1 marks schema_data as set.
+    private int bitField0_;
+    // required bytes schema_data = 2;
+    public static final int SCHEMA_DATA_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString schemaData_;
+    /** @return {@code true} when the schema_data presence bit is set */
+    public boolean hasSchemaData() {
+      return (bitField0_ & 0x00000001) != 0;
+    }
+    /** @return the stored schema_data bytes */
+    public com.google.protobuf.ByteString getSchemaData() {
+      return this.schemaData_;
+    }
+
+    // repeated .pulsar.schema.IndexEntry index = 5;
+    public static final int INDEX_FIELD_NUMBER = 5;
+    private java.util.List<IndexEntry> index_;
+    /** @return the backing list of index entries (not copied) */
+    public java.util.List<IndexEntry> getIndexList() {
+      return this.index_;
+    }
+    /** @return the same backing list, typed through the OrBuilder interface */
+    public java.util.List<? extends IndexEntryOrBuilder> getIndexOrBuilderList() {
+      return this.index_;
+    }
+    /** @return the number of index entries */
+    public int getIndexCount() {
+      return this.index_.size();
+    }
+    /** @return the index entry at position {@code index} */
+    public IndexEntry getIndex(int index) {
+      return this.index_.get(index);
+    }
+    /** @return the index entry at position {@code index}, as an OrBuilder view */
+    public IndexEntryOrBuilder getIndexOrBuilder(int index) {
+      return this.index_.get(index);
+    }
+
+    /** Assigns the empty defaults to both fields. */
+    private void initFields() {
+      index_ = java.util.Collections.emptyList();
+      schemaData_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    // -1 = not computed yet, 0 = not initialized, 1 = initialized.
+    private byte memoizedIsInitialized = -1;
+    /**
+     * Checks that schema_data is present and every index entry is itself
+     * initialized. The answer is cached after the first call.
+     */
+    public final boolean isInitialized() {
+      final byte cached = memoizedIsInitialized;
+      if (cached != -1) {
+        return cached == 1;
+      }
+
+      boolean complete = hasSchemaData();
+      if (complete) {
+        for (IndexEntry entry : index_) {
+          if (!entry.isInitialized()) {
+            complete = false;
+            break;
+          }
+        }
+      }
+      memoizedIsInitialized = (byte) (complete ? 1 : 0);
+      return complete;
+    }
+
+    /**
+     * Writes schema_data (field 2, only when present) followed by each
+     * index entry (field 5) to {@code output}.
+     */
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (hasSchemaData()) {
+        output.writeBytes(2, schemaData_);
+      }
+      for (IndexEntry entry : index_) {
+        output.writeMessage(5, entry);
+      }
+    }
+
+    // -1 until getSerializedSize() has been computed once.
+    private int memoizedSerializedSize = -1;
+    /** Computes the encoded size once and returns the cached value afterwards. */
+    public int getSerializedSize() {
+      final int cached = memoizedSerializedSize;
+      if (cached != -1) {
+        return cached;
+      }
+
+      int total = 0;
+      if (hasSchemaData()) {
+        total += com.google.protobuf.CodedOutputStream.computeBytesSize(2, schemaData_);
+      }
+      for (IndexEntry entry : index_) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(5, entry);
+      }
+      memoizedSerializedSize = total;
+      return total;
+    }
+
+    private static final long serialVersionUID = 0L;
+    /** Delegates Java serialization replacement to the superclass. */
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    // Each parseFrom overload merges the input into a fresh builder and then
+    // calls buildParsed(), which rejects messages that are not initialized.
+    public static SchemaEntry parseFrom(com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaEntry parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry).buildParsed();
+    }
+    public static SchemaEntry parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaEntry parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry).buildParsed();
+    }
+    public static SchemaEntry parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaEntry parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry).buildParsed();
+    }
+    /** Returns {@code null} when {@code mergeDelimitedFrom} reports nothing was read. */
+    public static SchemaEntry parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input) ? builder.buildParsed() : null;
+    }
+    /** Returns {@code null} when {@code mergeDelimitedFrom} reports nothing was read. */
+    public static SchemaEntry parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input, extensionRegistry) ? builder.buildParsed() : null;
+    }
+    public static SchemaEntry parseFrom(com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaEntry parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry).buildParsed();
+    }
+
+    /** Creates an empty builder. */
+    public static Builder newBuilder() {
+      return Builder.create();
+    }
+    /** Creates an empty builder (instance-level variant). */
+    public Builder newBuilderForType() {
+      return newBuilder();
+    }
+    /** Creates a builder pre-populated by merging {@code prototype}. */
+    public static Builder newBuilder(SchemaEntry prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    /** Creates a builder pre-populated from this message. */
+    public Builder toBuilder() {
+      return newBuilder(this);
+    }
+
+    /**
+     * Mutable builder for {@link SchemaEntry}. Bit 0x1 of {@code bitField0_}
+     * tracks presence of schema_data; bit 0x2 marks {@code index_} as a
+     * private, mutable list owned by this builder.
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<SchemaEntry, Builder>
+        implements SchemaEntryOrBuilder {
+      // Construct using SchemaEntry.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      // Intentionally empty.
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      /** Resets both fields to their defaults and clears both tracking bits. */
+      public Builder clear() {
+        super.clear();
+        schemaData_ = com.google.protobuf.ByteString.EMPTY;
+        index_ = java.util.Collections.emptyList();
+        bitField0_ &= ~(0x00000001 | 0x00000002);
+        return this;
+      }
+
+      /** Returns a new builder merged from a partial build of this one. */
+      public Builder clone() {
+        final SchemaEntry snapshot = buildPartial();
+        return create().mergeFrom(snapshot);
+      }
+
+      public SchemaEntry getDefaultInstanceForType() {
+        return SchemaEntry.getDefaultInstance();
+      }
+
+      /** Builds the message, throwing if it is not fully initialized. */
+      public SchemaEntry build() {
+        final SchemaEntry message = buildPartial();
+        if (message.isInitialized()) {
+          return message;
+        }
+        throw newUninitializedMessageException(message);
+      }
+
+      /** Like {@link #build()}, but reports failure as an InvalidProtocolBufferException. */
+      private SchemaEntry buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        final SchemaEntry message = buildPartial();
+        if (message.isInitialized()) {
+          return message;
+        }
+        throw newUninitializedMessageException(message)
+            .asInvalidProtocolBufferException();
+      }
+
+      /**
+       * Builds the message without checking required fields. When this
+       * builder owns a mutable index list, the list is wrapped as
+       * unmodifiable and shared with the result, and the mutable bit is
+       * cleared.
+       */
+      public SchemaEntry buildPartial() {
+        final SchemaEntry built = new SchemaEntry(this);
+        // Only the schema_data presence bit carries over to the message.
+        built.bitField0_ = bitField0_ & 0x00000001;
+        built.schemaData_ = schemaData_;
+        if ((bitField0_ & 0x00000002) != 0) {
+          index_ = java.util.Collections.unmodifiableList(index_);
+          bitField0_ &= ~0x00000002;
+        }
+        built.index_ = index_;
+        return built;
+      }
+
+      /**
+       * Merges {@code other} into this builder: schema_data is overwritten
+       * when present in {@code other}; index entries are appended (or the
+       * list is shared outright when this builder has none yet).
+       */
+      public Builder mergeFrom(SchemaEntry other) {
+        if (other == SchemaEntry.getDefaultInstance()) {
+          return this;
+        }
+        if (other.hasSchemaData()) {
+          setSchemaData(other.getSchemaData());
+        }
+        if (other.index_.isEmpty()) {
+          return this;
+        }
+        if (index_.isEmpty()) {
+          // Share the other message's list; it is not marked mutable here.
+          index_ = other.index_;
+          bitField0_ &= ~0x00000002;
+        } else {
+          ensureIndexIsMutable();
+          index_.addAll(other.index_);
+        }
+        return this;
+      }
+
+      /** True when schema_data is set and every index entry is initialized. */
+      public final boolean isInitialized() {
+        if (!hasSchemaData()) {
+          return false;
+        }
+        for (IndexEntry entry : index_) {
+          if (!entry.isInitialized()) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      /**
+       * Reads tagged fields from {@code input} until tag 0 or until
+       * {@code parseUnknownField} returns false. Tag 18 is schema_data,
+       * tag 42 is one index entry.
+       */
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        for (;;) {
+          final int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              return this;
+            case 18: {
+              bitField0_ |= 0x00000001;
+              schemaData_ = input.readBytes();
+              break;
+            }
+            case 42: {
+              IndexEntry.Builder entryBuilder = IndexEntry.newBuilder();
+              input.readMessage(entryBuilder, extensionRegistry);
+              addIndex(entryBuilder.buildPartial());
+              break;
+            }
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+
+      private int bitField0_;
+
+      // required bytes schema_data = 2;
+      private com.google.protobuf.ByteString schemaData_ = com.google.protobuf.ByteString.EMPTY;
+      public boolean hasSchemaData() {
+        return (bitField0_ & 0x00000001) != 0;
+      }
+      public com.google.protobuf.ByteString getSchemaData() {
+        return this.schemaData_;
+      }
+      /**
+       * Sets schema_data and its presence bit.
+       *
+       * @throws NullPointerException if {@code value} is {@code null}
+       */
+      public Builder setSchemaData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schemaData_ = value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /** Restores schema_data to the default instance's value and clears its presence bit. */
+      public Builder clearSchemaData() {
+        schemaData_ = getDefaultInstance().getSchemaData();
+        bitField0_ &= ~0x00000001;
+        return this;
+      }
+
+      // repeated .pulsar.schema.IndexEntry index = 5;
+      private java.util.List<IndexEntry> index_ =
+        java.util.Collections.emptyList();
+      /** Copies index_ into a fresh ArrayList unless the mutable bit is already set. */
+      private void ensureIndexIsMutable() {
+        if ((bitField0_ & 0x00000002) == 0x00000002) {
+          return;
+        }
+        index_ = new java.util.ArrayList<IndexEntry>(index_);
+        bitField0_ |= 0x00000002;
+      }
+
+      /** @return an unmodifiable view of the current index entries */
+      public java.util.List<IndexEntry> getIndexList() {
+        return java.util.Collections.unmodifiableList(index_);
+      }
+      public int getIndexCount() {
+        return this.index_.size();
+      }
+      public IndexEntry getIndex(int index) {
+        return this.index_.get(index);
+      }
+      /** Replaces the entry at {@code index}; rejects {@code null}. */
+      public Builder setIndex(int index, IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.set(index, value);
+        return this;
+      }
+      /** Replaces the entry at {@code index} with the built value of {@code builderForValue}. */
+      public Builder setIndex(int index, IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        index_.set(index, builderForValue.build());
+        return this;
+      }
+      /** Appends {@code value}; rejects {@code null}. */
+      public Builder addIndex(IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(value);
+        return this;
+      }
+      /** Inserts {@code value} at position {@code index}; rejects {@code null}. */
+      public Builder addIndex(int index, IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(index, value);
+        return this;
+      }
+      /** Appends the built value of {@code builderForValue}. */
+      public Builder addIndex(IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(builderForValue.build());
+        return this;
+      }
+      /** Inserts the built value of {@code builderForValue} at position {@code index}. */
+      public Builder addIndex(int index, IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(index, builderForValue.build());
+        return this;
+      }
+      /** Appends every element of {@code values} via the superclass {@code addAll} helper. */
+      public Builder addAllIndex(java.lang.Iterable<? extends IndexEntry> values) {
+        ensureIndexIsMutable();
+        super.addAll(values, index_);
+        return this;
+      }
+      /** Drops all index entries and clears the mutable bit. */
+      public Builder clearIndex() {
+        index_ = java.util.Collections.emptyList();
+        bitField0_ &= ~0x00000002;
+        return this;
+      }
+      /** Removes the entry at position {@code index}. */
+      public Builder removeIndex(int index) {
+        ensureIndexIsMutable();
+        index_.remove(index);
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaEntry)
+    }
+
+    static {
+      defaultInstance = new SchemaEntry(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaEntry)
+  }
+  
+  /**
+   * Read-only accessors implemented by both {@code SchemaLocator} and
+   * {@code SchemaLocator.Builder}.
+   */
+  public interface SchemaLocatorOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+
+    // required .pulsar.schema.IndexEntry info = 1;
+    /** @return {@code true} when {@code info} has been set */
+    boolean hasInfo();
+    /** @return the {@code info} entry */
+    IndexEntry getInfo();
+
+    // repeated .pulsar.schema.IndexEntry index = 2;
+    /** @return the {@code index} entries as a list */
+    java.util.List<IndexEntry> getIndexList();
+    /** @return the {@code index} entry stored at position {@code index} */
+    IndexEntry getIndex(int index);
+    /** @return the number of {@code index} entries */
+    int getIndexCount();
+  }
+  public static final class SchemaLocator extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaLocatorOrBuilder {
+    // Instances are created through SchemaLocator.newBuilder(); this
+    // constructor only forwards the builder to the superclass.
+    private SchemaLocator(Builder builder) {
+      super(builder);
+    }
+    // No-op constructor; the boolean argument is ignored.
+    private SchemaLocator(boolean noInit) {
+    }
+    
+    private static final SchemaLocator defaultInstance;
+    /** Returns the class-wide default {@code SchemaLocator} instance. */
+    public static SchemaLocator getDefaultInstance() {
+      return SchemaLocator.defaultInstance;
+    }
+    
+    /** Instance-level accessor for the class-wide default instance. */
+    public SchemaLocator getDefaultInstanceForType() {
+      return SchemaLocator.defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required .pulsar.schema.IndexEntry info = 1;
+    public static final int INFO_FIELD_NUMBER = 1;
+    private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry info_;
+    /** Reports whether the {@code info} presence bit (0x1) is set. */
+    public boolean hasInfo() {
+      return (bitField0_ & 0x00000001) != 0;
+    }
+    /** Returns the stored {@code info} entry. */
+    public IndexEntry getInfo() {
+      return this.info_;
+    }
+    
+    // repeated .pulsar.schema.IndexEntry index = 2;
+    public static final int INDEX_FIELD_NUMBER = 2;
+    private 
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 index_;
+    /** Returns the backing list of index entries (not copied). */
+    public java.util.List<IndexEntry> getIndexList() {
+      return this.index_;
+    }
+    public java.util.List<? extends 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder>
 
+        getIndexOrBuilderList() {
+      return index_;
+    }
+    /** Returns the number of index entries. */
+    public int getIndexCount() {
+      return this.index_.size();
+    }
+    /** Returns the index entry at position {@code index}. */
+    public IndexEntry getIndex(int index) {
+      return this.index_.get(index);
+    }
+    public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder 
getIndexOrBuilder(
+        int index) {
+      return index_.get(index);
+    }
+    
+    /** Assigns the defaults: an empty index list and the default IndexEntry for info. */
+    private void initFields() {
+      index_ = java.util.Collections.emptyList();
+      info_ = IndexEntry.getDefaultInstance();
+    }
+    private byte memoizedIsInitialized = -1;
+    /**
+     * Checks that {@code info} is present and initialized and that every
+     * index entry is initialized. The answer is cached in
+     * {@code memoizedIsInitialized} after the first call.
+     */
+    public final boolean isInitialized() {
+      final byte cached = memoizedIsInitialized;
+      if (cached != -1) {
+        return cached == 1;
+      }
+
+      boolean complete = hasInfo() && getInfo().isInitialized();
+      if (complete) {
+        for (IndexEntry entry : index_) {
+          if (!entry.isInitialized()) {
+            complete = false;
+            break;
+          }
+        }
+      }
+      memoizedIsInitialized = (byte) (complete ? 1 : 0);
+      return complete;
+    }
+    
+    /**
+     * Writes {@code info} (field 1, only when present) followed by each
+     * index entry (field 2) to {@code output}.
+     */
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (hasInfo()) {
+        output.writeMessage(1, info_);
+      }
+      for (IndexEntry entry : index_) {
+        output.writeMessage(2, entry);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    /** Computes the encoded size once and returns the cached value afterwards. */
+    public int getSerializedSize() {
+      final int cached = memoizedSerializedSize;
+      if (cached != -1) {
+        return cached;
+      }
+
+      int total = 0;
+      if (hasInfo()) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(1, info_);
+      }
+      for (IndexEntry entry : index_) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(2, entry);
+      }
+      memoizedSerializedSize = total;
+      return total;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    /** Delegates Java serialization replacement to the superclass. */
+    @java.lang.Override
+    protected java.lang.Object writeReplace() throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    // Each overload merges its input into a fresh builder and finishes with
+    // buildParsed(), which rejects messages that are not initialized.
+    public static SchemaLocator parseFrom(com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaLocator parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry).buildParsed();
+    }
+    public static SchemaLocator parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaLocator parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry).buildParsed();
+    }
+    public static SchemaLocator parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaLocator parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry).buildParsed();
+    }
+    /** Returns {@code null} when {@code mergeDelimitedFrom} reports nothing was read. */
+    public static SchemaLocator parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input) ? builder.buildParsed() : null;
+    }
+    /** Returns {@code null} when {@code mergeDelimitedFrom} reports nothing was read. */
+    public static SchemaLocator parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input, extensionRegistry) ? builder.buildParsed() : null;
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    /** Creates an empty builder. */
+    public static Builder newBuilder() {
+      return Builder.create();
+    }
+    /** Creates an empty builder (instance-level variant). */
+    public Builder newBuilderForType() {
+      return newBuilder();
+    }
+    /** Creates a builder pre-populated by merging {@code prototype}. */
+    public static Builder newBuilder(SchemaLocator prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    /** Creates a builder pre-populated from this message. */
+    public Builder toBuilder() {
+      return newBuilder(this);
+    }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator, 
Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocatorOrBuilder
 {
+      // Obtain instances via SchemaLocator.newBuilder(); the constructor is private.
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      /** Called from the constructor; the body is intentionally empty. */
+      private void maybeForceBuilderInitialization() {
+        // nothing to do
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      /** Resets both fields to their defaults and clears both tracking bits. */
+      public Builder clear() {
+        super.clear();
+        info_ = IndexEntry.getDefaultInstance();
+        index_ = java.util.Collections.emptyList();
+        bitField0_ &= ~(0x00000001 | 0x00000002);
+        return this;
+      }
+      
+      /** Returns a new builder merged from a partial build of this one. */
+      public Builder clone() {
+        final SchemaLocator snapshot = buildPartial();
+        return create().mergeFrom(snapshot);
+      }
+      
+      /** Returns the default {@code SchemaLocator} instance. */
+      public SchemaLocator getDefaultInstanceForType() {
+        return SchemaLocator.getDefaultInstance();
+      }
+      
+      /** Builds the message, throwing if it is not fully initialized. */
+      public SchemaLocator build() {
+        final SchemaLocator message = buildPartial();
+        if (message.isInitialized()) {
+          return message;
+        }
+        throw newUninitializedMessageException(message);
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      /**
+       * Builds the message without checking required fields. When this
+       * builder owns a mutable index list (bit 0x2), the list is wrapped as
+       * unmodifiable and shared with the result, and the bit is cleared.
+       */
+      public SchemaLocator buildPartial() {
+        final SchemaLocator built = new SchemaLocator(this);
+        // Only the info presence bit carries over to the message.
+        built.bitField0_ = bitField0_ & 0x00000001;
+        built.info_ = info_;
+        if ((bitField0_ & 0x00000002) != 0) {
+          index_ = java.util.Collections.unmodifiableList(index_);
+          bitField0_ &= ~0x00000002;
+        }
+        built.index_ = index_;
+        return built;
+      }
+      
+      /**
+       * Merges {@code other} into this builder: {@code info} is merged via
+       * {@code mergeInfo} when present; index entries are appended, or the
+       * list is shared outright when this builder has none yet.
+       */
+      public Builder mergeFrom(SchemaLocator other) {
+        if (other == SchemaLocator.getDefaultInstance()) {
+          return this;
+        }
+        if (other.hasInfo()) {
+          mergeInfo(other.getInfo());
+        }
+        if (other.index_.isEmpty()) {
+          return this;
+        }
+        if (index_.isEmpty()) {
+          // Share the other message's list; it is not marked mutable here.
+          index_ = other.index_;
+          bitField0_ &= ~0x00000002;
+        } else {
+          ensureIndexIsMutable();
+          index_.addAll(other.index_);
+        }
+        return this;
+      }
+      
+      /** True when info is set and initialized and every index entry is initialized. */
+      public final boolean isInitialized() {
+        if (!hasInfo() || !getInfo().isInitialized()) {
+          return false;
+        }
+        for (IndexEntry entry : index_) {
+          if (!entry.isInitialized()) {
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      /**
+       * Reads tagged fields from {@code input} until tag 0 or until
+       * {@code parseUnknownField} returns false. Tag 10 is {@code info}
+       * (merged into any value already set), tag 18 is one index entry.
+       */
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        for (;;) {
+          final int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              return this;
+            case 10: {
+              IndexEntry.Builder infoBuilder = IndexEntry.newBuilder();
+              if (hasInfo()) {
+                // Start from the existing value so the incoming one merges into it.
+                infoBuilder.mergeFrom(getInfo());
+              }
+              input.readMessage(infoBuilder, extensionRegistry);
+              setInfo(infoBuilder.buildPartial());
+              break;
+            }
+            case 18: {
+              IndexEntry.Builder entryBuilder = IndexEntry.newBuilder();
+              input.readMessage(entryBuilder, extensionRegistry);
+              addIndex(entryBuilder.buildPartial());
+              break;
+            }
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required .pulsar.schema.IndexEntry info = 1;
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry info_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+      /** Reports whether the {@code info} presence bit (0x1) is set on this builder. */
+      public boolean hasInfo() {
+        return (bitField0_ & 0x00000001) != 0;
+      }
+      /** Returns the {@code info} entry currently held by this builder. */
+      public IndexEntry getInfo() {
+        return this.info_;
+      }
+      /**
+       * Replaces {@code info} with {@code value} and sets its presence bit.
+       *
+       * @throws NullPointerException if {@code value} is {@code null}
+       */
+      public Builder setInfo(IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        this.info_ = value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /** Replaces {@code info} with the built value of {@code builderForValue}. */
+      public Builder setInfo(IndexEntry.Builder builderForValue) {
+        this.info_ = builderForValue.build();
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /**
+       * Merges {@code value} into the current {@code info} when one is set
+       * and is not the default instance; otherwise {@code value} simply
+       * replaces it. The presence bit is set in both cases.
+       */
+      public Builder mergeInfo(IndexEntry value) {
+        final boolean hasExisting =
+            (bitField0_ & 0x00000001) != 0
+                && info_ != IndexEntry.getDefaultInstance();
+        info_ = hasExisting
+            ? IndexEntry.newBuilder(info_).mergeFrom(value).buildPartial()
+            : value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      // Resets `info` to IndexEntry.getDefaultInstance() and clears its presence
+      // bit (0x1), so hasInfo() returns false afterwards. Returns this builder.
+      public Builder clearInfo() {
+        info_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      // repeated .pulsar.schema.IndexEntry index = 2;
+      private 
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 index_ =
+        java.util.Collections.emptyList();
+      // Called before every mutation of index_. The first time it runs (bit 0x2 of
+      // bitField0_ not yet set) it replaces index_ with a new ArrayList copy of
+      // the current contents and sets bit 0x2; later calls are no-ops.
+      private void ensureIndexIsMutable() {
+        if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+          index_ = new 
java.util.ArrayList<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>(index_);
+          bitField0_ |= 0x00000002;
+         }
+      }
+      
+      // Returns the repeated `index` entries wrapped in an unmodifiable view of
+      // the builder's backing list (no copy is made).
+      public 
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 getIndexList() {
+        return java.util.Collections.unmodifiableList(index_);
+      }
+      // Returns the number of entries currently held in the repeated `index` field.
+      public int getIndexCount() {
+        return index_.size();
+      }
+      // Returns the `index` entry at the given position by delegating to
+      // List.get(int) on the backing list.
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getIndex(int index) {
+        return index_.get(index);
+      }
+      // Replaces the `index` entry at the given position with value.
+      // Throws NullPointerException when value is null. Returns this builder.
+      public Builder setIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.set(index, value);
+        
+        return this;
+      }
+      // Builder overload of setIndex: calls build() on builderForValue and stores
+      // the result at the given position. Returns this builder.
+      public Builder setIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      // Appends value to the end of the repeated `index` field.
+      // Throws NullPointerException when value is null. Returns this builder.
+      public Builder 
addIndex(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(value);
+        
+        return this;
+      }
+      // Inserts value into the repeated `index` field at the given position
+      // (List.add(int, E) on the backing list).
+      // Throws NullPointerException when value is null. Returns this builder.
+      public Builder addIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(index, value);
+        
+        return this;
+      }
+      // Builder overload of addIndex: calls build() on builderForValue and appends
+      // the result to the repeated `index` field. Returns this builder.
+      public Builder addIndex(
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(builderForValue.build());
+        
+        return this;
+      }
+      // Builder overload of the positional addIndex: calls build() on
+      // builderForValue and inserts the result at the given position.
+      // Returns this builder.
+      public Builder addIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      // Adds every element of values to the repeated `index` field by handing the
+      // iterable and the backing list to the superclass addAll helper.
+      // Returns this builder.
+      public Builder addAllIndex(
+          java.lang.Iterable<? extends 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> values) 
{
+        ensureIndexIsMutable();
+        super.addAll(values, index_);
+        
+        return this;
+      }
+      // Empties the repeated `index` field by pointing index_ back at the shared
+      // empty list and clearing bit 0x2, so the next mutation allocates a fresh
+      // ArrayList via ensureIndexIsMutable(). Returns this builder.
+      public Builder clearIndex() {
+        index_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        
+        return this;
+      }
+      // Removes the `index` entry at the given position (List.remove(int) on the
+      // backing list). Returns this builder.
+      public Builder removeIndex(int index) {
+        ensureIndexIsMutable();
+        index_.remove(index);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaLocator)
+    }
+    
+    // Class initializer: creates the shared default SchemaLocator instance through
+    // the SchemaLocator(boolean) constructor and then calls initFields() on it.
+    static {
+      defaultInstance = new SchemaLocator(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaLocator)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 b/pulsar-broker/src/main/proto/SchemaStorageFormat.proto
similarity index 60%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
copy to pulsar-broker/src/main/proto/SchemaStorageFormat.proto
index 4d0d8af..42cfc9f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ b/pulsar-broker/src/main/proto/SchemaStorageFormat.proto
@@ -16,21 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+syntax = "proto2";
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.common.schema.SchemaVersion;
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema";
+option optimize_for = LITE_RUNTIME;
 
-public interface SchemaStorage {
-
-    CompletableFuture<SchemaVersion> put(String key, byte[] value);
-
-    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
-
-    CompletableFuture<SchemaVersion> delete(String key);
-
-    SchemaVersion versionFromBytes(byte[] version);
+// Location of a stored entry, expressed as a ledger id plus an entry id
+// (presumably a BookKeeper position -- confirm against the storage implementation).
+message PositionInfo {
+    required int64 ledgerId = 1;
+    required int64 entryId = 2;
+}
 
-    void close() throws Exception;
+// One index record: a version number, the position that version is stored at,
+// and a hash (presumably of the schema data -- confirm against the writer).
+message IndexEntry {
+    required int64 version = 1;
+    required PositionInfo position = 2;
+    required bytes hash = 3;
+}
 
+// A stored schema payload together with a list of index records.
+// Field numbers 2 and 5 are not contiguous; they identify the fields in the
+// encoded form, so they must not be renumbered once data has been written.
+message SchemaEntry {
+    required bytes schema_data = 2;
+    repeated IndexEntry index = 5;
 }
+
+// Locator record: one required IndexEntry (`info`) plus a repeated list of
+// IndexEntry records (`index`).
+// NOTE(review): `info` presumably describes the most recent version -- confirm.
+message SchemaLocator {
+    required IndexEntry info = 1;
+    repeated IndexEntry index = 2;
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index a3171c7..d476abe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
new file mode 100644
index 0000000..9ae34fe
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
+
+    private static Clock MockClock = Clock.fixed(Instant.EPOCH, 
ZoneId.systemDefault());
+
+    private String schemaId1 = "1/2/3/4";
+    private String userId = "user";
+
+    private SchemaData schema1 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 a = 1};".getBytes())
+        .build();
+
+    private SchemaData schema2 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 b = 1};".getBytes())
+        .build();
+
+    private SchemaData schema3 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 c = 1};".getBytes())
+        .build();
+
+    private SchemaRegistryServiceImpl schemaRegistryService;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
+        storage.init();
+        storage.start();
+        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
MockClock);
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        schemaRegistryService.close();
+    }
+
+    @Test
+    public void writeReadBackDeleteSchemaEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+
+        SchemaData latest = getLatestSchema(schemaId1, version(0));
+        assertEquals(schema1, latest);
+
+        deleteSchema(schemaId1, version(1));
+
+        SchemaData latest2 = getLatestSchema(schemaId1, version(1));
+
+        assertTrue(latest2.isDeleted());
+    }
+
+    @Test
+    public void getReturnsTheLastWrittenEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData latest = getLatestSchema(schemaId1, version(1));
+        assertEquals(schema2, latest);
+
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData version0 = getSchema(schemaId1, version(0));
+        assertEquals(schema1, version0);
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry2() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData version1 = getSchema(schemaId1, version(1));
+        assertEquals(schema2, version1);
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry3() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+
+        SchemaData version1 = getSchema(schemaId1, version(0));
+        assertEquals(schema1, version1);
+    }
+
+    @Test
+    public void addLotsOfEntriesThenDelete() throws Exception {
+        SchemaData randomSchema1 = randomSchema();
+        SchemaData randomSchema2 = randomSchema();
+        SchemaData randomSchema3 = randomSchema();
+        SchemaData randomSchema4 = randomSchema();
+        SchemaData randomSchema5 = randomSchema();
+        SchemaData randomSchema6 = randomSchema();
+        SchemaData randomSchema7 = randomSchema();
+
+        putSchema(schemaId1, randomSchema1, version(0));
+        putSchema(schemaId1, randomSchema2, version(1));
+        putSchema(schemaId1, randomSchema3, version(2));
+        putSchema(schemaId1, randomSchema4, version(3));
+        putSchema(schemaId1, randomSchema5, version(4));
+        putSchema(schemaId1, randomSchema6, version(5));
+        putSchema(schemaId1, randomSchema7, version(6));
+
+        SchemaData version0 = getSchema(schemaId1, version(0));
+        assertEquals(randomSchema1, version0);
+
+        SchemaData version1 = getSchema(schemaId1, version(1));
+        assertEquals(randomSchema2, version1);
+
+        SchemaData version2 = getSchema(schemaId1, version(2));
+        assertEquals(randomSchema3, version2);
+
+        SchemaData version3 = getSchema(schemaId1, version(3));
+        assertEquals(randomSchema4, version3);
+
+        SchemaData version4 = getSchema(schemaId1, version(4));
+        assertEquals(randomSchema5, version4);
+
+        SchemaData version5 = getSchema(schemaId1, version(5));
+        assertEquals(randomSchema6, version5);
+
+        SchemaData version6 = getSchema(schemaId1, version(6));
+        assertEquals(randomSchema7, version6);
+
+        deleteSchema(schemaId1, version(7));
+
+        SchemaData version7 = getSchema(schemaId1, version(7));
+        assertTrue(version7.isDeleted());
+
+    }
+
+    @Test
+    public void writeSchemasToDifferentIds() throws Exception {
+        SchemaData schemaWithDifferentId = schema3;
+
+        putSchema(schemaId1, schema1, version(0));
+        String schemaId2 = "id2";
+        putSchema(schemaId2, schemaWithDifferentId, version(0));
+
+        SchemaData withFirstId = getLatestSchema(schemaId1, version(0));
+        SchemaData withDifferentId = getLatestSchema(schemaId2, version(0));
+
+        assertEquals(schema1, withFirstId);
+        assertEquals(schema3, withDifferentId);
+    }
+
+    @Test
+    public void dontReAddExistingSchemaAtRoot() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema1, version(0));
+    }
+
+    @Test
+    public void dontReAddExistingSchemaInMiddle() throws Exception {
+        putSchema(schemaId1, randomSchema(), version(0));
+        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, randomSchema(), version(2));
+        putSchema(schemaId1, randomSchema(), version(3));
+        putSchema(schemaId1, randomSchema(), version(4));
+        putSchema(schemaId1, randomSchema(), version(5));
+        putSchema(schemaId1, schema2, version(1));
+    }
+
+    private void putSchema(String schemaId, SchemaData schema, SchemaVersion 
expectedVersion) throws Exception {
+        CompletableFuture<SchemaVersion> put = 
schemaRegistryService.putSchemaIfAbsent(schemaId, schema);
+        SchemaVersion newVersion = put.get();
+        assertEquals(expectedVersion, newVersion);
+    }
+
+    private SchemaData getLatestSchema(String schemaId, SchemaVersion 
expectedVersion) throws Exception {
+        SchemaRegistry.SchemaAndMetadata schemaAndVersion = 
schemaRegistryService.getSchema(schemaId).get();
+        assertEquals(expectedVersion, schemaAndVersion.version);
+        assertEquals(schemaId, schemaAndVersion.id);
+        return schemaAndVersion.schema;
+    }
+
+    private SchemaData getSchema(String schemaId, SchemaVersion version) 
throws Exception {
+        SchemaRegistry.SchemaAndMetadata schemaAndVersion = 
schemaRegistryService.getSchema(schemaId, version).get();
+        assertEquals(version, schemaAndVersion.version);
+        assertEquals(schemaId, schemaAndVersion.id);
+        return schemaAndVersion.schema;
+    }
+
+    private void deleteSchema(String schemaId, SchemaVersion expectedVersion) 
throws Exception {
+        SchemaVersion version = schemaRegistryService.deleteSchema(schemaId, 
userId).get();
+        assertEquals(expectedVersion, version);
+    }
+
+    private SchemaData randomSchema() {
+        UUID randomString = UUID.randomUUID();
+        return SchemaData.builder()
+            .user(userId)
+            .type(SchemaType.PROTOBUF)
+            .timestamp(MockClock.millis())
+            .isDeleted(false)
+            .data(randomString.toString().getBytes())
+            .build();
+    }
+
+    private SchemaVersion version(long version) {
+        return new LongSchemaVersion(version);
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
index 5a5012c..3eb9e6e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
+import java.util.HashMap;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
@@ -30,5 +31,6 @@ public class SchemaData {
     private final long timestamp;
     private final String user;
     private final byte[] data;
-    private final Map<String, String> props;
+    @Builder.Default
+    public final Map<String, String> props = new HashMap<>();
 }
\ No newline at end of file
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index d992c08..b2715b3 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -235,6 +235,27 @@ public abstract class ZooKeeperCache implements Watcher {
         return getData(path, this, deserializer).map(e -> e.getKey());
     }
 
+    public <T> Optional<Entry<T, Stat>> getEntry(final String path, final 
Deserializer<T> deserializer) throws Exception {
+        return getData(path, this, deserializer);
+    }
+
+    public <T> CompletableFuture<Optional<Entry<T, Stat>>> getEntryAsync(final 
String path, final Deserializer<T> deserializer) {
+        CompletableFuture<Optional<Entry<T, Stat>>> future = new 
CompletableFuture<>();
+        getDataAsync(path, this, deserializer)
+            .thenAccept(future::complete)
+            .exceptionally(ex -> {
+                asyncInvalidate(path);
+                if (ex.getCause() instanceof NoNodeException) {
+                    future.complete(Optional.empty());
+                } else {
+                    future.completeExceptionally(ex.getCause());
+                }
+
+                return null;
+            });
+        return future;
+    }
+
     public <T> CompletableFuture<Optional<T>> getDataAsync(final String path, 
final Deserializer<T> deserializer) {
         CompletableFuture<Optional<T>> future = new CompletableFuture<>();
         getDataAsync(path, this, deserializer).thenAccept(data -> {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to