merlimat closed pull request #1363: Schema registry 3/N
URL: https://github.com/apache/incubator-pulsar/pull/1363
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
new file mode 100644
index 000000000..8ba8750e9
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.protobuf.ByteString.copyFrom;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+public class BookkeeperSchemaStorage implements SchemaStorage {
+    private static final String SchemaPath = "/schemas";
+    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    private final PulsarService pulsar;
+    private final ZooKeeper zooKeeper;
+    private final ZooKeeperCache localZkCache;
+    private BookKeeper bookKeeper;
+
+    @VisibleForTesting
+    BookkeeperSchemaStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.localZkCache = pulsar.getLocalZkCache();
+        this.zooKeeper = localZkCache.getZooKeeper();
+    }
+
+    /**
+     * Ensures the root znode for schema locators exists.
+     *
+     * @throws KeeperException on any ZooKeeper failure other than the node already existing
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    @VisibleForTesting
+    public void init() throws KeeperException, InterruptedException {
+        try {
+            final boolean rootExists = zooKeeper.exists(SchemaPath, false) != null;
+            if (!rootExists) {
+                zooKeeper.create(SchemaPath, new byte[0], Acl, CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException.NodeExistsException ignored) {
+            // Someone else created the node between the check and the create; nothing to do.
+        }
+    }
+
+    /**
+     * Creates the BookKeeper client used for all ledger operations.
+     *
+     * @throws IOException if the client factory fails to create the client
+     */
+    public void start() throws IOException {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory()
+            .create(pulsar.getConfiguration(), pulsar.getZkClient());
+    }
+
+    /**
+     * Stores {@code value} under {@code key} unless an entry with the same hash
+     * is already indexed, and reports the resulting version.
+     */
+    @Override
+    public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash) {
+        final CompletableFuture<Long> storedVersion = putSchemaIfAbsent(key, value, hash);
+        return storedVersion.thenApply(number -> new LongSchemaVersion(number));
+    }
+
+    /**
+     * Looks up a schema by key, either the latest one or a specific version.
+     * Any version other than {@code SchemaVersion.Latest} is cast to
+     * {@link LongSchemaVersion}.
+     */
+    @Override
+    public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
+        if (version != SchemaVersion.Latest) {
+            final long requested = ((LongSchemaVersion) version).getVersion();
+            return getSchema(key, requested);
+        }
+        return getSchema(key);
+    }
+
+    /**
+     * Marks the schema stored under {@code key} as deleted.
+     *
+     * @param key schema identifier
+     * @return a future holding the version of the deletion marker, or
+     *         {@code null} when there was no schema to delete
+     */
+    @Override
+    public CompletableFuture<SchemaVersion> delete(String key) {
+        // deleteSchema resolves to null when nothing is stored; passing that to
+        // the long-taking constructor would fail with an unboxing NPE.
+        return deleteSchema(key).thenApply(version ->
+            isNull(version) ? null : new LongSchemaVersion(version));
+    }
+
+    /**
+     * Fetches the most recent schema stored under {@code schemaId}.
+     *
+     * @param schemaId schema identifier, resolved to a znode path via getSchemaPath
+     * @return a future holding the latest stored schema, or {@code null} when no
+     *         locator exists or the locator still holds the initial placeholder
+     */
+    @NotNull
+    private CompletableFuture<StoredSchema> getSchema(String schemaId) {
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+
+            if (!locator.isPresent()) {
+                return completedFuture(null);
+            }
+
+            SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
+            // getOrCreateSchemaLocator seeds new locators with version -1 and a
+            // (-1, -1) position; until a put replaces that placeholder there is
+            // no ledger entry to read.
+            if (schemaLocator.getInfo().getVersion() < 0) {
+                return completedFuture(null);
+            }
+
+            return readSchemaEntry(schemaLocator.getInfo().getPosition())
+                .thenApply(entry ->
+                    new StoredSchema(
+                        entry.getSchemaData().toByteArray(),
+                        new LongSchemaVersion(schemaLocator.getInfo().getVersion()),
+                        emptyMap()
+                    )
+                );
+        });
+    }
+
+    /**
+     * Decodes a version from the first eight bytes of {@code version},
+     * read as a big-endian long (the {@link ByteBuffer} default).
+     */
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        final long decoded = ByteBuffer.wrap(version).getLong();
+        return new LongSchemaVersion(decoded);
+    }
+
+    /**
+     * Closes the BookKeeper client if {@link #start()} created one.
+     */
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper == null) {
+            return;
+        }
+        bookKeeper.close();
+    }
+
+    /**
+     * Fetches a specific version of the schema stored under {@code schemaId}.
+     *
+     * @param schemaId schema identifier, resolved to a znode path via getSchemaPath
+     * @param version  the version to look up
+     * @return a future holding the stored schema, or {@code null} when the
+     *         locator is missing, the version is newer than the latest one,
+     *         or no index entry matches the version
+     */
+    @NotNull
+    private CompletableFuture<StoredSchema> getSchema(String schemaId, long version) {
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+
+            if (!locator.isPresent()) {
+                return completedFuture(null);
+            }
+
+            SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
+            if (version > schemaLocator.getInfo().getVersion()) {
+                return completedFuture(null);
+            }
+
+            return findSchemaEntryByVersion(schemaLocator.getIndexList(), version)
+                .thenApply(entry -> {
+                    // The index lookup resolves to null when nothing matches
+                    // (empty index, or a version that was never written).
+                    if (isNull(entry)) {
+                        return null;
+                    }
+                    return new StoredSchema(
+                        entry.getSchemaData().toByteArray(),
+                        new LongSchemaVersion(version),
+                        emptyMap()
+                    );
+                });
+        });
+    }
+
+    @NotNull
+    private CompletableFuture<Long> putSchema(String schemaId, byte[] data, 
byte[] hash) {
+        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry ->
+            addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
+                updateSchemaLocator(schemaId, locatorEntry, position, hash)
+            )
+        );
+    }
+
+    @NotNull
+    private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] 
data, byte[] hash) {
+        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry -> {
+            byte[] storedHash = 
locatorEntry.locator.getInfo().getHash().toByteArray();
+            if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
+                return 
completedFuture(locatorEntry.locator.getInfo().getVersion());
+            }
+            return findSchemaEntryByHash(locatorEntry.locator.getIndexList(), 
hash).thenCompose(version -> {
+                if (isNull(version)) {
+                    return 
addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
+                        updateSchemaLocator(schemaId, locatorEntry, position, 
hash)
+                    );
+                } else {
+                    return completedFuture(version);
+                }
+            });
+        });
+    }
+
+    /**
+     * Writes an empty entry (empty data, empty hash) on top of an existing schema.
+     *
+     * @return a future holding the version of the empty entry, or {@code null}
+     *         when no schema is currently stored
+     */
+    @NotNull
+    private CompletableFuture<Long> deleteSchema(String schemaId) {
+        return getSchema(schemaId).thenCompose(current -> {
+            if (nonNull(current)) {
+                return putSchema(schemaId, new byte[0], new byte[0]);
+            }
+            return completedFuture(null);
+        });
+    }
+
+    /**
+     * Maps a schema id to the znode path of its locator.
+     */
+    @NotNull
+    private String getSchemaPath(String schemaId) {
+        return String.join("/", SchemaPath, schemaId);
+    }
+
+    /**
+     * Writes a schema entry (data plus the given index) into a newly created
+     * ledger and closes that ledger afterwards.
+     *
+     * @param index index entries to embed in the stored entry
+     * @param data  serialized schema payload
+     * @return a future holding the ledger/entry position of the written entry;
+     *         it fails with the add error if the write fails, otherwise with
+     *         the close error if closing the ledger fails
+     */
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToStore(
+        List<SchemaStorageFormat.IndexEntry> index,
+        byte[] data
+    ) {
+        SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
+        return createLedger().thenCompose(ledgerHandle -> {
+            CompletableFuture<SchemaStorageFormat.PositionInfo> result = new CompletableFuture<>();
+            // Close the handle whether or not the add succeeded; previously it was never closed.
+            addEntry(ledgerHandle, schemaEntry).whenComplete((entryId, addError) ->
+                closeLedger(ledgerHandle).whenComplete((ignore, closeError) -> {
+                    if (addError != null) {
+                        result.completeExceptionally(addError);
+                    } else if (closeError != null) {
+                        result.completeExceptionally(closeError);
+                    } else {
+                        result.complete(Functions.newPositionInfo(ledgerHandle.getId(), entryId));
+                    }
+                })
+            );
+            return result;
+        });
+    }
+
+    @NotNull
+    private CompletableFuture<Long> updateSchemaLocator(
+        String schemaId,
+        LocatorEntry locatorEntry,
+        SchemaStorageFormat.PositionInfo position,
+        byte[] hash
+    ) {
+        long nextVersion = locatorEntry.locator.getInfo().getVersion() + 1;
+        SchemaStorageFormat.SchemaLocator locator = locatorEntry.locator;
+        SchemaStorageFormat.IndexEntry info =
+            SchemaStorageFormat.IndexEntry.newBuilder()
+                .setVersion(nextVersion)
+                .setPosition(position)
+                .setHash(copyFrom(hash))
+                .build();
+        return updateSchemaLocator(getSchemaPath(schemaId),
+            SchemaStorageFormat.SchemaLocator.newBuilder()
+                .setInfo(info)
+                .addAllIndex(
+                    concat(locator.getIndexList(), newArrayList(info))
+                ).build(), locatorEntry.zkZnodeVersion
+        ).thenApply(ignore -> nextVersion);
+    }
+
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.SchemaEntry> 
findSchemaEntryByVersion(
+        List<SchemaStorageFormat.IndexEntry> index,
+        long version
+    ) {
+
+        if (index.isEmpty()) {
+            return completedFuture(null);
+        }
+
+        SchemaStorageFormat.IndexEntry lowest = index.get(0);
+        if (version < lowest.getVersion()) {
+            return readSchemaEntry(lowest.getPosition())
+                .thenCompose(entry -> 
findSchemaEntryByVersion(entry.getIndexList(), version));
+        }
+
+        for (SchemaStorageFormat.IndexEntry entry : index) {
+            if (entry.getVersion() == version) {
+                return readSchemaEntry(entry.getPosition());
+            } else if (entry.getVersion() > version) {
+                break;
+            }
+        }
+
+        return completedFuture(null);
+    }
+
+    @NotNull
+    private CompletableFuture<Long> findSchemaEntryByHash(
+        List<SchemaStorageFormat.IndexEntry> index,
+        byte[] hash
+    ) {
+
+        if (index.isEmpty()) {
+            return completedFuture(null);
+        }
+
+        for (SchemaStorageFormat.IndexEntry entry : index) {
+            if (Arrays.equals(entry.getHash().toByteArray(), hash)) {
+                return completedFuture(entry.getVersion());
+            }
+        }
+
+        return readSchemaEntry(index.get(0).getPosition())
+            .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), 
hash));
+
+    }
+
+    /**
+     * Opens the ledger named by {@code position}, reads the single entry it
+     * points at, closes the ledger and parses the entry.
+     *
+     * @param position ledger id and entry id of the stored schema entry
+     * @return a future holding the parsed entry; it fails with the read error
+     *         if the read fails, otherwise with the close error if closing
+     *         the ledger fails
+     */
+    @NotNull
+    private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
+        SchemaStorageFormat.PositionInfo position
+    ) {
+        return openLedger(position.getLedgerId())
+            .thenCompose((ledger) -> {
+                CompletableFuture<LedgerEntry> result = new CompletableFuture<>();
+                // Close the handle even when the read fails; previously a failed
+                // read skipped the close and leaked the handle.
+                Functions.getLedgerEntry(ledger, position.getEntryId())
+                    .whenComplete((entry, readError) ->
+                        closeLedger(ledger).whenComplete((ignore, closeError) -> {
+                            if (readError != null) {
+                                result.completeExceptionally(readError);
+                            } else if (closeError != null) {
+                                result.completeExceptionally(closeError);
+                            } else {
+                                result.complete(entry);
+                            }
+                        })
+                    );
+                return result;
+            }).thenCompose(Functions::parseSchemaEntry);
+    }
+
+    @NotNull
+    private CompletableFuture<Void> updateSchemaLocator(String id, 
SchemaStorageFormat.SchemaLocator schema, int version) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, 
stat) -> {
+            Code code = Code.get(rc);
+            if (code != Code.OK) {
+                future.completeExceptionally(KeeperException.create(code));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    @NotNull
+    private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String 
schema) {
+        return localZkCache.getEntryAsync(schema, new 
SchemaLocatorDeserializer()).thenApply(optional ->
+            optional.map(entry -> new LocatorEntry(entry.getKey(), 
entry.getValue().getVersion()))
+        );
+    }
+
+    /**
+     * Returns the locator stored at znode path {@code schema}, creating a
+     * placeholder locator (version -1, position -1/-1, empty hash) when none exists.
+     *
+     * @param schema znode path of the locator
+     * @return a future holding the existing or newly created locator entry; it
+     *         fails with a {@link KeeperException} when the create does not
+     *         return {@code Code.OK}
+     */
+    @NotNull
+    private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String schema) {
+        return getSchemaLocator(schema).thenCompose(schemaLocatorStatEntry -> {
+            if (schemaLocatorStatEntry.isPresent()) {
+                return completedFuture(schemaLocatorStatEntry.get());
+            } else {
+                SchemaStorageFormat.SchemaLocator locator = SchemaStorageFormat.SchemaLocator.newBuilder()
+                    .setInfo(SchemaStorageFormat.IndexEntry.newBuilder()
+                        .setVersion(-1L)
+                        .setHash(ByteString.EMPTY)
+                        .setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
+                            .setEntryId(-1L)
+                            .setLedgerId(-1L)
+                        )
+                    ).build();
+
+                CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
+
+                ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, schema, locator.toByteArray(), Acl,
+                    CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
+                        Code code = Code.get(rc);
+                        if (code != Code.OK) {
+                            future.completeExceptionally(KeeperException.create(code));
+                        } else {
+                            // A newly created znode has version 0. Recording -1 would
+                            // make the following setData match any version and so
+                            // silently overwrite a concurrent writer's update.
+                            future.complete(new LocatorEntry(locator, 0));
+                        }
+                    }, null);
+
+                return future;
+            }
+        });
+    }
+
+    /**
+     * Appends the serialized {@code entry} to {@code ledgerHandle}.
+     *
+     * @return a future holding the id of the appended entry, failed with a
+     *         {@link BKException} on any non-OK return code
+     */
+    @NotNull
+    private CompletableFuture<Long> addEntry(
+        LedgerHandle ledgerHandle, SchemaStorageFormat.SchemaEntry entry
+    ) {
+        final CompletableFuture<Long> appended = new CompletableFuture<>();
+        final byte[] payload = entry.toByteArray();
+        ledgerHandle.asyncAddEntry(payload, (rc, handle, entryId, ctx) -> {
+            if (rc == BKException.Code.OK) {
+                appended.complete(entryId);
+            } else {
+                appended.completeExceptionally(BKException.create(rc));
+            }
+        }, null);
+        return appended;
+    }
+
+    /**
+     * Asynchronously creates a ledger with MAC digest and an empty password.
+     *
+     * @return a future holding the handle of the new ledger, failed with a
+     *         {@link BKException} on any non-OK return code
+     */
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        final CompletableFuture<LedgerHandle> created = new CompletableFuture<>();
+        // NOTE(review): the two leading zeros look like ensemble and quorum
+        // sizes - confirm BookKeeper accepts 0 here or take them from broker config.
+        bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[0], (rc, handle, ctx) -> {
+            if (rc == BKException.Code.OK) {
+                created.complete(handle);
+            } else {
+                created.completeExceptionally(BKException.create(rc));
+            }
+        }, null);
+        return created;
+    }
+
+    /**
+     * Asynchronously opens the ledger {@code ledgerId} with MAC digest and an
+     * empty password.
+     *
+     * @return a future holding the ledger handle, failed with a
+     *         {@link BKException} on any non-OK return code
+     */
+    @NotNull
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> opened = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[0], (rc, handle, ctx) -> {
+            if (rc == BKException.Code.OK) {
+                opened.complete(handle);
+            } else {
+                opened.completeExceptionally(BKException.create(rc));
+            }
+        }, null);
+        return opened;
+    }
+
+    @NotNull
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                future.completeExceptionally(BKException.create(rc));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    /**
+     * Stateless helpers for reading ledger entries and building the protobuf
+     * messages used by the schema store.
+     */
+    interface Functions {
+        /**
+         * Reads the single entry {@code entry} from {@code ledger}.
+         *
+         * @return a future holding the entry, failed with a {@link BKException}
+         *         on any non-OK return code
+         */
+        static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
+            final CompletableFuture<LedgerEntry> fetched = new CompletableFuture<>();
+            ledger.asyncReadEntries(entry, entry, (rc, handle, entries, ctx) -> {
+                if (rc == BKException.Code.OK) {
+                    fetched.complete(entries.nextElement());
+                } else {
+                    fetched.completeExceptionally(BKException.create(rc));
+                }
+            }, null);
+            return fetched;
+        }
+
+        /**
+         * Parses the payload of {@code ledgerEntry} as a schema entry.
+         *
+         * @return a future holding the parsed entry, failed with the
+         *         {@link IOException} raised by the parser
+         */
+        static CompletableFuture<SchemaStorageFormat.SchemaEntry> parseSchemaEntry(LedgerEntry ledgerEntry) {
+            final CompletableFuture<SchemaStorageFormat.SchemaEntry> parsed = new CompletableFuture<>();
+            try {
+                final SchemaStorageFormat.SchemaEntry decoded =
+                    SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry());
+                parsed.complete(decoded);
+            } catch (IOException cause) {
+                parsed.completeExceptionally(cause);
+            }
+            return parsed;
+        }
+
+        /**
+         * Builds a schema entry carrying {@code data} and the given index.
+         */
+        static SchemaStorageFormat.SchemaEntry newSchemaEntry(
+            List<SchemaStorageFormat.IndexEntry> index,
+            byte[] data
+        ) {
+            final ByteString payload = copyFrom(data);
+            return SchemaStorageFormat.SchemaEntry.newBuilder()
+                .setSchemaData(payload)
+                .addAllIndex(index)
+                .build();
+        }
+
+        /**
+         * Builds a position from a ledger id and an entry id.
+         */
+        static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, long entryId) {
+            return SchemaStorageFormat.PositionInfo.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .build();
+        }
+    }
+
+    /**
+     * ZooKeeper cache deserializer that parses znode content as a
+     * {@code SchemaStorageFormat.SchemaLocator} protobuf message.
+     */
+    static class SchemaLocatorDeserializer implements 
ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
+        // The key is not used; only the raw content is parsed.
+        @Override
+        public SchemaStorageFormat.SchemaLocator deserialize(String key, 
byte[] content) throws Exception {
+            return SchemaStorageFormat.SchemaLocator.parseFrom(content);
+        }
+    }
+
+    /**
+     * Pairs a schema locator with the znode version it was read at (or
+     * created with); the version is passed to the conditional setData call
+     * in updateSchemaLocator.
+     */
+    static class LocatorEntry {
+        // Locator message as stored in ZooKeeper.
+        final SchemaStorageFormat.SchemaLocator locator;
+        // Znode version associated with the locator.
+        final Integer zkZnodeVersion;
+
+        LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer 
zkZnodeVersion) {
+            this.locator = locator;
+            this.zkZnodeVersion = zkZnodeVersion;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
new file mode 100644
index 000000000..4b25374a9
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.PulsarService;
+
+/**
+ * Factory that builds a {@link BookkeeperSchemaStorage} and runs its
+ * {@code init()} step before handing it out.
+ */
+@SuppressWarnings("unused")
+public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
+    /**
+     * Creates and initializes the storage for the given broker service.
+     *
+     * @param pulsar the broker service the storage is bound to
+     * @return the initialized storage
+     * @throws Exception if initialization fails
+     */
+    @Override
+    @NotNull
+    public SchemaStorage create(PulsarService pulsar) throws Exception {
+        final BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
+        storage.init();
+        return storage;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
new file mode 100644
index 000000000..a837cd414
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+/**
+ * Schema version backed by a single {@code long}.
+ */
+class LongSchemaVersion implements SchemaVersion {
+    private final long version;
+
+    LongSchemaVersion(long version) {
+        this.version = version;
+    }
+
+    /**
+     * @return the numeric version
+     */
+    public long getVersion() {
+        return version;
+    }
+
+    /**
+     * Serializes the version as an 8-byte big-endian long.
+     *
+     * @return a new 8-byte array holding the version
+     */
+    @Override
+    public byte[] bytes() {
+        // Long.BYTES (8), not Long.SIZE (64, a bit count): the old buffer
+        // produced a 64-byte array padded with 56 zero bytes.
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(version);
+        return buffer.array();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LongSchemaVersion that = (LongSchemaVersion) o;
+        return version == that.version;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(version);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 1aec03950..d014f9268 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -23,10 +23,13 @@
 import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +41,7 @@
 import org.apache.pulsar.common.schema.SchemaVersion;
 
 public class SchemaRegistryServiceImpl implements SchemaRegistryService {
+    private static HashFunction hashFunction = Hashing.sha256();
     private final SchemaStorage schemaStorage;
     private final Clock clock;
 
@@ -76,6 +80,7 @@
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema) {
+        byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
         SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
             .setType(Functions.convertFromDomainType(schema.getType()))
             .setSchema(ByteString.copyFrom(schema.getData()))
@@ -85,14 +90,14 @@
             .setTimestamp(clock.millis())
             .addAllProps(toPairs(schema.getProps()))
             .build();
-        return schemaStorage.put(schemaId, info.toByteArray());
+        return schemaStorage.put(schemaId, info.toByteArray(), context);
     }
 
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
         byte[] deletedEntry = deleted(schemaId, user).toByteArray();
-        return schemaStorage.put(schemaId, deletedEntry);
+        return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
     }
 
     @Override
@@ -156,6 +161,9 @@ static SchemaType 
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType
         }
 
         static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
toPairs(Map<String, String> map) {
+            if (isNull(map)) {
+                return Collections.emptyList();
+            }
             List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new 
ArrayList<>(map.size());
             for (Map.Entry<String, String> entry : map.entrySet()) {
                 SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index 4d0d8af18..b0c80752b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -23,7 +23,7 @@
 
 public interface SchemaStorage {
 
-    CompletableFuture<SchemaVersion> put(String key, byte[] value);
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] 
hash);
 
     CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
new file mode 100644
index 000000000..df95e697d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFormat.java
@@ -0,0 +1,1785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/main/proto/SchemaStorageFormat.proto
+
+package org.apache.pulsar.broker.service.schema;
+
+public final class SchemaStorageFormat {
+  private SchemaStorageFormat() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistryLite registry) {
+  }
+  public interface PositionInfoOrBuilder  // read-only accessors implemented by both PositionInfo and PositionInfo.Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required int64 ledgerId = 1; -- has*() reports whether the field was set, get*() returns its value
+    boolean hasLedgerId();
+    long getLedgerId();
+    
+    // required int64 entryId = 2; -- has*() reports whether the field was set, get*() returns its value
+    boolean hasEntryId();
+    long getEntryId();
+  }
+  public static final class PositionInfo extends
+      com.google.protobuf.GeneratedMessageLite
+      implements PositionInfoOrBuilder {
+    // Use PositionInfo.newBuilder() to construct.
+    private PositionInfo(Builder builder) {
+      super(builder);
+    }
+    private PositionInfo(boolean noInit) {}
+    
+    private static final PositionInfo defaultInstance;
+    public static PositionInfo getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public PositionInfo getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required int64 ledgerId = 1;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
+    public boolean hasLedgerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getLedgerId() {
+      return ledgerId_;
+    }
+    
+    // required int64 entryId = 2;
+    public static final int ENTRYID_FIELD_NUMBER = 2;
+    private long entryId_;
+    public boolean hasEntryId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getEntryId() {
+      return entryId_;
+    }
+    
+    private void initFields() {
+      ledgerId_ = 0L;
+      entryId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {  // true only when both required fields (ledgerId, entryId) are present
+      byte isInitialized = memoizedIsInitialized;  // cached verdict: -1 = not computed yet, 0 = false, 1 = true
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLedgerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasEntryId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)  // serializes the fields that are present, in field-number order
+                        throws java.io.IOException {
+      getSerializedSize();  // return value unused; the call fills memoizedSerializedSize before writing
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {  // bit 0 set => ledgerId present
+        output.writeInt64(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {  // bit 1 set => entryId present
+        output.writeInt64(2, entryId_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {  // encoded size in bytes of the present fields; computed once, then cached
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;  // -1 marks "not computed yet"
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {  // ledgerId present
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {  // entryId present
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, entryId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo, 
Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfoOrBuilder
 {
+      // Construct using org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        ledgerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        entryId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
build() {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
buildPartial() {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo result 
= new 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo(this);
+        int from_bitField0_ = bitField0_;  // builder-side presence bits
+        int to_bitField0_ = 0;  // presence bits to install on the built message
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.ledgerId_ = ledgerId_;  // value is copied unconditionally; only the presence bit is conditional
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.entryId_ = entryId_;  // value is copied unconditionally; only the presence bit is conditional
+        result.bitField0_ = to_bitField0_;
+        return result;  // no required-field check here; build() and buildParsed() perform it
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance())
 return this;
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
+        }
+        if (other.hasEntryId()) {
+          setEntryId(other.getEntryId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLedgerId()) {
+          
+          return false;
+        }
+        if (!hasEntryId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(  // reads tagged fields from the stream into this builder until a stop condition
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:  // readTag() yields 0 at end of input
+              
+              return this;
+            default: {  // any tag other than 0, 8 or 16; placement before the cases does not affect dispatch
+              if (!parseUnknownField(input, extensionRegistry, tag)) {  // stop when the unknown-field parser reports false
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {  // tag 8 = (field 1 << 3) | wire type 0 (varint): ledgerId
+              bitField0_ |= 0x00000001;
+              ledgerId_ = input.readInt64();
+              break;
+            }
+            case 16: {  // tag 16 = (field 2 << 3) | wire type 0 (varint): entryId
+              bitField0_ |= 0x00000002;
+              entryId_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required int64 ledgerId = 1;
+      private long ledgerId_ ;
+      public boolean hasLedgerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getLedgerId() {
+        return ledgerId_;
+      }
+      public Builder setLedgerId(long value) {
+        bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        
+        return this;
+      }
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required int64 entryId = 2;
+      private long entryId_ ;
+      public boolean hasEntryId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getEntryId() {
+        return entryId_;
+      }
+      public Builder setEntryId(long value) {
+        bitField0_ |= 0x00000002;
+        entryId_ = value;
+        
+        return this;
+      }
+      public Builder clearEntryId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        entryId_ = 0L;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.PositionInfo)
+    }
+    
+    static {
+      defaultInstance = new PositionInfo(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.PositionInfo)
+  }
+  
+  public interface IndexEntryOrBuilder  // read-only accessors implemented by both IndexEntry and IndexEntry.Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required int64 version = 1; -- has*() reports whether the field was set, get*() returns its value
+    boolean hasVersion();
+    long getVersion();
+    
+    // required .pulsar.schema.PositionInfo position = 2; -- nested message carrying ledgerId and entryId
+    boolean hasPosition();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition();
+    
+    // required bytes hash = 3; -- has*() reports whether the field was set, get*() returns the raw bytes
+    boolean hasHash();
+    com.google.protobuf.ByteString getHash();
+  }
+  public static final class IndexEntry extends
+      com.google.protobuf.GeneratedMessageLite
+      implements IndexEntryOrBuilder {
+    // Use IndexEntry.newBuilder() to construct.
+    private IndexEntry(Builder builder) {
+      super(builder);
+    }
+    private IndexEntry(boolean noInit) {}
+    
+    private static final IndexEntry defaultInstance;
+    public static IndexEntry getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public IndexEntry getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required int64 version = 1;
+    public static final int VERSION_FIELD_NUMBER = 1;
+    private long version_;
+    public boolean hasVersion() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getVersion() {
+      return version_;
+    }
+    
+    // required .pulsar.schema.PositionInfo position = 2;
+    public static final int POSITION_FIELD_NUMBER = 2;
+    private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
position_;
+    public boolean hasPosition() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition() {
+      return position_;
+    }
+    
+    // required bytes hash = 3;
+    public static final int HASH_FIELD_NUMBER = 3;
+    private com.google.protobuf.ByteString hash_;
+    public boolean hasHash() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public com.google.protobuf.ByteString getHash() {
+      return hash_;
+    }
+    
+    private void initFields() {
+      version_ = 0L;
+      position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      hash_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {  // true only when version, position and hash are set and position is itself initialized
+      byte isInitialized = memoizedIsInitialized;  // cached verdict: -1 = not computed yet, 0 = false, 1 = true
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasPosition()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasHash()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!getPosition().isInitialized()) {  // the nested PositionInfo must have its own required fields set
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, version_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, position_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, hash_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, version_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(2, position_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, hash_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry, Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder 
{
+      // Construct using org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        version_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        hash_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry build() {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
buildPartial() {
+        org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
result = new 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.version_ = version_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.position_ = position_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.hash_ = hash_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance())
 return this;
+        if (other.hasVersion()) {
+          setVersion(other.getVersion());
+        }
+        if (other.hasPosition()) {
+          mergePosition(other.getPosition());
+        }
+        if (other.hasHash()) {
+          setHash(other.getHash());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasVersion()) {
+          
+          return false;
+        }
+        if (!hasPosition()) {
+          
+          return false;
+        }
+        if (!hasHash()) {
+          
+          return false;
+        }
+        if (!getPosition().isInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(  // reads tagged fields from the stream into this builder until a stop condition
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:  // readTag() yields 0 at end of input
+              
+              return this;
+            default: {  // any tag other than 0, 8, 18 or 26; placement before the cases does not affect dispatch
+              if (!parseUnknownField(input, extensionRegistry, tag)) {  // stop when the unknown-field parser reports false
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {  // tag 8 = (field 1 << 3) | wire type 0 (varint): version
+              bitField0_ |= 0x00000001;
+              version_ = input.readInt64();
+              break;
+            }
+            case 18: {  // tag 18 = (field 2 << 3) | wire type 2 (length-delimited): nested position
+              
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.Builder
 subBuilder = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder();
+              if (hasPosition()) {  // a position already present is used as the base, so repeated occurrences merge
+                subBuilder.mergeFrom(getPosition());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setPosition(subBuilder.buildPartial());  // buildPartial skips the required-field check; isInitialized() covers it later
+              break;
+            }
+            case 26: {  // tag 26 = (field 3 << 3) | wire type 2 (length-delimited): hash
+              bitField0_ |= 0x00000004;
+              hash_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required int64 version = 1;
+      private long version_ ;
+      public boolean hasVersion() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getVersion() {
+        return version_;
+      }
+      public Builder setVersion(long value) {
+        bitField0_ |= 0x00000001;
+        version_ = value;
+        
+        return this;
+      }
+      public Builder clearVersion() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        version_ = 0L;
+        
+        return this;
+      }
+      
+      // required .pulsar.schema.PositionInfo position = 2;
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+      public boolean hasPosition() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo 
getPosition() {
+        return position_;
+      }
+      public Builder 
setPosition(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        position_ = value;
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setPosition(
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.Builder
 builderForValue) {
+        position_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder 
mergePosition(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo
 value) {
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&  // merge only when a non-default position is already set
+            position_ != 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance())
 {
+          position_ =
+            
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.newBuilder(position_).mergeFrom(value).buildPartial();
+        } else {
+          position_ = value;  // nothing to merge into: adopt value as-is. NOTE(review): unlike setPosition, value is not null-checked
+        }
+        
+        bitField0_ |= 0x00000002;  // mark position as present in both branches
+        return this;
+      }
+      public Builder clearPosition() {
+        position_ = 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.PositionInfo.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      // required bytes hash = 3;
+      private com.google.protobuf.ByteString hash_ = 
com.google.protobuf.ByteString.EMPTY;
+      public boolean hasHash() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public com.google.protobuf.ByteString getHash() {
+        return hash_;
+      }
+      public Builder setHash(com.google.protobuf.ByteString value) {  // stores the hash bytes and marks the field present
+        if (value == null) {
+    throw new NullPointerException();  // null is rejected; an empty ByteString is accepted
+  }
+  bitField0_ |= 0x00000004;  // bit 2 set => hash present
+        hash_ = value;
+        
+        return this;
+      }
+      public Builder clearHash() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        hash_ = getDefaultInstance().getHash();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.IndexEntry)
+    }
+    
+    static {
+      defaultInstance = new IndexEntry(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.IndexEntry)
+  }
+  
+  public interface SchemaEntryOrBuilder  // read-only accessors for the SchemaEntry message (implemented by SchemaEntry)
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required bytes schema_data = 2; -- has*() reports whether the field was set, get*() returns the raw bytes
+    boolean hasSchemaData();
+    com.google.protobuf.ByteString getSchemaData();
+    
+    // repeated .pulsar.schema.IndexEntry index = 5; -- whole list, element by position, and element count
+    
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 
+        getIndexList();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getIndex(int index);
+    int getIndexCount();
+  }
+  public static final class SchemaEntry extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaEntryOrBuilder {
+    // Use SchemaEntry.newBuilder() to construct.
+    // Builder-driven constructor: simply forwards the builder to the superclass.
+    private SchemaEntry(Builder builder) {
+      super(builder);
+    }
+
+    // Leaves every field untouched; the flag is ignored. The static initializer
+    // uses this to create the default instance before calling initFields().
+    private SchemaEntry(boolean noInit) {
+    }
+    
+    // Shared default instance, assigned once in the class's static initializer.
+    private static final SchemaEntry defaultInstance;
+
+    // Returns the shared default SchemaEntry.
+    public static SchemaEntry getDefaultInstance() {
+      return SchemaEntry.defaultInstance;
+    }
+
+    // Instance-level view of the same shared default SchemaEntry.
+    public SchemaEntry getDefaultInstanceForType() {
+      return getDefaultInstance();
+    }
+    
+    private int bitField0_;
+    // required bytes schema_data = 2;
+    public static final int SCHEMA_DATA_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString schemaData_;
+
+    // True when bit 0x1 of bitField0_ is set, i.e. schema_data is present.
+    public boolean hasSchemaData() {
+      return (bitField0_ & 0x00000001) != 0;
+    }
+
+    // Raw schema bytes stored in this message.
+    public com.google.protobuf.ByteString getSchemaData() {
+      return this.schemaData_;
+    }
+    
+    // repeated .pulsar.schema.IndexEntry index = 5;
+    public static final int INDEX_FIELD_NUMBER = 5;
+    private java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> index_;
+
+    // Returns the backing list itself (no copy is made here).
+    public java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> getIndexList() {
+      return this.index_;
+    }
+
+    // Same backing list, typed through the IndexEntryOrBuilder interface.
+    public java.util.List<? extends org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder>
+        getIndexOrBuilderList() {
+      return this.index_;
+    }
+
+    // Number of index entries.
+    public int getIndexCount() {
+      return this.index_.size();
+    }
+
+    // Index entry at the given position of the backing list.
+    public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getIndex(int index) {
+      return this.index_.get(index);
+    }
+
+    // Index entry at the given position, typed through the IndexEntryOrBuilder interface.
+    public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder getIndexOrBuilder(
+        int index) {
+      return this.index_.get(index);
+    }
+    
+    // Resets both fields to their defaults: no index entries and empty schema bytes.
+    private void initFields() {
+      this.index_ = java.util.Collections.emptyList();
+      this.schemaData_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    // Cached answer of isInitialized(): -1 = not computed yet, 0 = false, 1 = true.
+    private byte memoizedIsInitialized = -1;
+
+    // A SchemaEntry is initialized when schema_data is present and every index
+    // entry reports itself initialized. The outcome is memoized.
+    public final boolean isInitialized() {
+      final byte cached = memoizedIsInitialized;
+      if (cached != -1) {
+        return cached == 1;
+      }
+
+      boolean complete = hasSchemaData();
+      // Stops at the first uninitialized entry, like an early return would.
+      for (int i = 0; complete && i < getIndexCount(); i++) {
+        complete = getIndex(i).isInitialized();
+      }
+      memoizedIsInitialized = (byte) (complete ? 1 : 0);
+      return complete;
+    }
+    
+    // Serializes schema_data (field 2, only when present) followed by every
+    // index entry (field 5) to the given stream.
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+        throws java.io.IOException {
+      // Called for its side effect only: it fills the memoized size.
+      getSerializedSize();
+      if ((bitField0_ & 0x00000001) != 0) {
+        output.writeBytes(2, schemaData_);
+      }
+      final int count = index_.size();
+      for (int pos = 0; pos < count; pos++) {
+        output.writeMessage(5, index_.get(pos));
+      }
+    }
+    
+    // Cached serialized size; -1 means it has not been computed yet.
+    private int memoizedSerializedSize = -1;
+
+    // Computes (once) and returns the number of bytes writeTo() will emit.
+    public int getSerializedSize() {
+      final int cached = memoizedSerializedSize;
+      if (cached != -1) {
+        return cached;
+      }
+
+      int total = 0;
+      if ((bitField0_ & 0x00000001) != 0) {
+        total += com.google.protobuf.CodedOutputStream.computeBytesSize(2, schemaData_);
+      }
+      final int count = index_.size();
+      for (int pos = 0; pos < count; pos++) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(5, index_.get(pos));
+      }
+      memoizedSerializedSize = total;
+      return total;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    // Java-serialization hook; delegates unchanged to the superclass implementation.
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    // Static parsing entry points. Each one merges the input into a fresh builder
+    // and finishes with buildParsed(), which throws if the result is not initialized.
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data, extensionRegistry)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data, extensionRegistry)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input, extensionRegistry)
+          .buildParsed();
+    }
+
+    // Returns null when mergeDelimitedFrom() reports that nothing was merged.
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseDelimitedFrom(
+        java.io.InputStream input)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input) ? builder.buildParsed() : null;
+    }
+
+    // Returns null when mergeDelimitedFrom() reports that nothing was merged.
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input, extensionRegistry) ? builder.buildParsed() : null;
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input, extensionRegistry)
+          .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry, 
Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntryOrBuilder
 {
+      // Construct using org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      // Intentionally empty hook invoked by the constructor.
+      private void maybeForceBuilderInitialization() {
+      }
+
+      // Single place where Builder instances are created.
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      // Resets the builder: empty schema bytes, no index entries, both state bits cleared.
+      public Builder clear() {
+        super.clear();
+        schemaData_ = com.google.protobuf.ByteString.EMPTY;
+        index_ = java.util.Collections.emptyList();
+        // 0x1 = schema_data present, 0x2 = index_ is a private mutable copy.
+        bitField0_ &= ~(0x00000001 | 0x00000002);
+        return this;
+      }
+      
+      // Copies this builder by merging a partial build of it into a fresh builder.
+      public Builder clone() {
+        final Builder copy = create();
+        return copy.mergeFrom(buildPartial());
+      }
+      
+      // Returns the shared default SchemaEntry instance.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry getDefaultInstanceForType() {
+        return org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry
+            .getDefaultInstance();
+      }
+      
+      // Builds the message and throws the exception produced by
+      // newUninitializedMessageException() when it is not fully initialized.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry build() {
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry built =
+            buildPartial();
+        if (built.isInitialized()) {
+          return built;
+        }
+        throw newUninitializedMessageException(built);
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      // Creates a SchemaEntry from the builder's current state without checking
+      // that required fields are set.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry buildPartial() {
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry message =
+            new org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry(this);
+        // Only the schema_data presence bit (0x1) is carried over to the message.
+        message.bitField0_ = bitField0_ & 0x00000001;
+        message.schemaData_ = schemaData_;
+        if ((bitField0_ & 0x00000002) != 0) {
+          // Freeze the list and drop the "mutable" bit so that a later write
+          // through this builder copies it first instead of altering the message.
+          index_ = java.util.Collections.unmodifiableList(index_);
+          bitField0_ &= ~0x00000002;
+        }
+        message.index_ = index_;
+        return message;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaEntry.getDefaultInstance())
 return this;
+        if (other.hasSchemaData()) {
+          setSchemaData(other.getSchemaData());
+        }
+        if (!other.index_.isEmpty()) {
+          if (index_.isEmpty()) {
+            index_ = other.index_;
+            bitField0_ = (bitField0_ & ~0x00000002);
+          } else {
+            ensureIndexIsMutable();
+            index_.addAll(other.index_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasSchemaData()) {
+          
+          return false;
+        }
+        for (int i = 0; i < getIndexCount(); i++) {
+          if (!getIndex(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      // Reads tagged fields from the stream into this builder until the end of
+      // input (tag 0) or until parseUnknownField() returns false.
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          final int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              return this;
+            case 18: {
+              // schema_data (field 2)
+              bitField0_ |= 0x00000001;
+              schemaData_ = input.readBytes();
+              break;
+            }
+            case 42: {
+              // index (field 5): parse one nested IndexEntry and append it
+              org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder subBuilder =
+                  org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addIndex(subBuilder.buildPartial());
+              break;
+            }
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required bytes schema_data = 2;
+      private com.google.protobuf.ByteString schemaData_ = com.google.protobuf.ByteString.EMPTY;
+
+      // True when bit 0x1 of bitField0_ is set, i.e. schema_data is marked present.
+      public boolean hasSchemaData() {
+        return (bitField0_ & 0x00000001) != 0;
+      }
+
+      // Schema bytes currently held by this builder (initially ByteString.EMPTY).
+      public com.google.protobuf.ByteString getSchemaData() {
+        return this.schemaData_;
+      }
+
+      // Stores the schema bytes and flags them as present; null is rejected with
+      // a NullPointerException.
+      public Builder setSchemaData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schemaData_ = value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+
+      // Drops the presence bit and restores the default instance's schema_data.
+      public Builder clearSchemaData() {
+        schemaData_ = getDefaultInstance().getSchemaData();
+        bitField0_ &= ~0x00000001;
+        return this;
+      }
+      
+      // repeated .pulsar.schema.IndexEntry index = 5;
+      private java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> index_ =
+          java.util.Collections.emptyList();
+
+      // Replaces index_ with a private ArrayList copy unless bit 0x2 already
+      // records that such a copy is in place.
+      private void ensureIndexIsMutable() {
+        if ((bitField0_ & 0x00000002) == 0) {
+          index_ =
+              new java.util.ArrayList<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>(
+                  index_);
+          bitField0_ |= 0x00000002;
+        }
+      }
+
+      // Unmodifiable view of the current index entries.
+      public java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> getIndexList() {
+        return java.util.Collections.unmodifiableList(this.index_);
+      }
+
+      // Number of index entries currently in the builder.
+      public int getIndexCount() {
+        return this.index_.size();
+      }
+
+      // Index entry at the given position.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getIndex(int index) {
+        return this.index_.get(index);
+      }
+      // Replaces the entry at the given position; null is rejected with a
+      // NullPointerException.
+      public Builder setIndex(
+          int index,
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.set(index, value);
+        return this;
+      }
+
+      // Replaces the entry at the given position with the result of building
+      // the supplied IndexEntry builder.
+      public Builder setIndex(
+          int index,
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry built =
+            builderForValue.build();
+        index_.set(index, built);
+        return this;
+      }
+      // Appends an entry; null is rejected with a NullPointerException.
+      public Builder addIndex(
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(value);
+        return this;
+      }
+
+      // Inserts an entry at the given position; null is rejected with a
+      // NullPointerException.
+      public Builder addIndex(
+          int index,
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(index, value);
+        return this;
+      }
+
+      // Appends the result of building the supplied IndexEntry builder.
+      public Builder addIndex(
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry built =
+            builderForValue.build();
+        index_.add(built);
+        return this;
+      }
+
+      // Inserts the result of building the supplied IndexEntry builder at the
+      // given position.
+      public Builder addIndex(
+          int index,
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder builderForValue) {
+        ensureIndexIsMutable();
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry built =
+            builderForValue.build();
+        index_.add(index, built);
+        return this;
+      }
+      // Appends every entry of values to the index list via the superclass
+      // addAll() helper.
+      public Builder addAllIndex(
+          java.lang.Iterable<? extends org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
+              values) {
+        ensureIndexIsMutable();
+        super.addAll(values, index_);
+        return this;
+      }
+      // Removes all index entries by switching back to the shared empty list and
+      // clearing the "mutable copy" bit.
+      public Builder clearIndex() {
+        bitField0_ &= ~0x00000002;
+        index_ = java.util.Collections.emptyList();
+        return this;
+      }
+      // Removes the index entry at the given position.
+      public Builder removeIndex(int index) {
+        this.ensureIndexIsMutable();
+        this.index_.remove(index);
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaEntry)
+    }
+    
+    // Creates the shared default SchemaEntry through the no-init constructor and
+    // then populates its fields; the assignment has to happen before initFields()
+    // is invoked on it.
+    static {
+      defaultInstance = new SchemaEntry(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaEntry)
+  }
+  
+  // Read-only accessors implemented by both SchemaLocator and SchemaLocator.Builder.
+  public interface SchemaLocatorOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+
+    // required .pulsar.schema.IndexEntry info = 1;
+    // Presence flag and value of the info field.
+    boolean hasInfo();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getInfo();
+
+    // repeated .pulsar.schema.IndexEntry index = 2;
+    // Whole list, single element by position, and element count of the index field.
+    java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> getIndexList();
+    org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getIndex(int index);
+    int getIndexCount();
+  }
+  public static final class SchemaLocator extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaLocatorOrBuilder {
+    // Use SchemaLocator.newBuilder() to construct.
+    // Builder-driven constructor: simply forwards the builder to the superclass.
+    private SchemaLocator(Builder builder) {
+      super(builder);
+    }
+
+    // Leaves every field untouched; the flag is ignored.
+    private SchemaLocator(boolean noInit) {
+    }
+    
+    // Shared default instance of SchemaLocator.
+    private static final SchemaLocator defaultInstance;
+
+    // Returns the shared default SchemaLocator.
+    public static SchemaLocator getDefaultInstance() {
+      return SchemaLocator.defaultInstance;
+    }
+
+    // Instance-level view of the same shared default SchemaLocator.
+    public SchemaLocator getDefaultInstanceForType() {
+      return getDefaultInstance();
+    }
+    
+    private int bitField0_;
+    // required .pulsar.schema.IndexEntry info = 1;
+    public static final int INFO_FIELD_NUMBER = 1;
+    private org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry info_;
+
+    // True when bit 0x1 of bitField0_ is set, i.e. info is present.
+    public boolean hasInfo() {
+      return (bitField0_ & 0x00000001) != 0;
+    }
+
+    // The info entry stored in this message.
+    public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getInfo() {
+      return this.info_;
+    }
+    
+    // repeated .pulsar.schema.IndexEntry index = 2;
+    public static final int INDEX_FIELD_NUMBER = 2;
+    private java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> index_;
+
+    // Returns the backing list itself (no copy is made here).
+    public java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> getIndexList() {
+      return this.index_;
+    }
+
+    // Same backing list, typed through the IndexEntryOrBuilder interface.
+    public java.util.List<? extends org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder>
+        getIndexOrBuilderList() {
+      return this.index_;
+    }
+
+    // Number of index entries.
+    public int getIndexCount() {
+      return this.index_.size();
+    }
+
+    // Index entry at the given position of the backing list.
+    public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getIndex(int index) {
+      return this.index_.get(index);
+    }
+
+    // Index entry at the given position, typed through the IndexEntryOrBuilder interface.
+    public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntryOrBuilder getIndexOrBuilder(
+        int index) {
+      return this.index_.get(index);
+    }
+    
+    // Resets both fields to their defaults: no index entries and the default
+    // IndexEntry as info.
+    private void initFields() {
+      this.index_ = java.util.Collections.emptyList();
+      this.info_ =
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+    }
+    // Cached answer of isInitialized(): -1 = not computed yet, 0 = false, 1 = true.
+    private byte memoizedIsInitialized = -1;
+
+    // A SchemaLocator is initialized when info is present and initialized and
+    // every index entry reports itself initialized. The outcome is memoized.
+    public final boolean isInitialized() {
+      final byte cached = memoizedIsInitialized;
+      if (cached != -1) {
+        return cached == 1;
+      }
+
+      // getInfo() is only consulted when hasInfo() is true.
+      boolean complete = hasInfo() && getInfo().isInitialized();
+      // Stops at the first uninitialized entry, like an early return would.
+      for (int i = 0; complete && i < getIndexCount(); i++) {
+        complete = getIndex(i).isInitialized();
+      }
+      memoizedIsInitialized = (byte) (complete ? 1 : 0);
+      return complete;
+    }
+    
+    // Serializes info (field 1, only when present) followed by every index
+    // entry (field 2) to the given stream.
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+        throws java.io.IOException {
+      // Called for its side effect only: it fills the memoized size.
+      getSerializedSize();
+      if ((bitField0_ & 0x00000001) != 0) {
+        output.writeMessage(1, info_);
+      }
+      final int count = index_.size();
+      for (int pos = 0; pos < count; pos++) {
+        output.writeMessage(2, index_.get(pos));
+      }
+    }
+    
+    // Cached serialized size; -1 means it has not been computed yet.
+    private int memoizedSerializedSize = -1;
+
+    // Computes (once) and returns the number of bytes writeTo() will emit.
+    public int getSerializedSize() {
+      final int cached = memoizedSerializedSize;
+      if (cached != -1) {
+        return cached;
+      }
+
+      int total = 0;
+      if ((bitField0_ & 0x00000001) != 0) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(1, info_);
+      }
+      final int count = index_.size();
+      for (int pos = 0; pos < count; pos++) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(2, index_.get(pos));
+      }
+      memoizedSerializedSize = total;
+      return total;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    // Java-serialization hook; delegates unchanged to the superclass implementation.
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    // Static parsing entry points. Each one merges the input into a fresh builder
+    // and finishes with buildParsed(), which throws if the result is not initialized.
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data, extensionRegistry)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder()
+          .mergeFrom(data, extensionRegistry)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input, extensionRegistry)
+          .buildParsed();
+    }
+
+    // Returns null when mergeDelimitedFrom() reports that nothing was merged.
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseDelimitedFrom(
+        java.io.InputStream input)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input) ? builder.buildParsed() : null;
+    }
+
+    // Returns null when mergeDelimitedFrom() reports that nothing was merged.
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      final Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input, extensionRegistry) ? builder.buildParsed() : null;
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input)
+          .buildParsed();
+    }
+
+    public static org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder()
+          .mergeFrom(input, extensionRegistry)
+          .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator, 
Builder>
+        implements 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocatorOrBuilder
 {
+      // Construct using org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      // Intentionally empty hook invoked by the constructor.
+      private void maybeForceBuilderInitialization() {
+      }
+
+      // Single place where Builder instances are created.
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      // Resets the builder: default info, no index entries, both state bits cleared.
+      public Builder clear() {
+        super.clear();
+        info_ =
+            org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+        index_ = java.util.Collections.emptyList();
+        // 0x1 = info present, 0x2 = index_ is a private mutable copy.
+        bitField0_ &= ~(0x00000001 | 0x00000002);
+        return this;
+      }
+      
+      // Copies this builder by merging a partial build of it into a fresh builder.
+      public Builder clone() {
+        final Builder copy = create();
+        return copy.mergeFrom(buildPartial());
+      }
+      
+      // Returns the shared default SchemaLocator instance.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator getDefaultInstanceForType() {
+        return org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator
+            .getDefaultInstance();
+      }
+      
+      // Builds the message and throws the exception produced by
+      // newUninitializedMessageException() when it is not fully initialized.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator build() {
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator built =
+            buildPartial();
+        if (built.isInitialized()) {
+          return built;
+        }
+        throw newUninitializedMessageException(built);
+      }
+      
+      private 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      // Creates a SchemaLocator from the builder's current state without checking
+      // that required fields are set.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator buildPartial() {
+        final org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator message =
+            new org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator(this);
+        // Only the info presence bit (0x1) is carried over to the message.
+        message.bitField0_ = bitField0_ & 0x00000001;
+        message.info_ = info_;
+        if ((bitField0_ & 0x00000002) != 0) {
+          // Freeze the list and drop the "mutable" bit so that a later write
+          // through this builder copies it first instead of altering the message.
+          index_ = java.util.Collections.unmodifiableList(index_);
+          bitField0_ &= ~0x00000002;
+        }
+        message.index_ = index_;
+        return message;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator
 other) {
+        if (other == 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator.getDefaultInstance())
 return this;
+        if (other.hasInfo()) {
+          mergeInfo(other.getInfo());
+        }
+        if (!other.index_.isEmpty()) {
+          if (index_.isEmpty()) {
+            index_ = other.index_;
+            bitField0_ = (bitField0_ & ~0x00000002);
+          } else {
+            ensureIndexIsMutable();
+            index_.addAll(other.index_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasInfo()) {
+          
+          return false;
+        }
+        if (!getInfo().isInitialized()) {
+          
+          return false;
+        }
+        for (int i = 0; i < getIndexCount(); i++) {
+          if (!getIndex(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      // Reads tagged fields from the stream into this builder until the end of
+      // input (tag 0) or until parseUnknownField() returns false.
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          final int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              return this;
+            case 10: {
+              // info (field 1): start from the already-set info, if any, so a
+              // repeated occurrence is merged into it rather than replacing it.
+              org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder subBuilder =
+                  org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder();
+              if (hasInfo()) {
+                subBuilder.mergeFrom(getInfo());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setInfo(subBuilder.buildPartial());
+              break;
+            }
+            case 18: {
+              // index (field 2): parse one nested IndexEntry and append it
+              org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder subBuilder =
+                  org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addIndex(subBuilder.buildPartial());
+              break;
+            }
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required .pulsar.schema.IndexEntry info = 1;
+      private org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry info_ =
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+
+      // True when bit 0x1 of bitField0_ is set, i.e. info is marked present.
+      public boolean hasInfo() {
+        return (bitField0_ & 0x00000001) != 0;
+      }
+
+      // The info entry currently held by this builder.
+      public org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry getInfo() {
+        return this.info_;
+      }
+
+      // Stores the given entry as info and flags it as present; null is
+      // rejected with a NullPointerException.
+      public Builder setInfo(
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        info_ = value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+
+      // Stores the result of building the supplied IndexEntry builder as info.
+      public Builder setInfo(
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder builderForValue) {
+        info_ = builderForValue.build();
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+
+      // Merges value into the current info when one is set and is not the
+      // default instance; otherwise value simply becomes the info.
+      public Builder mergeInfo(
+          org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        final boolean hasExisting =
+            (bitField0_ & 0x00000001) != 0
+                && info_ != org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry
+                    .getDefaultInstance();
+        info_ = hasExisting
+            ? org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.newBuilder(info_)
+                .mergeFrom(value)
+                .buildPartial()
+            : value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+
+      // Restores the default IndexEntry as info and drops the presence bit.
+      public Builder clearInfo() {
+        bitField0_ &= ~0x00000001;
+        info_ =
+            org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.getDefaultInstance();
+        return this;
+      }
+      
+      // repeated .pulsar.schema.IndexEntry index = 2;
+      private 
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 index_ =
+        java.util.Collections.emptyList();
+      private void ensureIndexIsMutable() {
+        if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+          index_ = new 
java.util.ArrayList<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>(index_);
+          bitField0_ |= 0x00000002;
+         }
+      }
+      
+      public 
java.util.List<org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry>
 getIndexList() {
+        return java.util.Collections.unmodifiableList(index_);
+      }
+      public int getIndexCount() {
+        return index_.size();
+      }
+      public 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
getIndex(int index) {
+        return index_.get(index);
+      }
+      public Builder setIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.set(index, value);
+        
+        return this;
+      }
+      public Builder setIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder 
addIndex(org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry 
value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(value);
+        
+        return this;
+      }
+      public Builder addIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureIndexIsMutable();
+        index_.add(index, value);
+        
+        return this;
+      }
+      public Builder addIndex(
+          
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addIndex(
+          int index, 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry.Builder 
builderForValue) {
+        ensureIndexIsMutable();
+        index_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllIndex(
+          java.lang.Iterable<? extends 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry> values) 
{
+        ensureIndexIsMutable();
+        super.addAll(values, index_);
+        
+        return this;
+      }
+      public Builder clearIndex() {
+        index_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        
+        return this;
+      }
+      public Builder removeIndex(int index) {
+        ensureIndexIsMutable();
+        index_.remove(index);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaLocator)
+    }
+    
+    static {
+      defaultInstance = new SchemaLocator(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaLocator)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-broker/src/main/proto/SchemaStorageFormat.proto 
b/pulsar-broker/src/main/proto/SchemaStorageFormat.proto
new file mode 100644
index 000000000..42cfc9f73
--- /dev/null
+++ b/pulsar-broker/src/main/proto/SchemaStorageFormat.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema";
+option optimize_for = LITE_RUNTIME;
+
+// A (ledgerId, entryId) pair; both fields are required.
+// NOTE(review): presumably addresses one entry inside a BookKeeper ledger
+// (this format ships with BookkeeperSchemaStorage) - confirm against the writer.
+message PositionInfo {
+    required int64 ledgerId = 1;
+    required int64 entryId = 2;
+}
+
+// One index record: a version number, the PositionInfo it points at, and a hash.
+// NOTE(review): the hash is presumably computed over the schema payload stored
+// at `position` - confirm against the code that builds these entries.
+message IndexEntry {
+    required int64 version = 1;
+    required PositionInfo position = 2;
+    required bytes hash = 3;
+}
+
+// A stored schema payload together with a list of IndexEntry records.
+// Only field numbers 2 and 5 are assigned in this message; 1, 3 and 4 are unused here.
+message SchemaEntry {
+    required bytes schema_data = 2;
+    repeated IndexEntry index = 5;
+}
+
+// Locator record: one required IndexEntry (`info`) plus a repeated list of
+// IndexEntry records (`index`).
+// NOTE(review): `info` looks like the entry for the most recent version and
+// `index` the accumulated history - confirm against BookkeeperSchemaStorage.
+message SchemaLocator {
+    required IndexEntry info = 1;
+    repeated IndexEntry index = 2;
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index a3171c7a5..d476abe73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
@@ -41,6 +42,7 @@
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
new file mode 100644
index 000000000..9ae34fe9d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
+
+    private static Clock MockClock = Clock.fixed(Instant.EPOCH, 
ZoneId.systemDefault());
+
+    private String schemaId1 = "1/2/3/4";
+    private String userId = "user";
+
+    private SchemaData schema1 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 a = 1};".getBytes())
+        .build();
+
+    private SchemaData schema2 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 b = 1};".getBytes())
+        .build();
+
+    private SchemaData schema3 = SchemaData.builder()
+        .user(userId)
+        .type(SchemaType.PROTOBUF)
+        .timestamp(MockClock.millis())
+        .isDeleted(false)
+        .data("message { required int64 c = 1};".getBytes())
+        .build();
+
+    private SchemaRegistryServiceImpl schemaRegistryService;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
+        storage.init();
+        storage.start();
+        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
MockClock);
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        schemaRegistryService.close();
+    }
+
+    @Test
+    public void writeReadBackDeleteSchemaEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+
+        SchemaData latest = getLatestSchema(schemaId1, version(0));
+        assertEquals(schema1, latest);
+
+        deleteSchema(schemaId1, version(1));
+
+        SchemaData latest2 = getLatestSchema(schemaId1, version(1));
+
+        assertTrue(latest2.isDeleted());
+    }
+
+    @Test
+    public void getReturnsTheLastWrittenEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData latest = getLatestSchema(schemaId1, version(1));
+        assertEquals(schema2, latest);
+
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData version0 = getSchema(schemaId1, version(0));
+        assertEquals(schema1, version0);
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry2() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema2, version(1));
+
+        SchemaData version1 = getSchema(schemaId1, version(1));
+        assertEquals(schema2, version1);
+    }
+
+    @Test
+    public void getByVersionReturnsTheCorrectEntry3() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+
+        SchemaData version1 = getSchema(schemaId1, version(0));
+        assertEquals(schema1, version1);
+    }
+
+    @Test
+    public void addLotsOfEntriesThenDelete() throws Exception {
+        SchemaData randomSchema1 = randomSchema();
+        SchemaData randomSchema2 = randomSchema();
+        SchemaData randomSchema3 = randomSchema();
+        SchemaData randomSchema4 = randomSchema();
+        SchemaData randomSchema5 = randomSchema();
+        SchemaData randomSchema6 = randomSchema();
+        SchemaData randomSchema7 = randomSchema();
+
+        putSchema(schemaId1, randomSchema1, version(0));
+        putSchema(schemaId1, randomSchema2, version(1));
+        putSchema(schemaId1, randomSchema3, version(2));
+        putSchema(schemaId1, randomSchema4, version(3));
+        putSchema(schemaId1, randomSchema5, version(4));
+        putSchema(schemaId1, randomSchema6, version(5));
+        putSchema(schemaId1, randomSchema7, version(6));
+
+        SchemaData version0 = getSchema(schemaId1, version(0));
+        assertEquals(randomSchema1, version0);
+
+        SchemaData version1 = getSchema(schemaId1, version(1));
+        assertEquals(randomSchema2, version1);
+
+        SchemaData version2 = getSchema(schemaId1, version(2));
+        assertEquals(randomSchema3, version2);
+
+        SchemaData version3 = getSchema(schemaId1, version(3));
+        assertEquals(randomSchema4, version3);
+
+        SchemaData version4 = getSchema(schemaId1, version(4));
+        assertEquals(randomSchema5, version4);
+
+        SchemaData version5 = getSchema(schemaId1, version(5));
+        assertEquals(randomSchema6, version5);
+
+        SchemaData version6 = getSchema(schemaId1, version(6));
+        assertEquals(randomSchema7, version6);
+
+        deleteSchema(schemaId1, version(7));
+
+        SchemaData version7 = getSchema(schemaId1, version(7));
+        assertTrue(version7.isDeleted());
+
+    }
+
+    @Test
+    public void writeSchemasToDifferentIds() throws Exception {
+        SchemaData schemaWithDifferentId = schema3;
+
+        putSchema(schemaId1, schema1, version(0));
+        String schemaId2 = "id2";
+        putSchema(schemaId2, schemaWithDifferentId, version(0));
+
+        SchemaData withFirstId = getLatestSchema(schemaId1, version(0));
+        SchemaData withDifferentId = getLatestSchema(schemaId2, version(0));
+
+        assertEquals(schema1, withFirstId);
+        assertEquals(schema3, withDifferentId);
+    }
+
+    @Test
+    public void dontReAddExistingSchemaAtRoot() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schema1, version(0));
+    }
+
+    @Test
+    public void dontReAddExistingSchemaInMiddle() throws Exception {
+        putSchema(schemaId1, randomSchema(), version(0));
+        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, randomSchema(), version(2));
+        putSchema(schemaId1, randomSchema(), version(3));
+        putSchema(schemaId1, randomSchema(), version(4));
+        putSchema(schemaId1, randomSchema(), version(5));
+        putSchema(schemaId1, schema2, version(1));
+    }
+
+    private void putSchema(String schemaId, SchemaData schema, SchemaVersion 
expectedVersion) throws Exception {
+        CompletableFuture<SchemaVersion> put = 
schemaRegistryService.putSchemaIfAbsent(schemaId, schema);
+        SchemaVersion newVersion = put.get();
+        assertEquals(expectedVersion, newVersion);
+    }
+
+    private SchemaData getLatestSchema(String schemaId, SchemaVersion 
expectedVersion) throws Exception {
+        SchemaRegistry.SchemaAndMetadata schemaAndVersion = 
schemaRegistryService.getSchema(schemaId).get();
+        assertEquals(expectedVersion, schemaAndVersion.version);
+        assertEquals(schemaId, schemaAndVersion.id);
+        return schemaAndVersion.schema;
+    }
+
+    private SchemaData getSchema(String schemaId, SchemaVersion version) 
throws Exception {
+        SchemaRegistry.SchemaAndMetadata schemaAndVersion = 
schemaRegistryService.getSchema(schemaId, version).get();
+        assertEquals(version, schemaAndVersion.version);
+        assertEquals(schemaId, schemaAndVersion.id);
+        return schemaAndVersion.schema;
+    }
+
+    private void deleteSchema(String schemaId, SchemaVersion expectedVersion) 
throws Exception {
+        SchemaVersion version = schemaRegistryService.deleteSchema(schemaId, 
userId).get();
+        assertEquals(expectedVersion, version);
+    }
+
+    private SchemaData randomSchema() {
+        UUID randomString = UUID.randomUUID();
+        return SchemaData.builder()
+            .user(userId)
+            .type(SchemaType.PROTOBUF)
+            .timestamp(MockClock.millis())
+            .isDeleted(false)
+            .data(randomString.toString().getBytes())
+            .build();
+    }
+
+    private SchemaVersion version(long version) {
+        return new LongSchemaVersion(version);
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
index 5a5012c9b..3eb9e6eb7 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
+import java.util.HashMap;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
@@ -30,5 +31,6 @@
     private final long timestamp;
     private final String user;
     private final byte[] data;
-    private final Map<String, String> props;
+    @Builder.Default
+    public final Map<String, String> props = new HashMap<>();
 }
\ No newline at end of file
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index d992c082f..b2715b3f2 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -235,6 +235,27 @@ public Boolean call() throws Exception {
         return getData(path, this, deserializer).map(e -> e.getKey());
     }
 
+    public <T> Optional<Entry<T, Stat>> getEntry(final String path, final 
Deserializer<T> deserializer) throws Exception {
+        return getData(path, this, deserializer);
+    }
+
+    public <T> CompletableFuture<Optional<Entry<T, Stat>>> getEntryAsync(final 
String path, final Deserializer<T> deserializer) {
+        CompletableFuture<Optional<Entry<T, Stat>>> future = new 
CompletableFuture<>();
+        getDataAsync(path, this, deserializer)
+            .thenAccept(future::complete)
+            .exceptionally(ex -> {
+                asyncInvalidate(path);
+                if (ex.getCause() instanceof NoNodeException) {
+                    future.complete(Optional.empty());
+                } else {
+                    future.completeExceptionally(ex.getCause());
+                }
+
+                return null;
+            });
+        return future;
+    }
+
     public <T> CompletableFuture<Optional<T>> getDataAsync(final String path, 
final Deserializer<T> deserializer) {
         CompletableFuture<Optional<T>> future = new CompletableFuture<>();
         getDataAsync(path, this, deserializer).thenAccept(data -> {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to