This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f04eb51 Schema registry (2/N) (#1319)
f04eb51 is described below
commit f04eb517e40ea23779fc7c0264c3537dd7d6671f
Author: Dave Rusek <[email protected]>
AuthorDate: Thu Mar 8 15:35:09 2018 -0700
Schema registry (2/N) (#1319)
* Schema Registry proto changes
* Infrastructure to store schemas
* Renumber schema fields
* Update Pulsar API with schema changes
* Revert field number change
* Fix merge conflict
* Fix broken merge
* Address issues in review
* Add schema type back to proto definition
* Address comments regarding lombok usage
* Remove reserved future enum fields
* regenerate code from protobuf
* Remove unused code
* Add schema version to producer success message
* plumb schema through to producer
* Revert "Add schema version to producer success message"
This reverts commit e7e72f468cf46f1605524a7399520c22763583c9.
* Revert "Revert "Add schema version to producer success message""
This reverts commit 7b902f6bdb1cb054e26577747ff4dd8c326a6248.
* Persist schema on producer connect
* Add principal to schema on publish
* Reformat function for readability
* Remove unused protoc profile
* Rename put on schema registry to putIfAbsent
* wip: address review comments
* switch underscore to slash in schema name
* blah
* Fix protobuf version incompatibility
* Add appropriate license headers
---
pom.xml | 14 +
.../apache/pulsar/broker/ServiceConfiguration.java | 12 +-
pulsar-broker/pom.xml | 6 +
.../org/apache/pulsar/broker/PulsarService.java | 21 +-
.../org/apache/pulsar/broker/service/Producer.java | 23 +-
.../apache/pulsar/broker/service/ServerCnx.java | 122 +-
.../org/apache/pulsar/broker/service/Topic.java | 8 +-
.../service/nonpersistent/NonPersistentTopic.java | 30 +-
.../broker/service/persistent/PersistentTopic.java | 11 +
.../schema/DefaultSchemaRegistryService.java | 57 +
.../broker/service/schema/SchemaRegistry.java | 79 ++
.../service/schema/SchemaRegistryService.java | 46 +
.../service/schema/SchemaRegistryServiceImpl.java | 190 +++
.../broker/service/schema/SchemaStorage.java | 36 +
.../service/schema/SchemaStorageFactory.java | 27 +
.../pulsar/broker/service/schema/StoredSchema.java | 68 +
.../service/schema/proto/SchemaRegistryFormat.java | 1373 ++++++++++++++++++++
.../src/main/proto/SchemaRegistryFormat.proto | 45 +
.../pulsar/broker/service/PersistentTopicTest.java | 15 +-
.../pulsar/broker/service/ServerCnxTest.java | 19 +-
.../apache/pulsar/client/api/ClientErrorsTest.java | 16 +-
.../pulsar/client/api/MockBrokerService.java | 3 +-
.../org/apache/pulsar/common/api/Commands.java | 14 +-
.../org/apache/pulsar/common/naming/TopicName.java | 6 +
.../apache/pulsar/common/schema/EmptyVersion.java | 28 +
.../apache/pulsar/common/schema/LatestVersion.java | 28 +
.../apache/pulsar/common/schema/SchemaData.java | 34 +
.../apache/pulsar/common/schema/SchemaType.java | 23 +
.../apache/pulsar/common/schema/SchemaVersion.java | 26 +
29 files changed, 2281 insertions(+), 99 deletions(-)
diff --git a/pom.xml b/pom.xml
index 081fcdb..7137195 100644
--- a/pom.xml
+++ b/pom.xml
@@ -408,6 +408,20 @@ flexible messaging model and an intuitive client
API.</description>
<type>test-jar</type>
<version>${log4j2.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <type>test-jar</type>
+ <version>${log4j2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <type>test-jar</type>
+ <version>${log4j2.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bd47970..cc2a409 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -415,6 +415,8 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(dynamic = true)
private boolean preferLaterVersions = false;
+ private String schemaRegistryStorageClassName =
"org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+
/**** --- WebSocket --- ****/
// Number of IO threads in Pulsar Client used in WebSocket proxy
private int webSocketNumIoThreads =
Runtime.getRuntime().availableProcessors();
@@ -1449,7 +1451,15 @@ public class ServiceConfiguration implements
PulsarConfiguration {
public void setExposeTopicLevelMetricsInPrometheus(boolean
exposeTopicLevelMetricsInPrometheus) {
this.exposeTopicLevelMetricsInPrometheus =
exposeTopicLevelMetricsInPrometheus;
}
-
+
+ public String getSchemaRegistryStorageClassName() {
+ return schemaRegistryStorageClassName;
+ }
+
+ public void setSchemaRegistryStorageClassName(String className) {
+ schemaRegistryStorageClassName = className;
+ }
+
public boolean authenticateOriginalAuthData() {
return authenticateOriginalAuthData;
}
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index abead55..8681cad 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -55,6 +55,12 @@
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf2.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6bbc1e6..51014a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.broker;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URL;
import java.util.List;
@@ -34,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
-
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -52,6 +54,7 @@ import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
@@ -80,11 +83,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Main class for Pulsar broker service
*/
@@ -123,6 +121,7 @@ public class PulsarService implements AutoCloseable {
private final String brokerServiceUrl;
private final String brokerServiceUrlTls;
private final String brokerVersion;
+ private SchemaRegistryService schemaRegistryService = null;
private final Optional<WorkerService> functionWorkerService;
private final MessagingServiceShutdownHook shutdownService;
@@ -233,6 +232,10 @@ public class PulsarService implements AutoCloseable {
loadManager.stop();
}
+ if (schemaRegistryService != null) {
+ schemaRegistryService.close();
+ }
+
state = State.Closed;
} catch (Exception e) {
@@ -359,6 +362,8 @@ public class PulsarService implements AutoCloseable {
this.metricsGenerator = new MetricsGenerator(this);
+ schemaRegistryService = SchemaRegistryService.create(this);
+
state = State.Started;
acquireSLANamespace();
@@ -701,4 +706,8 @@ public class PulsarService implements AutoCloseable {
public String getBrokerVersion() {
return brokerVersion;
}
+
+ public SchemaRegistryService getSchemaRegistryService() {
+ return schemaRegistryService;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 9268ac7..35bc315 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -23,12 +23,16 @@ import static
com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
@@ -42,17 +46,11 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
/**
* Represents a currently connected producer
*/
@@ -82,8 +80,10 @@ public class Producer {
private final Map<String, String> metadata;
+ private final SchemaVersion schemaVersion;
+
public Producer(Topic topic, ServerCnx cnx, long producerId, String
producerName, String appId,
- boolean isEncrypted, Map<String, String> metadata) {
+ boolean isEncrypted, Map<String, String> metadata, SchemaVersion
schemaVersion) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
@@ -110,6 +110,7 @@ public class Producer {
this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;
this.isEncrypted = isEncrypted;
+ this.schemaVersion = schemaVersion;
}
@Override
@@ -492,6 +493,10 @@ public class Producer {
}
}
+ public SchemaVersion getSchemaVersion() {
+ return schemaVersion;
+ }
+
private static final Logger log = LoggerFactory.getLogger(Producer.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a386ece..e0ff275 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,12 @@ import static
org.apache.pulsar.broker.lookup.TopicLookup.lookupTopicAsync;
import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
+import com.google.protobuf.GeneratedMessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
@@ -32,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.Position;
@@ -52,6 +59,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.CommandUtils;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
@@ -74,24 +82,18 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-import com.google.protobuf.GeneratedMessageLite;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOption;
-import io.netty.handler.ssl.SslHandler;
-
public class ServerCnx extends PulsarHandler {
private final BrokerService service;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
@@ -697,6 +699,36 @@ public class ServerCnx extends PulsarHandler {
});
}
+ private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
+ switch (protocolType) {
+ case Json:
+ return SchemaType.JSON;
+ case Avro:
+ return SchemaType.AVRO;
+ case Thrift:
+ return SchemaType.THRIFT;
+ case Protobuf:
+ return SchemaType.PROTOBUF;
+ default:
+ return SchemaType.NONE;
+ }
+ }
+
+ private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
+ return SchemaData.builder()
+ .data(protocolSchema.getSchemaData().toByteArray())
+ .isDeleted(false)
+ .timestamp(System.currentTimeMillis())
+ .user(originalPrincipal)
+ .type(getType(protocolSchema.getType()))
+ .props(protocolSchema.getPropertiesList().stream().collect(
+ Collectors.toMap(
+ PulsarApi.KeyValue::getKey,
+ PulsarApi.KeyValue::getValue
+ )
+ )).build();
+ }
+
@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
@@ -752,7 +784,8 @@ public class ServerCnx extends PulsarHandler {
Producer producer =
existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id is
already created: {}", remoteAddress,
producer);
-
ctx.writeAndFlush(Commands.newProducerSuccess(requestId,
producer.getProducerName()));
+
ctx.writeAndFlush(Commands.newProducerSuccess(requestId,
producer.getProducerName(),
+ producer.getSchemaVersion()));
return null;
} else {
// There was an early request to create a
producer with
@@ -805,41 +838,56 @@ public class ServerCnx extends PulsarHandler {
disableTcpNoDelayIfNeeded(topicName.toString(),
producerName);
- Producer producer = new Producer(topic,
ServerCnx.this, producerId, producerName, authRole,
- isEncrypted, metadata);
+ CompletableFuture<SchemaVersion>
schemaVersionFuture;
+ if (cmdProducer.hasSchema()) {
+ schemaVersionFuture =
topic.addSchema(getSchema(cmdProducer.getSchema()));
+ } else {
+ schemaVersionFuture =
CompletableFuture.completedFuture(SchemaVersion.Empty);
+ }
+
+ schemaVersionFuture.exceptionally(exception -> {
+ ctx.writeAndFlush(Commands.newError(requestId,
ServerError.UnknownError, exception.getMessage()));
+ producers.remove(producerId, producerFuture);
+ return null;
+ });
+
+ schemaVersionFuture.thenAccept(schemaVersion -> {
+ Producer producer = new Producer(topic,
ServerCnx.this, producerId, producerName, authRole,
+ isEncrypted, metadata, schemaVersion);
- try {
- topic.addProducer(producer);
+ try {
+ topic.addProducer(producer);
- if (isActive()) {
- if (producerFuture.complete(producer)) {
- log.info("[{}] Created new producer:
{}", remoteAddress, producer);
-
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
- producer.getLastSequenceId()));
- return;
+ if (isActive()) {
+ if (producerFuture.complete(producer))
{
+ log.info("[{}] Created new
producer: {}", remoteAddress, producer);
+
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+ producer.getLastSequenceId(),
producer.getSchemaVersion()));
+ return;
+ } else {
+ // The producer's future was
completed before by
+ // a close command
+ producer.closeNow();
+ log.info("[{}] Cleared producer
created after timeout on client side {}",
+ remoteAddress, producer);
+ }
} else {
- // The producer's future was completed
before by
- // a close command
producer.closeNow();
- log.info("[{}] Cleared producer
created after timeout on client side {}",
- remoteAddress, producer);
- }
- } else {
- producer.closeNow();
- log.info("[{}] Cleared producer created
after connection was closed: {}",
+ log.info("[{}] Cleared producer
created after connection was closed: {}",
remoteAddress, producer);
- producerFuture.completeExceptionally(
+ producerFuture.completeExceptionally(
new
IllegalStateException("Producer created after connection was closed"));
- }
- } catch (BrokerServiceException ise) {
- log.error("[{}] Failed to add producer to
topic {}: {}", remoteAddress, topicName,
+ }
+ } catch (BrokerServiceException ise) {
+ log.error("[{}] Failed to add producer to
topic {}: {}", remoteAddress, topicName,
ise.getMessage());
- ctx.writeAndFlush(Commands.newError(requestId,
+
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
- producerFuture.completeExceptionally(ise);
- }
+ producerFuture.completeExceptionally(ise);
+ }
- producers.remove(producerId, producerFuture);
+ producers.remove(producerId, producerFuture);
+ });
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
if (!(cause instanceof
ServiceUnitNotReadyException)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 80aed77..be60116 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.broker.service;
+import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -30,13 +30,13 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
-import io.netty.buffer.ByteBuf;
-
public interface Topic {
public interface PublishContext {
@@ -125,4 +125,6 @@ public interface Topic {
PersistentTopicInternalStats getInternalStats();
Position getLastMessageId();
+
+ CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ef355f0..4a6de05 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -22,6 +22,13 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -32,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
@@ -70,6 +76,8 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -79,15 +87,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.concurrent.FastThreadLocal;
-
public class NonPersistentTopic implements Topic {
private final String topic;
@@ -973,7 +972,14 @@ public class NonPersistentTopic implements Topic {
this.hasBatchMessagePublished = true;
}
-
-
private static final Logger log =
LoggerFactory.getLogger(NonPersistentTopic.class);
+
+ @Override
+ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ return brokerService.pulsar()
+ .getSchemaRegistryService()
+ .putSchemaIfAbsent(id, schema);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 62017f0..cdca1cd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -92,6 +92,8 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -1601,4 +1603,13 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
}
private static final Logger log =
LoggerFactory.getLogger(PersistentTopic.class);
+
+ @Override
+ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ return brokerService.pulsar()
+ .getSchemaRegistryService()
+ .putSchemaIfAbsent(id, schema);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
new file mode 100644
index 0000000..db3b9f7
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class DefaultSchemaRegistryService implements SchemaRegistryService {
+ @Override
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId,
SchemaVersion version) {
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
SchemaData schema) {
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId,
String user) {
+ return completedFuture(null);
+ }
+
+ @Override
+ public SchemaVersion versionFromBytes(byte[] version) {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
new file mode 100644
index 0000000..4dfbd6d
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaRegistry extends AutoCloseable {
+
+ CompletableFuture<SchemaAndMetadata> getSchema(String schemaId);
+
+ CompletableFuture<SchemaAndMetadata> getSchema(String schemaId,
SchemaVersion version);
+
+ CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
SchemaData schema);
+
+ CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String
user);
+
+ SchemaVersion versionFromBytes(byte[] version);
+
+ class SchemaAndMetadata {
+ public final String id;
+ public final SchemaData schema;
+ public final SchemaVersion version;
+
+ SchemaAndMetadata(String id, SchemaData schema, SchemaVersion version)
{
+ this.id = id;
+ this.schema = schema;
+ this.version = version;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SchemaAndMetadata that = (SchemaAndMetadata) o;
+ return version == that.version &&
+ Objects.equals(id, that.id) &&
+ Objects.equals(schema, that.schema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, schema, version);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("schema", schema)
+ .add("version", version)
+ .toString();
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
new file mode 100644
index 0000000..69e7364
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.lang.reflect.Method;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface SchemaRegistryService extends SchemaRegistry {
+ String CreateMethodName = "create";
+ Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
+
+ static SchemaRegistryService create(PulsarService pulsar) {
+ try {
+ ServiceConfiguration config = pulsar.getConfiguration();
+ final Class<?> storageClass =
Class.forName(config.getSchemaRegistryStorageClassName());
+ Object factoryInstance = storageClass.newInstance();
+ Method createMethod = storageClass.getMethod(CreateMethodName,
PulsarService.class);
+ SchemaStorage schemaStorage = (SchemaStorage)
createMethod.invoke(factoryInstance, pulsar);
+ return new SchemaRegistryServiceImpl(schemaStorage);
+ } catch (Exception e) {
+ log.warn("Error when trying to create scehema registry storage:
{}", e);
+ }
+ return new DefaultSchemaRegistryService();
+ }
+
+ void close() throws Exception;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
new file mode 100644
index 0000000..1aec039
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.Objects.isNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class SchemaRegistryServiceImpl implements SchemaRegistryService {
+ private final SchemaStorage schemaStorage;
+ private final Clock clock;
+
+ @VisibleForTesting
+ SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) {
+ this.schemaStorage = schemaStorage;
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ SchemaRegistryServiceImpl(SchemaStorage schemaStorage) {
+ this(schemaStorage, Clock.systemUTC());
+ }
+
+ @Override
+ @NotNull
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+ return getSchema(schemaId, SchemaVersion.Latest);
+ }
+
+ @Override
+ @NotNull
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId,
SchemaVersion version) {
+ return schemaStorage.get(schemaId, version).thenCompose(stored -> {
+ if (isNull(stored)) {
+ return completedFuture(null);
+ } else {
+ return Functions.bytesToSchemaInfo(stored.data)
+ .thenApply(Functions::schemaInfoToSchema)
+ .thenApply(schema -> new SchemaAndMetadata(schemaId,
schema, stored.version));
+ }
+ }
+ );
+ }
+
+ @Override
+ @NotNull
+ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
SchemaData schema) {
+ SchemaRegistryFormat.SchemaInfo info =
SchemaRegistryFormat.SchemaInfo.newBuilder()
+ .setType(Functions.convertFromDomainType(schema.getType()))
+ .setSchema(ByteString.copyFrom(schema.getData()))
+ .setSchemaId(schemaId)
+ .setUser(schema.getUser())
+ .setDeleted(false)
+ .setTimestamp(clock.millis())
+ .addAllProps(toPairs(schema.getProps()))
+ .build();
+ return schemaStorage.put(schemaId, info.toByteArray());
+ }
+
+ @Override
+ @NotNull
+ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId,
String user) {
+ byte[] deletedEntry = deleted(schemaId, user).toByteArray();
+ return schemaStorage.put(schemaId, deletedEntry);
+ }
+
+ @Override
+ public SchemaVersion versionFromBytes(byte[] version) {
+ return schemaStorage.versionFromBytes(version);
+ }
+
+ @Override
+ public void close() throws Exception {
+ schemaStorage.close();
+ }
+
+ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String
user) {
+ return SchemaRegistryFormat.SchemaInfo.newBuilder()
+ .setSchemaId(schemaId)
+ .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
+ .setSchema(ByteString.EMPTY)
+ .setUser(user)
+ .setDeleted(true)
+ .setTimestamp(clock.millis())
+ .build();
+ }
+
+ interface Functions {
+ static SchemaType
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
+ switch (type) {
+ case AVRO:
+ return SchemaType.AVRO;
+ case JSON:
+ return SchemaType.JSON;
+ case PROTO:
+ return SchemaType.PROTOBUF;
+ case THRIFT:
+ return SchemaType.THRIFT;
+ default:
+ return SchemaType.NONE;
+ }
+ }
+
+ static SchemaRegistryFormat.SchemaInfo.SchemaType
convertFromDomainType(SchemaType type) {
+ switch (type) {
+ case AVRO:
+ return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO;
+ case JSON:
+ return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON;
+ case THRIFT:
+ return SchemaRegistryFormat.SchemaInfo.SchemaType.THRIFT;
+ case PROTOBUF:
+ return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTO;
+ default:
+ return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+ }
+ }
+
+ static Map<String, String>
toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
+ Map<String, String> map = new HashMap<>();
+ for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
+ map.put(pair.getKey(), pair.getValue());
+ }
+ return map;
+ }
+
+ static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
toPairs(Map<String, String> map) {
+ List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new
ArrayList<>(map.size());
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+
pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+ return pairs;
+ }
+
+ static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo
info) {
+ return SchemaData.builder()
+ .user(info.getUser())
+ .type(convertToDomainType(info.getType()))
+ .data(info.getSchema().toByteArray())
+ .isDeleted(info.getDeleted())
+ .props(toMap(info.getPropsList()))
+ .build();
+ }
+
+ static CompletableFuture<SchemaRegistryFormat.SchemaInfo>
bytesToSchemaInfo(byte[] bytes) {
+ CompletableFuture<SchemaRegistryFormat.SchemaInfo> future;
+ try {
+ future =
completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
+ } catch (InvalidProtocolBufferException e) {
+ future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
new file mode 100644
index 0000000..4d0d8af
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaStorage {
+
+ CompletableFuture<SchemaVersion> put(String key, byte[] value);
+
+ CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+ CompletableFuture<SchemaVersion> delete(String key);
+
+ SchemaVersion versionFromBytes(byte[] version);
+
+ void close() throws Exception;
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
new file mode 100644
index 0000000..c4cff34
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.PulsarService;
+
+public interface SchemaStorageFactory {
+ @NotNull
+ SchemaStorage create(PulsarService pulsar) throws Exception;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
new file mode 100644
index 0000000..f28a707
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class StoredSchema {
+ public final byte[] data;
+ public final SchemaVersion version;
+ public final Map<String, String> metadata;
+
+ public StoredSchema(byte[] data, SchemaVersion version, Map<String,
String> metadata) {
+ this.data = data;
+ this.version = version;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StoredSchema that = (StoredSchema) o;
+ return Arrays.equals(data, that.data) &&
+ Objects.equals(version, that.version) &&
+ Objects.equals(metadata, that.metadata);
+ }
+
+ @Override
+ public int hashCode() {
+
+ int result = Objects.hash(version, metadata);
+ result = 31 * result + Arrays.hashCode(data);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("data", data)
+ .add("version", version)
+ .add("metadata", metadata)
+ .toString();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
new file mode 100644
index 0000000..3922731
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
@@ -0,0 +1,1373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: src/main/proto/SchemaRegistryFormat.proto
+
+package org.apache.pulsar.broker.service.schema.proto;
+
+public final class SchemaRegistryFormat {
+ private SchemaRegistryFormat() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistryLite registry) {
+ }
+ public interface SchemaInfoOrBuilder
+ extends com.google.protobuf.MessageLiteOrBuilder {
+
+ // required string schema_id = 1;
+ boolean hasSchemaId();
+ String getSchemaId();
+
+ // required string user = 2;
+ boolean hasUser();
+ String getUser();
+
+ // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+ boolean hasType();
+ SchemaRegistryFormat.SchemaInfo.SchemaType getType();
+
+ // required bytes schema = 4;
+ boolean hasSchema();
+ com.google.protobuf.ByteString getSchema();
+
+ // required int64 timestamp = 5;
+ boolean hasTimestamp();
+ long getTimestamp();
+
+ // required bool deleted = 6;
+ boolean hasDeleted();
+ boolean getDeleted();
+
+ // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+ java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
+ getPropsList();
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index);
+ int getPropsCount();
+ }
+ public static final class SchemaInfo extends
+ com.google.protobuf.GeneratedMessageLite
+ implements SchemaInfoOrBuilder {
+ // Use SchemaInfo.newBuilder() to construct.
+ private SchemaInfo(Builder builder) {
+ super(builder);
+ }
+ private SchemaInfo(boolean noInit) {}
+
+ private static final SchemaInfo defaultInstance;
+ public static SchemaInfo getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public SchemaInfo getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public enum SchemaType
+ implements com.google.protobuf.Internal.EnumLite {
+ NONE(0, 1),
+ THRIFT(1, 2),
+ AVRO(2, 3),
+ JSON(3, 4),
+ PROTO(4, 5),
+ ;
+
+ public static final int NONE_VALUE = 1;
+ public static final int THRIFT_VALUE = 2;
+ public static final int AVRO_VALUE = 3;
+ public static final int JSON_VALUE = 4;
+ public static final int PROTO_VALUE = 5;
+
+
+ public final int getNumber() { return value; }
+
+ public static SchemaType valueOf(int value) {
+ switch (value) {
+ case 1: return NONE;
+ case 2: return THRIFT;
+ case 3: return AVRO;
+ case 4: return JSON;
+ case 5: return PROTO;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<SchemaType>() {
+ public SchemaType findValueByNumber(int number) {
+ return SchemaType.valueOf(number);
+ }
+ };
+
+ private final int value;
+
+ private SchemaType(int index, int value) {
+ this.value = value;
+ }
+
+ //
@@protoc_insertion_point(enum_scope:pulsar.schema.SchemaInfo.SchemaType)
+ }
+
+ public interface KeyValuePairOrBuilder
+ extends com.google.protobuf.MessageLiteOrBuilder {
+
+ // required string key = 1;
+ boolean hasKey();
+ String getKey();
+
+ // required string value = 2;
+ boolean hasValue();
+ String getValue();
+ }
+ public static final class KeyValuePair extends
+ com.google.protobuf.GeneratedMessageLite
+ implements KeyValuePairOrBuilder {
+ // Use KeyValuePair.newBuilder() to construct.
+ private KeyValuePair(Builder builder) {
+ super(builder);
+ }
+ private KeyValuePair(boolean noInit) {}
+
+ private static final KeyValuePair defaultInstance;
+ public static KeyValuePair getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public KeyValuePair getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private int bitField0_;
+ // required string key = 1;
+ public static final int KEY_FIELD_NUMBER = 1;
+ private java.lang.Object key_;
+ public boolean hasKey() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getKey() {
+ java.lang.Object ref = key_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ key_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getKeyBytes() {
+ java.lang.Object ref = key_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ key_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // required string value = 2;
+ public static final int VALUE_FIELD_NUMBER = 2;
+ private java.lang.Object value_;
+ public boolean hasValue() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getValue() {
+ java.lang.Object ref = value_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ value_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getValueBytes() {
+ java.lang.Object ref = value_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ value_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() {
+ key_ = "";
+ value_ = "";
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasKey()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasValue()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getKeyBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getValueBytes());
+ }
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getKeyBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getValueBytes());
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair
parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair
parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair
parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair
parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder
newBuilder(SchemaRegistryFormat.SchemaInfo.KeyValuePair prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageLite.Builder<
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair, Builder>
+ implements SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder {
+ // Construct using
org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ key_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ value_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePair
getDefaultInstanceForType() {
+ return
SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance();
+ }
+
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePair build() {
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private SchemaRegistryFormat.SchemaInfo.KeyValuePair buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePair buildPartial() {
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair result = new
SchemaRegistryFormat.SchemaInfo.KeyValuePair(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.key_ = key_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.value_ = value_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo.KeyValuePair
other) {
+ if (other ==
SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance()) return this;
+ if (other.hasKey()) {
+ setKey(other.getKey());
+ }
+ if (other.hasValue()) {
+ setValue(other.getValue());
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasKey()) {
+
+ return false;
+ }
+ if (!hasValue()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!parseUnknownField(input, extensionRegistry, tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ key_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ value_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required string key = 1;
+ private java.lang.Object key_ = "";
+ public boolean hasKey() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getKey() {
+ java.lang.Object ref = key_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ key_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setKey(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ key_ = value;
+
+ return this;
+ }
+ public Builder clearKey() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ key_ = getDefaultInstance().getKey();
+
+ return this;
+ }
+ void setKey(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000001;
+ key_ = value;
+
+ }
+
+ // required string value = 2;
+ private java.lang.Object value_ = "";
+ public boolean hasValue() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getValue() {
+ java.lang.Object ref = value_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ value_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setValue(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ value_ = value;
+
+ return this;
+ }
+ public Builder clearValue() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ value_ = getDefaultInstance().getValue();
+
+ return this;
+ }
+ void setValue(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000002;
+ value_ = value;
+
+ }
+
+ //
@@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+ }
+
+ static {
+ defaultInstance = new KeyValuePair(true);
+ defaultInstance.initFields();
+ }
+
+ //
@@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+ }
+
+ private int bitField0_;
+ // required string schema_id = 1;
+ public static final int SCHEMA_ID_FIELD_NUMBER = 1;
+ private java.lang.Object schemaId_;
+ public boolean hasSchemaId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getSchemaId() {
+ java.lang.Object ref = schemaId_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ schemaId_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getSchemaIdBytes() {
+ java.lang.Object ref = schemaId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ schemaId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // required string user = 2;
+ public static final int USER_FIELD_NUMBER = 2;
+ private java.lang.Object user_;
+ public boolean hasUser() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getUser() {
+ java.lang.Object ref = user_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ user_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getUserBytes() {
+ java.lang.Object ref = user_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ user_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+ public static final int TYPE_FIELD_NUMBER = 3;
+ private SchemaRegistryFormat.SchemaInfo.SchemaType type_;
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+ return type_;
+ }
+
+ // required bytes schema = 4;
+ public static final int SCHEMA_FIELD_NUMBER = 4;
+ private com.google.protobuf.ByteString schema_;
+ public boolean hasSchema() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public com.google.protobuf.ByteString getSchema() {
+ return schema_;
+ }
+
+ // required int64 timestamp = 5;
+ public static final int TIMESTAMP_FIELD_NUMBER = 5;
+ private long timestamp_;
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getTimestamp() {
+ return timestamp_;
+ }
+
+ // required bool deleted = 6;
+ public static final int DELETED_FIELD_NUMBER = 6;
+ private boolean deleted_;
+ public boolean hasDeleted() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getDeleted() {
+ return deleted_;
+ }
+
+ // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+ public static final int PROPS_FIELD_NUMBER = 7;
+ private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
props_;
+ public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
getPropsList() {
+ return props_;
+ }
+ public java.util.List<? extends
SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder>
+ getPropsOrBuilderList() {
+ return props_;
+ }
+ public int getPropsCount() {
+ return props_.size();
+ }
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+ return props_.get(index);
+ }
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder
getPropsOrBuilder(
+ int index) {
+ return props_.get(index);
+ }
+
+ private void initFields() {
+ schemaId_ = "";
+ user_ = "";
+ type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+ schema_ = com.google.protobuf.ByteString.EMPTY;
+ timestamp_ = 0L;
+ deleted_ = false;
+ props_ = java.util.Collections.emptyList();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasSchemaId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasUser()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasType()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasSchema()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTimestamp()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasDeleted()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ for (int i = 0; i < getPropsCount(); i++) {
+ if (!getProps(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getSchemaIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getUserBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeEnum(3, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, schema_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt64(5, timestamp_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBool(6, deleted_);
+ }
+ for (int i = 0; i < props_.size(); i++) {
+ output.writeMessage(7, props_.get(i));
+ }
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getSchemaIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getUserBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(3, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, schema_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(5, timestamp_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(6, deleted_);
+ }
+ for (int i = 0; i < props_.size(); i++) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(7, props_.get(i));
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo
parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo
parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static SchemaRegistryFormat.SchemaInfo parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo
prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageLite.Builder<
+ SchemaRegistryFormat.SchemaInfo, Builder>
+ implements SchemaRegistryFormat.SchemaInfoOrBuilder {
+ // Construct using
org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ schemaId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ user_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ schema_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ timestamp_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ deleted_ = false;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ props_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public SchemaRegistryFormat.SchemaInfo getDefaultInstanceForType() {
+ return SchemaRegistryFormat.SchemaInfo.getDefaultInstance();
+ }
+
+ public SchemaRegistryFormat.SchemaInfo build() {
+ SchemaRegistryFormat.SchemaInfo result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private SchemaRegistryFormat.SchemaInfo buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ SchemaRegistryFormat.SchemaInfo result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public SchemaRegistryFormat.SchemaInfo buildPartial() {
+ SchemaRegistryFormat.SchemaInfo result = new
SchemaRegistryFormat.SchemaInfo(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.schemaId_ = schemaId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.user_ = user_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.type_ = type_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.schema_ = schema_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.timestamp_ = timestamp_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.deleted_ = deleted_;
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ props_ = java.util.Collections.unmodifiableList(props_);
+ bitField0_ = (bitField0_ & ~0x00000040);
+ }
+ result.props_ = props_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo other) {
+ if (other == SchemaRegistryFormat.SchemaInfo.getDefaultInstance())
return this;
+ if (other.hasSchemaId()) {
+ setSchemaId(other.getSchemaId());
+ }
+ if (other.hasUser()) {
+ setUser(other.getUser());
+ }
+ if (other.hasType()) {
+ setType(other.getType());
+ }
+ if (other.hasSchema()) {
+ setSchema(other.getSchema());
+ }
+ if (other.hasTimestamp()) {
+ setTimestamp(other.getTimestamp());
+ }
+ if (other.hasDeleted()) {
+ setDeleted(other.getDeleted());
+ }
+ if (!other.props_.isEmpty()) {
+ if (props_.isEmpty()) {
+ props_ = other.props_;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ } else {
+ ensurePropsIsMutable();
+ props_.addAll(other.props_);
+ }
+
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasSchemaId()) {
+
+ return false;
+ }
+ if (!hasUser()) {
+
+ return false;
+ }
+ if (!hasType()) {
+
+ return false;
+ }
+ if (!hasSchema()) {
+
+ return false;
+ }
+ if (!hasTimestamp()) {
+
+ return false;
+ }
+ if (!hasDeleted()) {
+
+ return false;
+ }
+ for (int i = 0; i < getPropsCount(); i++) {
+ if (!getProps(i).isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!parseUnknownField(input, extensionRegistry, tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ schemaId_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ user_ = input.readBytes();
+ break;
+ }
+ case 24: {
+ int rawValue = input.readEnum();
+ SchemaRegistryFormat.SchemaInfo.SchemaType value =
SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(rawValue);
+ if (value != null) {
+ bitField0_ |= 0x00000004;
+ type_ = value;
+ }
+ break;
+ }
+ case 34: {
+ bitField0_ |= 0x00000008;
+ schema_ = input.readBytes();
+ break;
+ }
+ case 40: {
+ bitField0_ |= 0x00000010;
+ timestamp_ = input.readInt64();
+ break;
+ }
+ case 48: {
+ bitField0_ |= 0x00000020;
+ deleted_ = input.readBool();
+ break;
+ }
+ case 58: {
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder subBuilder
= SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+ input.readMessage(subBuilder, extensionRegistry);
+ addProps(subBuilder.buildPartial());
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required string schema_id = 1;
+ private java.lang.Object schemaId_ = "";
+ public boolean hasSchemaId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getSchemaId() {
+ java.lang.Object ref = schemaId_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ schemaId_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setSchemaId(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ schemaId_ = value;
+
+ return this;
+ }
+ public Builder clearSchemaId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ schemaId_ = getDefaultInstance().getSchemaId();
+
+ return this;
+ }
+ void setSchemaId(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000001;
+ schemaId_ = value;
+
+ }
+
+ // required string user = 2;
+ private java.lang.Object user_ = "";
+ public boolean hasUser() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getUser() {
+ java.lang.Object ref = user_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ user_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setUser(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ user_ = value;
+
+ return this;
+ }
+ public Builder clearUser() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ user_ = getDefaultInstance().getUser();
+
+ return this;
+ }
+ void setUser(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000002;
+ user_ = value;
+
+ }
+
+ // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+ private SchemaRegistryFormat.SchemaInfo.SchemaType type_ =
SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+ return type_;
+ }
+ public Builder setType(SchemaRegistryFormat.SchemaInfo.SchemaType value)
{
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ type_ = value;
+
+ return this;
+ }
+ public Builder clearType() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+
+ return this;
+ }
+
+ // required bytes schema = 4;
+ private com.google.protobuf.ByteString schema_ =
com.google.protobuf.ByteString.EMPTY;
+ public boolean hasSchema() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public com.google.protobuf.ByteString getSchema() {
+ return schema_;
+ }
+ public Builder setSchema(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ schema_ = value;
+
+ return this;
+ }
+ public Builder clearSchema() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ schema_ = getDefaultInstance().getSchema();
+
+ return this;
+ }
+
+ // required int64 timestamp = 5;
+ private long timestamp_ ;
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getTimestamp() {
+ return timestamp_;
+ }
+ public Builder setTimestamp(long value) {
+ bitField0_ |= 0x00000010;
+ timestamp_ = value;
+
+ return this;
+ }
+ public Builder clearTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ timestamp_ = 0L;
+
+ return this;
+ }
+
+ // required bool deleted = 6;
+ private boolean deleted_ ;
+ public boolean hasDeleted() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getDeleted() {
+ return deleted_;
+ }
+ public Builder setDeleted(boolean value) {
+ bitField0_ |= 0x00000020;
+ deleted_ = value;
+
+ return this;
+ }
+ public Builder clearDeleted() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ deleted_ = false;
+
+ return this;
+ }
+
+ // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+ private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
props_ =
+ java.util.Collections.emptyList();
+ private void ensurePropsIsMutable() {
+ if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+ props_ = new
java.util.ArrayList<SchemaRegistryFormat.SchemaInfo.KeyValuePair>(props_);
+ bitField0_ |= 0x00000040;
+ }
+ }
+
+ public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
getPropsList() {
+ return java.util.Collections.unmodifiableList(props_);
+ }
+ public int getPropsCount() {
+ return props_.size();
+ }
+ public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+ return props_.get(index);
+ }
+ public Builder setProps(
+ int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePropsIsMutable();
+ props_.set(index, value);
+
+ return this;
+ }
+ public Builder setProps(
+ int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder
builderForValue) {
+ ensurePropsIsMutable();
+ props_.set(index, builderForValue.build());
+
+ return this;
+ }
+ public Builder addProps(SchemaRegistryFormat.SchemaInfo.KeyValuePair
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePropsIsMutable();
+ props_.add(value);
+
+ return this;
+ }
+ public Builder addProps(
+ int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePropsIsMutable();
+ props_.add(index, value);
+
+ return this;
+ }
+ public Builder addProps(
+ SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder
builderForValue) {
+ ensurePropsIsMutable();
+ props_.add(builderForValue.build());
+
+ return this;
+ }
+ public Builder addProps(
+ int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder
builderForValue) {
+ ensurePropsIsMutable();
+ props_.add(index, builderForValue.build());
+
+ return this;
+ }
+ public Builder addAllProps(
+ java.lang.Iterable<? extends
SchemaRegistryFormat.SchemaInfo.KeyValuePair> values) {
+ ensurePropsIsMutable();
+ super.addAll(values, props_);
+
+ return this;
+ }
+ public Builder clearProps() {
+ props_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+
+ return this;
+ }
+ public Builder removeProps(int index) {
+ ensurePropsIsMutable();
+ props_.remove(index);
+
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo)
+ }
+
+ static {
+ defaultInstance = new SchemaInfo(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo)
+ }
+
+
+ static {
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
new file mode 100644
index 0000000..e497eaf
--- /dev/null
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+option optimize_for = LITE_RUNTIME;
+
+message SchemaInfo {
+ enum SchemaType {
+ NONE = 1;
+ THRIFT = 2;
+ AVRO = 3;
+ JSON = 4;
+ PROTO = 5;
+ }
+ message KeyValuePair {
+ required string key = 1;
+ required string value = 2;
+ }
+ required string schema_id = 1;
+ required string user = 2;
+ required SchemaType type = 3;
+ required bytes schema = 4;
+ required int64 timestamp = 5;
+ required bool deleted = 6;
+
+ repeated KeyValuePair props = 7;
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d6756b4..ce053fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -96,6 +96,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
@@ -335,7 +336,7 @@ public class PersistentTopicTest {
String role = "appid1";
// 1. simple add producer
Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
- role, false, null);
+ role, false, null, SchemaVersion.Latest);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);
@@ -351,7 +352,7 @@ public class PersistentTopicTest {
// 3. add producer for a different topic
PersistentTopic failTopic = new PersistentTopic(failTopicName,
ledgerMock, brokerService);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /*
producer id */, "prod-name",
- role, false, null);
+ role, false, null, SchemaVersion.Latest);
try {
topic.addProducer(failProducer);
fail("should have failed");
@@ -371,18 +372,18 @@ public class PersistentTopicTest {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
String role = "appid1";
// 1. add producer1
- Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name1", role, false, null);
+ Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name1", role, false, null, SchemaVersion.Latest);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);
// 2. add producer2
- Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id
*/, "prod-name2", role, false, null);
+ Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id
*/, "prod-name2", role, false, null, SchemaVersion.Latest);
topic.addProducer(producer2);
assertEquals(topic.getProducers().size(), 2);
// 3. add producer3 but reached maxProducersPerTopic
try {
- Producer producer3 = new Producer(topic, serverCnx, 3 /* producer
id */, "prod-name3", role, false, null);
+ Producer producer3 = new Producer(topic, serverCnx, 3 /* producer
id */, "prod-name3", role, false, null, SchemaVersion.Latest);
topic.addProducer(producer3);
fail("should have failed");
} catch (BrokerServiceException e) {
@@ -721,7 +722,7 @@ public class PersistentTopicTest {
// 2. delete topic with producer
topic = (PersistentTopic)
brokerService.getTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
- role, false, null);
+ role, false, null, SchemaVersion.Latest);
topic.addProducer(producer);
assertTrue(topic.delete().isCompletedExceptionally());
@@ -877,7 +878,7 @@ public class PersistentTopicTest {
String role = "appid1";
Thread.sleep(10); /* delay to ensure that the delete gets executed
first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer
id */, "prod-name",
- role, false, null);
+ role, false, null, SchemaVersion.Latest);
topic.addProducer(producer);
fail("Should have failed");
} catch (BrokerServiceException e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ded2f00..9fb02ea 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -35,6 +35,12 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -43,9 +49,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import javax.naming.AuthenticationException;
-
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -70,6 +74,7 @@ import
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
@@ -103,14 +108,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
/**
*/
@Test
@@ -143,6 +140,8 @@ public class ServerCnxTest {
public void setup() throws Exception {
svcConfig = spy(new ServiceConfiguration());
pulsar = spy(new PulsarService(svcConfig));
+ doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
+
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
svcConfig.setBacklogQuotaCheckEnabled(false);
doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 5d2b922..a0f007a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -23,12 +23,12 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.impl.ConsumerBase;
@@ -36,12 +36,11 @@ import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.common.api.Commands;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import io.netty.channel.ChannelHandlerContext;
-
/**
*/
public class ClientErrorsTest {
@@ -143,7 +142,7 @@ public class ClientErrorsTest {
mockBrokerService.setHandleProducer((ctx, producer) -> {
if (counter.incrementAndGet() == 2) {
-
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
return;
}
ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.ServiceNotReady, "msg"));
@@ -217,7 +216,8 @@ public class ClientErrorsTest {
ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.AuthenticationError, "msg"));
return;
}
-
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
+
});
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>)
client.newProducer().topic(topic1).create();
@@ -255,7 +255,7 @@ public class ClientErrorsTest {
int i = counter.incrementAndGet();
if (i == 1 || i == 5) {
// succeed on 1st and 5th attempts
-
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
return;
}
ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.PersistenceError, "msg"));
@@ -479,7 +479,7 @@ public class ClientErrorsTest {
ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.AuthorizationError, "msg"));
return;
}
-
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
});
mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
@@ -583,7 +583,7 @@ public class ClientErrorsTest {
});
mockBrokerService.setHandleProducer((ctx, produce) -> {
-
ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(),
"default-producer", SchemaVersion.Empty));
});
mockBrokerService.setHandleSend((ctx, sendCmd, headersAndPayload) -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index bdc8e32..91e5bbe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -47,6 +47,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetad
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -174,7 +175,7 @@ public class MockBrokerService {
return;
}
// default
-
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer"));
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
}
@Override
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b51d29e..093f944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.api;
+import static com.google.protobuf.ByteString.copyFromUtf8;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
@@ -37,11 +38,11 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
-import
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
@@ -72,6 +73,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
@@ -115,7 +117,7 @@ public class Commands {
}
if (authData != null) {
- connectBuilder.setAuthData(ByteString.copyFromUtf8(authData));
+ connectBuilder.setAuthData(copyFromUtf8(authData));
}
if (originalPrincipal != null) {
@@ -165,15 +167,16 @@ public class Commands {
return res;
}
- public static ByteBuf newProducerSuccess(long requestId, String
producerName) {
- return newProducerSuccess(requestId, producerName, -1);
+ public static ByteBuf newProducerSuccess(long requestId, String
producerName, SchemaVersion schemaVersion) {
+ return newProducerSuccess(requestId, producerName, -1, schemaVersion);
}
- public static ByteBuf newProducerSuccess(long requestId, String
producerName, long lastSequenceId) {
+ public static ByteBuf newProducerSuccess(long requestId, String
producerName, long lastSequenceId, SchemaVersion schemaVersion) {
CommandProducerSuccess.Builder producerSuccessBuilder =
CommandProducerSuccess.newBuilder();
producerSuccessBuilder.setRequestId(requestId);
producerSuccessBuilder.setProducerName(producerName);
producerSuccessBuilder.setLastSequenceId(lastSequenceId);
+
producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
CommandProducerSuccess producerSuccess =
producerSuccessBuilder.build();
ByteBuf res = serializeWithSize(
BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess));
@@ -980,4 +983,5 @@ public class Commands {
public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
return peerVersion >= ProtocolVersion.v12.getNumber();
}
+
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 61339e1..c34f46c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -288,6 +288,12 @@ public class TopicName implements ServiceUnitId {
return cluster == null ||
Constants.GLOBAL_CLUSTER.equalsIgnoreCase(cluster);
}
+ public String getSchemaName() {
+ return getProperty()
+ + "/" + getNamespacePortion()
+ + "/" + getLocalName();
+ }
+
@Override
public String toString() {
return completeTopicName;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
new file mode 100644
index 0000000..0aaefb3
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final public class EmptyVersion implements SchemaVersion {
+ private static final byte[] EMPTY = new byte[]{};
+
+ @Override
+ public byte[] bytes() {
+ return EMPTY;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
new file mode 100644
index 0000000..b26231c
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final class LatestVersion implements SchemaVersion {
+ private static final byte[] EMPTY = new byte[]{};
+
+ @Override
+ public byte[] bytes() {
+ return EMPTY;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
new file mode 100644
index 0000000..5a5012c
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Builder
+@Data
+public class SchemaData {
+ private final SchemaType type;
+ private final boolean isDeleted;
+ private final long timestamp;
+ private final String user;
+ private final byte[] data;
+ private final Map<String, String> props;
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
new file mode 100644
index 0000000..e9a01f0
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public enum SchemaType {
+ AVRO, PROTOBUF, THRIFT, JSON, NONE
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
new file mode 100644
index 0000000..e31e45d
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public interface SchemaVersion {
+ SchemaVersion Latest = new LatestVersion();
+ SchemaVersion Empty = new EmptyVersion();
+
+ byte[] bytes();
+}
--
To stop receiving notification emails like this one, please contact
[email protected].