This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90908e9 Add option to retrieve schema for a topic from pulsar client
(#2215)
90908e9 is described below
commit 90908e9d950a747820c9ba8b4160372937062f90
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jul 23 00:08:15 2018 -0700
Add option to retrieve schema for a topic from pulsar client (#2215)
---
.../pulsar/broker/service/BrokerService.java | 4 +-
.../broker/service/PulsarChannelInitializer.java | 25 +-
.../apache/pulsar/broker/service/ServerCnx.java | 51 +-
.../PersistentDispatcherFailoverConsumerTest.java | 4 +-
.../service/PersistentTopicConcurrentTest.java | 2 +-
.../pulsar/broker/service/PersistentTopicTest.java | 2 +-
.../pulsar/broker/service/ServerCnxTest.java | 2 +-
.../broker/service/schema/ClientGetSchemaTest.java | 97 ++
.../schema/JsonSchemaCompatibilityCheckTest.java | 13 +-
.../pulsar/client/api/PulsarClientException.java | 10 +
.../client/impl/BinaryProtoLookupService.java | 21 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 57 +-
.../org/apache/pulsar/client/impl/HttpClient.java | 24 +-
.../pulsar/client/impl/HttpLookupService.java | 32 +-
.../apache/pulsar/client/impl/LookupService.java | 4 +
.../pulsar/client/impl/PulsarClientImpl.java | 23 +-
.../org/apache/pulsar/common/api/Commands.java | 60 +
.../apache/pulsar/common/api/PulsarDecoder.java | 29 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 1330 ++++++++++++++++++++
.../apache/pulsar/common/schema/SchemaInfo.java | 38 +
pulsar-common/src/main/proto/PulsarApi.proto | 21 +
21 files changed, 1800 insertions(+), 49 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 741afbf..4bbfc72 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -290,7 +290,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
- bootstrap.childHandler(new PulsarChannelInitializer(this,
serviceConfig, false));
+ bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
// Bind and start to accept incoming connections.
InetSocketAddress addr = new
InetSocketAddress(pulsar.getBindAddress(), port);
@@ -303,7 +303,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
if (serviceConfig.isTlsEnabled()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
- tlsBootstrap.childHandler(new PulsarChannelInitializer(this,
serviceConfig, true));
+ tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar,
true));
tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(),
tlsPort)).sync();
log.info("Started Pulsar Broker TLS service on port {} - TLS
provider: {}", tlsPort,
SslContext.defaultServerProvider());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 8c16a55..7858a11 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -18,37 +18,38 @@
*/
package org.apache.pulsar.broker.service;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.ByteBufPair;
-import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
-
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.util.SecurityUtility;
+
public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel> {
public static final String TLS_HANDLER = "tls";
- BrokerService brokerService;
- ServiceConfiguration serviceConfig;
- boolean enableTLS;
+
+ private final PulsarService pulsar;
+ private final boolean enableTLS;
/**
*
* @param brokerService
*/
- public PulsarChannelInitializer(BrokerService brokerService,
ServiceConfiguration serviceConfig,
+ public PulsarChannelInitializer(PulsarService pulsar,
boolean enableTLS) {
super();
- this.brokerService = brokerService;
- this.serviceConfig = serviceConfig;
+ this.pulsar = pulsar;
this.enableTLS = enableTLS;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
+ ServiceConfiguration serviceConfig = pulsar.getConfiguration();
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(),
serviceConfig.getTlsTrustCertsFilePath(),
@@ -60,6 +61,6 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
- ch.pipeline().addLast("handler", new ServerCnx(brokerService));
+ ch.pipeline().addLast("handler", new ServerCnx(pulsar));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b45f72..1d191bc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -50,11 +50,13 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -72,6 +74,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -93,6 +96,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
@@ -103,6 +107,7 @@ import org.slf4j.LoggerFactory;
public class ServerCnx extends PulsarHandler {
private final BrokerService service;
+ private final SchemaRegistryService schemaService;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
@@ -127,9 +132,10 @@ public class ServerCnx extends PulsarHandler {
Start, Connected, Failed
}
- public ServerCnx(BrokerService service) {
- super(service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
- this.service = service;
+ public ServerCnx(PulsarService pulsar) {
+ super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(),
TimeUnit.SECONDS);
+ this.service = pulsar.getBrokerService();
+ this.schemaService = pulsar.getSchemaRegistryService();
this.state = State.Start;
// This maps are not heavily contended since most accesses are within
the cnx thread
@@ -301,7 +307,7 @@ public class ServerCnx extends PulsarHandler {
if (topicName == null) {
return;
}
-
+
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -1199,6 +1205,43 @@ public class ServerCnx extends PulsarHandler {
}
@Override
+ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received CommandGetSchema call from {}", remoteAddress);
+ }
+
+ long requestId = commandGetSchema.getRequestId();
+ SchemaVersion schemaVersion = SchemaVersion.Latest;
+ if (commandGetSchema.hasSchemaVersion()) {
+ schemaVersion =
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion().toByteArray());
+ }
+
+ String schemaName;
+ try {
+ schemaName =
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+ } catch (Throwable t) {
+ ctx.writeAndFlush(
+ Commands.newGetSchemaResponseError(requestId,
ServerError.InvalidTopicName, t.getMessage()));
+ return;
+ }
+
+ schemaService.getSchema(schemaName,
schemaVersion).thenAccept(schemaAndMetadata -> {
+ if (schemaAndMetadata == null) {
+
ctx.writeAndFlush(Commands.newGetSchemaResponseError(requestId,
ServerError.TopicNotFound,
+ "Topic not found or no-schema"));
+ } else {
+ ctx.writeAndFlush(Commands.newGetSchemaResponse(requestId,
+ new SchemaInfo(schemaName, schemaAndMetadata.schema),
schemaAndMetadata.version));
+ }
+ }).exceptionally(ex -> {
+ ctx.writeAndFlush(
+ Commands.newGetSchemaResponseError(requestId,
ServerError.UnknownError, ex.getMessage()));
+ return null;
+ });
+ }
+
+
+ @Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ccd32e1..bc8d4bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -157,14 +157,14 @@ public class PersistentDispatcherFailoverConsumerTest {
return null;
}).when(channelCtx).writeAndFlush(any(), any());
- serverCnx = spy(new ServerCnx(brokerService));
+ serverCnx = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
when(serverCnx.ctx()).thenReturn(channelCtx);
- serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
+ serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnxWithOldVersion).isActive();
doReturn(true).when(serverCnxWithOldVersion).isWritable();
doReturn(new InetSocketAddress("localhost", 1234))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 8bfd6ef..7c5ca20 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -98,7 +98,7 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
brokerService = spy(new BrokerService(pulsar));
doReturn(brokerService).when(pulsar).getBrokerService();
- serverCnx = spy(new ServerCnx(brokerService));
+ serverCnx = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnx).isActive();
NamespaceService nsSvc = mock(NamespaceService.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index db35e74..f8ac40a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -171,7 +171,7 @@ public class PersistentTopicTest {
brokerService = spy(new BrokerService(pulsar));
doReturn(brokerService).when(pulsar).getBrokerService();
- serverCnx = spy(new ServerCnx(brokerService));
+ serverCnx = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2b324ab..f746571 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1379,7 +1379,7 @@ public class ServerCnxTest {
serverCnx.close();
channel.close().get();
}
- serverCnx = new ServerCnx(brokerService);
+ serverCnx = new ServerCnx(pulsar);
serverCnx.authRole = "";
channel = new EmbeddedChannel(new
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
new file mode 100644
index 0000000..7055492
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ClientGetSchemaTest extends ProducerConsumerBase {
+
+ private static final String topicBytes = "my-property/my-ns/topic-bytes";
+ private static final String topicString = "my-property/my-ns/topic-string";
+ private static final String topicJson = "my-property/my-ns/topic-json";
+ private static final String topicAvro = "my-property/my-ns/topic-avro";
+
+ List<Producer<?>> producers = new ArrayList<>();
+
+ private static class MyClass {
+ public String name;
+ public int age;
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+
+ // Create few topics with different types
+
producers.add(pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create());
+
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
+
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
+
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ producers.forEach(t -> {
+ try {
+ t.close();
+ } catch (PulsarClientException e) {
+ }
+ });
+ super.internalCleanup();
+ }
+
+ @DataProvider(name = "serviceUrl")
+ public String[] serviceUrls() {
+ return new String[] { "pulsar://" + pulsar.getAdvertisedAddress() +
":" + BROKER_PORT,
+ "http://" + pulsar.getAdvertisedAddress() + ":" +
BROKER_WEBSERVICE_PORT };
+ }
+
+ @Test(dataProvider = "serviceUrl")
+ public void testGetSchema(String serviceUrl) throws Exception {
+ @Cleanup
+ PulsarClientImpl client = (PulsarClientImpl)
PulsarClient.builder().serviceUrl(serviceUrl).build();
+
+ assertEquals(client.getSchema("non-existing-topic").join(),
Optional.empty());
+ assertEquals(client.getSchema(topicBytes).join(), Optional.empty());
+ assertEquals(client.getSchema(topicString).join(),
Optional.of(Schema.STRING.getSchemaInfo()));
+ assertEquals(client.getSchema(topicJson).join(),
Optional.of(Schema.JSON(MyClass.class).getSchemaInfo()));
+ assertEquals(client.getSchema(topicAvro).join(),
Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index d7e7c2c..341487f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -22,9 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -34,10 +43,6 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
public class JsonSchemaCompatibilityCheckTest extends
BaseAvroSchemaCompatibilityTest{
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2bcd142..2618066 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -51,6 +51,16 @@ public class PulsarClientException extends IOException {
}
}
+ public static class NotFoundException extends PulsarClientException {
+ public NotFoundException(String msg) {
+ super(msg);
+ }
+
+ public NotFoundException(Throwable t) {
+ super(t);
+ }
+ }
+
public static class TimeoutException extends PulsarClientException {
public TimeoutException(String msg) {
super(msg);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 6d8f678..29ae1f4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -21,29 +21,32 @@ package org.apache.pulsar.client.impl;
import static java.lang.String.format;
import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-
public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
@@ -181,6 +184,16 @@ public class BinaryProtoLookupService implements
LookupService {
return partitionFuture;
}
+ @Override
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
+ return
client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+ long requestId = client.newRequestId();
+ ByteBuf request = Commands.newGetSchema(requestId,
topicName.toString(), Optional.empty());
+
+ return clientCnx.sendGetSchema(request, requestId);
+ });
+ }
+
public String getServiceUrl() {
return serviceAddress.toString();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6a8ab74..887a890 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -23,6 +23,7 @@ import static
org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
@@ -31,16 +32,20 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import javax.net.ssl.SSLSession;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.client.api.Authentication;
@@ -56,6 +61,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
@@ -67,6 +73,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +94,9 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<List<String>>>
pendingGetTopicsRequests =
new ConcurrentLongHashMap<>(16, 1);
+ private final
ConcurrentLongHashMap<CompletableFuture<Optional<SchemaInfo>>>
pendingGetSchemaRequests = new ConcurrentLongHashMap<>(
+ 16, 1);
+
private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new
ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new
ConcurrentLongHashMap<>(16, 1);
@@ -151,7 +161,7 @@ public class ClientCnx extends PulsarHandler {
}
});
}
-
+
protected ByteBuf newConnectCommand() throws PulsarClientException {
String authData = "";
if (authentication.getAuthData().hasDataFromCommand()) {
@@ -178,6 +188,7 @@ public class ClientCnx extends PulsarHandler {
waitingLookupRequests.forEach(pair ->
pair.getRight().getRight().completeExceptionally(e));
pendingGetLastMessageIdRequests.forEach((key, future) ->
future.completeExceptionally(e));
pendingGetTopicsRequests.forEach((key, future) ->
future.completeExceptionally(e));
+ pendingGetSchemaRequests.forEach((key, future) ->
future.completeExceptionally(e));
// Notify all attached producers/consumers so they have a chance to
reconnect
producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -595,6 +606,31 @@ public class ClientCnx extends PulsarHandler {
}
}
+ @Override
+ protected void handleGetSchemaResponse(CommandGetSchemaResponse
commandGetSchemaResponse) {
+ checkArgument(state == State.Ready);
+
+ long requestId = commandGetSchemaResponse.getRequestId();
+
+ CompletableFuture<Optional<SchemaInfo>> future =
pendingGetSchemaRequests.remove(requestId);
+ if (future == null) {
+ log.warn("{} Received unknown request id from server: {}",
ctx.channel(), requestId);
+ return;
+ }
+
+ if (commandGetSchemaResponse.hasErrorCode()) {
+ // Request has failed
+ ServerError rc = commandGetSchemaResponse.getErrorCode();
+ if (rc == ServerError.TopicNotFound) {
+ future.complete(Optional.empty());
+ } else {
+ future.completeExceptionally(getPulsarClientException(rc,
commandGetSchemaResponse.getErrorMessage()));
+ }
+ } else {
+ future.complete(Optional.of(new
SchemaInfo(commandGetSchemaResponse.getSchema())));
+ }
+ }
+
Promise<Void> newPromise() {
return ctx.newPromise();
}
@@ -644,6 +680,23 @@ public class ClientCnx extends PulsarHandler {
return future;
}
+ public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf
request, long requestId) {
+ CompletableFuture<Optional<SchemaInfo>> future = new
CompletableFuture<>();
+
+ pendingGetSchemaRequests.put(requestId, future);
+
+ ctx.writeAndFlush(request).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ log.warn("{} Failed to send GetSchema request to broker: {}",
ctx.channel(),
+ writeFuture.cause().getMessage());
+ pendingGetLastMessageIdRequests.remove(requestId);
+ future.completeExceptionally(writeFuture.cause());
+ }
+ });
+
+ return future;
+ }
+
/**
* check serverError and take appropriate action
* <ul>
@@ -760,7 +813,7 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException(errorMsg);
}
}
-
+
@VisibleForTesting
public void close() {
if (ctx != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index eb67262..e5ef4c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -18,6 +18,13 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -31,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncCompletionHandler;
@@ -46,13 +54,6 @@ import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.ssl.SslContext;
-
public class HttpClient implements Closeable {
protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -158,8 +159,13 @@ public class HttpClient implements Closeable {
Response response = responseFuture.get();
if (response.getStatusCode() != HttpURLConnection.HTTP_OK)
{
log.warn("[{}] HTTP get request failed: {}",
requestUrl, response.getStatusText());
- future.completeExceptionally(
- new PulsarClientException("HTTP get request
failed: " + response.getStatusText()));
+ Exception e;
+ if (response.getStatusCode() ==
HttpURLConnection.HTTP_NOT_FOUND) {
+ e = new NotFoundException("Not found: " +
response.getStatusText());
+ } else {
+ e = new PulsarClientException("HTTP get request
failed: " + response.getStatusText());
+ }
+ future.completeExceptionally(e);
return;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 185412c..a427fbd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,25 +19,30 @@
package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
+
+import io.netty.channel.EventLoopGroup;
+
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.lookup.data.LookupData;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.EventLoopGroup;
-
class HttpLookupService implements LookupService {
private final HttpClient httpClient;
@@ -122,6 +127,27 @@ class HttpLookupService implements LookupService {
}
@Override
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
+ CompletableFuture<Optional<SchemaInfo>> future = new
CompletableFuture<>();
+
+ String schemaName = topicName.getSchemaName();
+ String path = String.format("admin/v2/schemas/%s/schema", schemaName);
+
+ httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
+ future.complete(Optional.of(new SchemaInfo(schemaName, response)));
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof NotFoundException) {
+ future.complete(Optional.empty());
+ } else {
+ log.warn("Failed to get schema for topic {} : {}", topicName,
ex.getCause().getClass());
+ future.completeExceptionally(ex);
+ }
+ return null;
+ });
+ return future;
+ }
+
+ @Override
public void close() throws Exception {
httpClient.close();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index aa00e54..769422b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
/**
* Provides lookup service to find broker which serves given topic. It helps to
@@ -58,6 +60,8 @@ interface LookupService extends AutoCloseable {
*/
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(TopicName topicName);
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName);
+
/**
* Returns broker-service lookup api url.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 84dcfc1..e156e10 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -23,12 +23,15 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.util.IdentityHashMap;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -36,9 +39,9 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -63,6 +66,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
@@ -559,6 +563,23 @@ public class PulsarClientImpl implements PulsarClient {
return readerFuture;
}
+ /**
+ * Read the schema information for a given topic.
+ *
+ * If the topic does not exist or it has no schema associated, it will
return an empty response
+ */
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(String topic) {
+ TopicName topicName;
+ try {
+ topicName = TopicName.get(topic);
+ } catch (Throwable t) {
+ return FutureUtil
+ .failedFuture(new
PulsarClientException.InvalidTopicNameException("Invalid topic name: " +
topic));
+ }
+
+ return lookup.getSchema(topicName);
+ }
+
@Override
public void close() throws PulsarClientException {
try {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 3c1ac99..b5e2406 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
@@ -462,6 +463,24 @@ public class Commands {
}
}
+ public static SchemaType getSchemaType(PulsarApi.Schema.Type type) {
+ switch (type) {
+ case None:
+ return SchemaType.NONE;
+ case String:
+ return SchemaType.STRING;
+ case Json:
+ return SchemaType.JSON;
+ case Protobuf:
+ return SchemaType.PROTOBUF;
+ case Avro:
+ return SchemaType.AVRO;
+ default:
+ return SchemaType.NONE;
+ }
+ }
+
+
private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder()
.setName(schemaInfo.getName())
@@ -797,6 +816,47 @@ public class Commands {
return res;
}
+ public static ByteBuf newGetSchema(long requestId, String topic,
Optional<SchemaVersion> version) {
+ PulsarApi.CommandGetSchema.Builder schema =
PulsarApi.CommandGetSchema.newBuilder()
+ .setRequestId(requestId);
+ schema.setTopic(topic);
+ if (version.isPresent()) {
+
schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes()));
+ }
+
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+ .setType(Type.GET_SCHEMA)
+ .setGetSchema(schema.build()));
+ schema.recycle();
+ return res;
+ }
+
+ public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo
schema, SchemaVersion version) {
+ PulsarApi.CommandGetSchemaResponse.Builder schemaResponse =
PulsarApi.CommandGetSchemaResponse.newBuilder()
+ .setRequestId(requestId)
+ .setSchemaVersion(ByteString.copyFrom(version.bytes()))
+ .setSchema(getSchema(schema));
+
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+ .setType(Type.GET_SCHEMA_RESPONSE)
+ .setGetSchemaResponse(schemaResponse.build()));
+ schemaResponse.recycle();
+ return res;
+ }
+
+ public static ByteBuf newGetSchemaResponseError(long requestId,
ServerError error, String errorMessage) {
+ PulsarApi.CommandGetSchemaResponse.Builder schemaResponse =
PulsarApi.CommandGetSchemaResponse.newBuilder()
+ .setRequestId(requestId)
+ .setErrorCode(error)
+ .setErrorMessage(errorMessage);
+
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+ .setType(Type.GET_SCHEMA_RESPONSE)
+ .setGetSchemaResponse(schemaResponse.build()));
+ schemaResponse.recycle();
+ return res;
+ }
+
@VisibleForTesting
public static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
// / Wire format
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 5d2f1b0..f8c2d61 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.common.api;
import static com.google.common.base.Preconditions.checkArgument;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -32,6 +36,8 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
@@ -55,9 +61,6 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
@@ -288,6 +291,18 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
cmd.getGetTopicsOfNamespaceResponse().recycle();
break;
+
+ case GET_SCHEMA:
+ checkArgument(cmd.hasGetSchema());
+ handleGetSchema(cmd.getGetSchema());
+ cmd.getGetSchema().recycle();
+ break;
+
+ case GET_SCHEMA_RESPONSE:
+ checkArgument(cmd.hasGetSchemaResponse());
+ handleGetSchemaResponse(cmd.getGetSchemaResponse());
+ cmd.getGetSchemaResponse().recycle();
+ break;
}
} finally {
if (cmdBuilder != null) {
@@ -431,5 +446,13 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleGetSchemaResponse(CommandGetSchemaResponse
commandGetSchemaResponse) {
+ throw new UnsupportedOperationException();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PulsarDecoder.class);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index c161913..edc7e9b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -23482,6 +23482,1138 @@ public final class PulsarApi {
//
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
}
+ public interface CommandGetSchemaOrBuilder
+ extends
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+
+ // required uint64 request_id = 1;
+ boolean hasRequestId();
+ long getRequestId();
+
+ // required string topic = 2;
+ boolean hasTopic();
+ String getTopic();
+
+ // optional bytes schema_version = 3;
+ boolean hasSchemaVersion();
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion();
+ }
+ public static final class CommandGetSchema extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+ implements CommandGetSchemaOrBuilder,
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
{
+ // Use CommandGetSchema.newBuilder() to construct.
+ private io.netty.util.Recycler.Handle handle;
+ private CommandGetSchema(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ }
+
+ private static final io.netty.util.Recycler<CommandGetSchema> RECYCLER =
new io.netty.util.Recycler<CommandGetSchema>() {
+ protected CommandGetSchema newObject(Handle handle) {
+ return new CommandGetSchema(handle);
+ }
+ };
+
+ public void recycle() {
+ this.initFields();
+ this.memoizedIsInitialized = -1;
+ this.bitField0_ = 0;
+ this.memoizedSerializedSize = -1;
+ if (handle != null) { RECYCLER.recycle(this, handle); }
+ }
+
+ private CommandGetSchema(boolean noInit) {}
+
+ private static final CommandGetSchema defaultInstance;
+ public static CommandGetSchema getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CommandGetSchema getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private int bitField0_;
+ // required uint64 request_id = 1;
+ public static final int REQUEST_ID_FIELD_NUMBER = 1;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
+ // required string topic = 2;
+ public static final int TOPIC_FIELD_NUMBER = 2;
+ private java.lang.Object topic_;
+ public boolean hasTopic() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getTopic() {
+ java.lang.Object ref = topic_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs =
+ (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+ topic_ = s;
+ }
+ return s;
+ }
+ }
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getTopicBytes() {
+ java.lang.Object ref = topic_;
+ if (ref instanceof String) {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b =
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
ref);
+ topic_ = b;
+ return b;
+ } else {
+ return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString)
ref;
+ }
+ }
+
+ // optional bytes schema_version = 3;
+ public static final int SCHEMA_VERSION_FIELD_NUMBER = 3;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
schemaVersion_;
+ public boolean hasSchemaVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion() {
+ return schemaVersion_;
+ }
+
+ private void initFields() {
+ requestId_ = 0L;
+ topic_ = "";
+ schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasRequestId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTopic()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
output)
+ throws java.io.IOException {
+ throw new RuntimeException("Cannot use CodedOutputStream");
+ }
+
+ public void
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeUInt64(1, requestId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getTopicBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, schemaVersion_);
+ }
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(1, requestId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(2, getTopicBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(3, schemaVersion_);
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ throw new RuntimeException("Disabled");
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ throw new RuntimeException("Disabled");
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(byte[]
data)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ byte[] data,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ java.io.InputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
parseDelimitedFrom(
+ java.io.InputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema,
Builder>
+ implements
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaOrBuilder,
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
{
+ // Construct using
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder()
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ maybeForceBuilderInitialization();
+ }
+ private final static io.netty.util.Recycler<Builder> RECYCLER = new
io.netty.util.Recycler<Builder>() {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+ return new Builder(handle);
+ }
+ };
+
+ public void recycle() {
+ clear();
+ if (handle != null) {RECYCLER.recycle(this, handle);}
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return RECYCLER.get();
+ }
+
+ public Builder clear() {
+ super.clear();
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ topic_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getDefaultInstanceForType() {
+ return
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+ }
+
+ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
build() {
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result =
buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
buildParsed()
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result =
buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
buildPartial() {
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema result =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.RECYCLER.get();
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.requestId_ = requestId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.topic_ = topic_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.schemaVersion_ = schemaVersion_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema other) {
+ if (other ==
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
return this;
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
+ if (other.hasTopic()) {
+ setTopic(other.getTopic());
+ }
+ if (other.hasSchemaVersion()) {
+ setSchemaVersion(other.getSchemaVersion());
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasRequestId()) {
+
+ return false;
+ }
+ if (!hasTopic()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ throw new java.io.IOException("Merge from CodedInputStream is
disabled");
+ }
+ public Builder mergeFrom(
+ org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!input.skipField(tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ requestId_ = input.readUInt64();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ topic_ = input.readBytes();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000004;
+ schemaVersion_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required uint64 request_id = 1;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000001;
+ requestId_ = value;
+
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ requestId_ = 0L;
+
+ return this;
+ }
+
+ // required string topic = 2;
+ private java.lang.Object topic_ = "";
+ public boolean hasTopic() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getTopic() {
+ java.lang.Object ref = topic_;
+ if (!(ref instanceof String)) {
+ String s =
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString)
ref).toStringUtf8();
+ topic_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setTopic(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ topic_ = value;
+
+ return this;
+ }
+ public Builder clearTopic() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ topic_ = getDefaultInstance().getTopic();
+
+ return this;
+ }
+ void
setTopic(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+ bitField0_ |= 0x00000002;
+ topic_ = value;
+
+ }
+
+ // optional bytes schema_version = 3;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ public boolean hasSchemaVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion() {
+ return schemaVersion_;
+ }
+ public Builder
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ schemaVersion_ = value;
+
+ return this;
+ }
+ public Builder clearSchemaVersion() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ schemaVersion_ = getDefaultInstance().getSchemaVersion();
+
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchema)
+ }
+
+ static {
+ defaultInstance = new CommandGetSchema(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchema)
+ }
+
+ public interface CommandGetSchemaResponseOrBuilder
+ extends
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+
+ // required uint64 request_id = 1;
+ boolean hasRequestId();
+ long getRequestId();
+
+ // optional .pulsar.proto.ServerError error_code = 2;
+ boolean hasErrorCode();
+ org.apache.pulsar.common.api.proto.PulsarApi.ServerError getErrorCode();
+
+ // optional string error_message = 3;
+ boolean hasErrorMessage();
+ String getErrorMessage();
+
+ // optional .pulsar.proto.Schema schema = 4;
+ boolean hasSchema();
+ org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+
+ // optional bytes schema_version = 5;
+ boolean hasSchemaVersion();
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion();
+ }
+ public static final class CommandGetSchemaResponse extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+ implements CommandGetSchemaResponseOrBuilder,
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
{
+ // Use CommandGetSchemaResponse.newBuilder() to construct.
+ private io.netty.util.Recycler.Handle handle;
+ private CommandGetSchemaResponse(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ }
+
+ private static final io.netty.util.Recycler<CommandGetSchemaResponse>
RECYCLER = new io.netty.util.Recycler<CommandGetSchemaResponse>() {
+ protected CommandGetSchemaResponse newObject(Handle handle) {
+ return new CommandGetSchemaResponse(handle);
+ }
+ };
+
+ public void recycle() {
+ this.initFields();
+ this.memoizedIsInitialized = -1;
+ this.bitField0_ = 0;
+ this.memoizedSerializedSize = -1;
+ if (handle != null) { RECYCLER.recycle(this, handle); }
+ }
+
+ private CommandGetSchemaResponse(boolean noInit) {}
+
+ private static final CommandGetSchemaResponse defaultInstance;
+ public static CommandGetSchemaResponse getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CommandGetSchemaResponse getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private int bitField0_;
+ // required uint64 request_id = 1;
+ public static final int REQUEST_ID_FIELD_NUMBER = 1;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
+ // optional .pulsar.proto.ServerError error_code = 2;
+ public static final int ERROR_CODE_FIELD_NUMBER = 2;
+ private org.apache.pulsar.common.api.proto.PulsarApi.ServerError
errorCode_;
+ public boolean hasErrorCode() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.ServerError
getErrorCode() {
+ return errorCode_;
+ }
+
+ // optional string error_message = 3;
+ public static final int ERROR_MESSAGE_FIELD_NUMBER = 3;
+ private java.lang.Object errorMessage_;
+ public boolean hasErrorMessage() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getErrorMessage() {
+ java.lang.Object ref = errorMessage_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs =
+ (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if
(org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+ errorMessage_ = s;
+ }
+ return s;
+ }
+ }
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getErrorMessageBytes() {
+ java.lang.Object ref = errorMessage_;
+ if (ref instanceof String) {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b =
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String)
ref);
+ errorMessage_ = b;
+ return b;
+ } else {
+ return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString)
ref;
+ }
+ }
+
+ // optional .pulsar.proto.Schema schema = 4;
+ public static final int SCHEMA_FIELD_NUMBER = 4;
+ private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+ public boolean hasSchema() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+ return schema_;
+ }
+
+ // optional bytes schema_version = 5;
+ public static final int SCHEMA_VERSION_FIELD_NUMBER = 5;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
schemaVersion_;
+ public boolean hasSchemaVersion() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion() {
+ return schemaVersion_;
+ }
+
+ private void initFields() {
+ requestId_ = 0L;
+ errorCode_ =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+ errorMessage_ = "";
+ schema_ =
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+ schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasRequestId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (hasSchema()) {
+ if (!getSchema().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void
writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
output)
+ throws java.io.IOException {
+ throw new RuntimeException("Cannot use CodedOutputStream");
+ }
+
+ public void
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeUInt64(1, requestId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeEnum(2, errorCode_.getNumber());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, getErrorMessageBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeMessage(4, schema_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(5, schemaVersion_);
+ }
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(1, requestId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeEnumSize(2, errorCode_.getNumber());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(3, getErrorMessageBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(4, schema_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(5, schemaVersion_);
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ throw new RuntimeException("Disabled");
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ throw new RuntimeException("Disabled");
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
parseFrom(byte[] data)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ byte[] data,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ java.io.InputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
parseDelimitedFrom(
+ java.io.InputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse, Builder>
+ implements
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponseOrBuilder,
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
{
+ // Construct using
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder()
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ maybeForceBuilderInitialization();
+ }
+ private final static io.netty.util.Recycler<Builder> RECYCLER = new
io.netty.util.Recycler<Builder>() {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+ return new Builder(handle);
+ }
+ };
+
+ public void recycle() {
+ clear();
+ if (handle != null) {RECYCLER.recycle(this, handle);}
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return RECYCLER.get();
+ }
+
+ public Builder clear() {
+ super.clear();
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ errorCode_ =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ errorMessage_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
+ schema_ =
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+ bitField0_ = (bitField0_ & ~0x00000008);
+ schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getDefaultInstanceForType() {
+ return
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+ }
+
+ public
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse build() {
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
buildParsed()
+ throws
org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException
{
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
buildPartial() {
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
result =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.RECYCLER.get();
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.requestId_ = requestId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.errorCode_ = errorCode_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.errorMessage_ = errorMessage_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.schema_ = schema_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.schemaVersion_ = schemaVersion_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
other) {
+ if (other ==
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
return this;
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
+ if (other.hasErrorCode()) {
+ setErrorCode(other.getErrorCode());
+ }
+ if (other.hasErrorMessage()) {
+ setErrorMessage(other.getErrorMessage());
+ }
+ if (other.hasSchema()) {
+ mergeSchema(other.getSchema());
+ }
+ if (other.hasSchemaVersion()) {
+ setSchemaVersion(other.getSchemaVersion());
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasRequestId()) {
+
+ return false;
+ }
+ if (hasSchema()) {
+ if (!getSchema().isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder
mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream
input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ throw new java.io.IOException("Merge from CodedInputStream is
disabled");
+ }
+ public Builder mergeFrom(
+ org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+
org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!input.skipField(tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ requestId_ = input.readUInt64();
+ break;
+ }
+ case 16: {
+ int rawValue = input.readEnum();
+ org.apache.pulsar.common.api.proto.PulsarApi.ServerError value =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.valueOf(rawValue);
+ if (value != null) {
+ bitField0_ |= 0x00000002;
+ errorCode_ = value;
+ }
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000004;
+ errorMessage_ = input.readBytes();
+ break;
+ }
+ case 34: {
+ org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+ if (hasSchema()) {
+ subBuilder.mergeFrom(getSchema());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setSchema(subBuilder.buildPartial());
+ subBuilder.recycle();
+ break;
+ }
+ case 42: {
+ bitField0_ |= 0x00000010;
+ schemaVersion_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required uint64 request_id = 1;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000001;
+ requestId_ = value;
+
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ requestId_ = 0L;
+
+ return this;
+ }
+
+ // optional .pulsar.proto.ServerError error_code = 2;
+ private org.apache.pulsar.common.api.proto.PulsarApi.ServerError
errorCode_ =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+ public boolean hasErrorCode() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.ServerError
getErrorCode() {
+ return errorCode_;
+ }
+ public Builder
setErrorCode(org.apache.pulsar.common.api.proto.PulsarApi.ServerError value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ errorCode_ = value;
+
+ return this;
+ }
+ public Builder clearErrorCode() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ errorCode_ =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
+
+ return this;
+ }
+
+ // optional string error_message = 3;
+ private java.lang.Object errorMessage_ = "";
+ public boolean hasErrorMessage() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getErrorMessage() {
+ java.lang.Object ref = errorMessage_;
+ if (!(ref instanceof String)) {
+ String s =
((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString)
ref).toStringUtf8();
+ errorMessage_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setErrorMessage(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ errorMessage_ = value;
+
+ return this;
+ }
+ public Builder clearErrorMessage() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ errorMessage_ = getDefaultInstance().getErrorMessage();
+
+ return this;
+ }
+ void
setErrorMessage(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
value) {
+ bitField0_ |= 0x00000004;
+ errorMessage_ = value;
+
+ }
+
+ // optional .pulsar.proto.Schema schema = 4;
+ private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ =
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+ public boolean hasSchema() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+ return schema_;
+ }
+ public Builder
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ schema_ = value;
+
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder setSchema(
+ org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder
builderForValue) {
+ schema_ = builderForValue.build();
+
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+ if (((bitField0_ & 0x00000008) == 0x00000008) &&
+ schema_ !=
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+ schema_ =
+
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+ } else {
+ schema_ = value;
+ }
+
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder clearSchema() {
+ schema_ =
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+
+ bitField0_ = (bitField0_ & ~0x00000008);
+ return this;
+ }
+
+ // optional bytes schema_version = 5;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
schemaVersion_ =
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ public boolean hasSchemaVersion() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
getSchemaVersion() {
+ return schemaVersion_;
+ }
+ public Builder
setSchemaVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ schemaVersion_ = value;
+
+ return this;
+ }
+ public Builder clearSchemaVersion() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ schemaVersion_ = getDefaultInstance().getSchemaVersion();
+
+ return this;
+ }
+
+ //
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetSchemaResponse)
+ }
+
+ static {
+ defaultInstance = new CommandGetSchemaResponse(true);
+ defaultInstance.initFields();
+ }
+
+ //
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetSchemaResponse)
+ }
+
public interface BaseCommandOrBuilder
extends
org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
@@ -23616,6 +24748,14 @@ public final class PulsarApi {
// optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse
getTopicsOfNamespaceResponse = 33;
boolean hasGetTopicsOfNamespaceResponse();
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse
getGetTopicsOfNamespaceResponse();
+
+ // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+ boolean hasGetSchema();
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getGetSchema();
+
+ // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+ boolean hasGetSchemaResponse();
+ org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getGetSchemaResponse();
}
public static final class BaseCommand extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -23686,6 +24826,8 @@ public final class PulsarApi {
ACTIVE_CONSUMER_CHANGE(29, 31),
GET_TOPICS_OF_NAMESPACE(30, 32),
GET_TOPICS_OF_NAMESPACE_RESPONSE(31, 33),
+ GET_SCHEMA(32, 34),
+ GET_SCHEMA_RESPONSE(33, 35),
;
public static final int CONNECT_VALUE = 2;
@@ -23720,6 +24862,8 @@ public final class PulsarApi {
public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
public static final int GET_TOPICS_OF_NAMESPACE_VALUE = 32;
public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 33;
+ public static final int GET_SCHEMA_VALUE = 34;
+ public static final int GET_SCHEMA_RESPONSE_VALUE = 35;
public final int getNumber() { return value; }
@@ -23758,6 +24902,8 @@ public final class PulsarApi {
case 31: return ACTIVE_CONSUMER_CHANGE;
case 32: return GET_TOPICS_OF_NAMESPACE;
case 33: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
+ case 34: return GET_SCHEMA;
+ case 35: return GET_SCHEMA_RESPONSE;
default: return null;
}
}
@@ -24115,6 +25261,26 @@ public final class PulsarApi {
return getTopicsOfNamespaceResponse_;
}
+ // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+ public static final int GETSCHEMA_FIELD_NUMBER = 34;
+ private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getSchema_;
+ public boolean hasGetSchema() {
+ return ((bitField1_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getGetSchema() {
+ return getSchema_;
+ }
+
+ // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
+ public static final int GETSCHEMARESPONSE_FIELD_NUMBER = 35;
+ private
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getSchemaResponse_;
+ public boolean hasGetSchemaResponse() {
+ return ((bitField1_ & 0x00000004) == 0x00000004);
+ }
+ public
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getGetSchemaResponse() {
+ return getSchemaResponse_;
+ }
+
private void initFields() {
type_ =
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
connect_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -24149,6 +25315,8 @@ public final class PulsarApi {
activeConsumerChange_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
getTopicsOfNamespace_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
getTopicsOfNamespaceResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+ getSchema_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+ getSchemaResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -24339,6 +25507,18 @@ public final class PulsarApi {
return false;
}
}
+ if (hasGetSchema()) {
+ if (!getGetSchema().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ if (hasGetSchemaResponse()) {
+ if (!getGetSchemaResponse().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -24450,6 +25630,12 @@ public final class PulsarApi {
if (((bitField1_ & 0x00000001) == 0x00000001)) {
output.writeMessage(33, getTopicsOfNamespaceResponse_);
}
+ if (((bitField1_ & 0x00000002) == 0x00000002)) {
+ output.writeMessage(34, getSchema_);
+ }
+ if (((bitField1_ & 0x00000004) == 0x00000004)) {
+ output.writeMessage(35, getSchemaResponse_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -24590,6 +25776,14 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeMessageSize(33, getTopicsOfNamespaceResponse_);
}
+ if (((bitField1_ & 0x00000002) == 0x00000002)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(34, getSchema_);
+ }
+ if (((bitField1_ & 0x00000004) == 0x00000004)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(35, getSchemaResponse_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -24769,6 +25963,10 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x80000000);
getTopicsOfNamespaceResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
bitField1_ = (bitField1_ & ~0x00000001);
+ getSchema_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+ bitField1_ = (bitField1_ & ~0x00000002);
+ getSchemaResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+ bitField1_ = (bitField1_ & ~0x00000004);
return this;
}
@@ -24936,6 +26134,14 @@ public final class PulsarApi {
to_bitField1_ |= 0x00000001;
}
result.getTopicsOfNamespaceResponse_ = getTopicsOfNamespaceResponse_;
+ if (((from_bitField1_ & 0x00000002) == 0x00000002)) {
+ to_bitField1_ |= 0x00000002;
+ }
+ result.getSchema_ = getSchema_;
+ if (((from_bitField1_ & 0x00000004) == 0x00000004)) {
+ to_bitField1_ |= 0x00000004;
+ }
+ result.getSchemaResponse_ = getSchemaResponse_;
result.bitField0_ = to_bitField0_;
result.bitField1_ = to_bitField1_;
return result;
@@ -25042,6 +26248,12 @@ public final class PulsarApi {
if (other.hasGetTopicsOfNamespaceResponse()) {
mergeGetTopicsOfNamespaceResponse(other.getGetTopicsOfNamespaceResponse());
}
+ if (other.hasGetSchema()) {
+ mergeGetSchema(other.getGetSchema());
+ }
+ if (other.hasGetSchemaResponse()) {
+ mergeGetSchemaResponse(other.getGetSchemaResponse());
+ }
return this;
}
@@ -25230,6 +26442,18 @@ public final class PulsarApi {
return false;
}
}
+ if (hasGetSchema()) {
+ if (!getGetSchema().isInitialized()) {
+
+ return false;
+ }
+ }
+ if (hasGetSchemaResponse()) {
+ if (!getGetSchemaResponse().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -25584,6 +26808,26 @@ public final class PulsarApi {
subBuilder.recycle();
break;
}
+ case 274: {
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder
subBuilder =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder();
+ if (hasGetSchema()) {
+ subBuilder.mergeFrom(getGetSchema());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setGetSchema(subBuilder.buildPartial());
+ subBuilder.recycle();
+ break;
+ }
+ case 282: {
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder
subBuilder =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder();
+ if (hasGetSchemaResponse()) {
+ subBuilder.mergeFrom(getGetSchemaResponse());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setGetSchemaResponse(subBuilder.buildPartial());
+ subBuilder.recycle();
+ break;
+ }
}
}
}
@@ -26991,6 +28235,92 @@ public final class PulsarApi {
return this;
}
+ // optional .pulsar.proto.CommandGetSchema getSchema = 34;
+ private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getSchema_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+ public boolean hasGetSchema() {
+ return ((bitField1_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
getGetSchema() {
+ return getSchema_;
+ }
+ public Builder
setGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ getSchema_ = value;
+
+ bitField1_ |= 0x00000002;
+ return this;
+ }
+ public Builder setGetSchema(
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.Builder
builderForValue) {
+ getSchema_ = builderForValue.build();
+
+ bitField1_ |= 0x00000002;
+ return this;
+ }
+ public Builder
mergeGetSchema(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema
value) {
+ if (((bitField1_ & 0x00000002) == 0x00000002) &&
+ getSchema_ !=
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance())
{
+ getSchema_ =
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.newBuilder(getSchema_).mergeFrom(value).buildPartial();
+ } else {
+ getSchema_ = value;
+ }
+
+ bitField1_ |= 0x00000002;
+ return this;
+ }
+ public Builder clearGetSchema() {
+ getSchema_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
+
+ bitField1_ = (bitField1_ & ~0x00000002);
+ return this;
+ }
+
+ // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse =
35;
+ private
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getSchemaResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+ public boolean hasGetSchemaResponse() {
+ return ((bitField1_ & 0x00000004) == 0x00000004);
+ }
+ public
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
getGetSchemaResponse() {
+ return getSchemaResponse_;
+ }
+ public Builder
setGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ getSchemaResponse_ = value;
+
+ bitField1_ |= 0x00000004;
+ return this;
+ }
+ public Builder setGetSchemaResponse(
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.Builder
builderForValue) {
+ getSchemaResponse_ = builderForValue.build();
+
+ bitField1_ |= 0x00000004;
+ return this;
+ }
+ public Builder
mergeGetSchemaResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse
value) {
+ if (((bitField1_ & 0x00000004) == 0x00000004) &&
+ getSchemaResponse_ !=
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance())
{
+ getSchemaResponse_ =
+
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.newBuilder(getSchemaResponse_).mergeFrom(value).buildPartial();
+ } else {
+ getSchemaResponse_ = value;
+ }
+
+ bitField1_ |= 0x00000004;
+ return this;
+ }
+ public Builder clearGetSchemaResponse() {
+ getSchemaResponse_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+
+ bitField1_ = (bitField1_ & ~0x00000004);
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 117019e..ca7b93f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -20,17 +20,55 @@ package org.apache.pulsar.common.schema;
import java.util.Collections;
import java.util.Map;
+import java.util.TreeMap;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
+
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SchemaInfo {
+
+ @EqualsAndHashCode.Exclude
private String name;
+
private byte[] schema;
private SchemaType type;
private Map<String, String> properties = Collections.emptyMap();
+
+ public SchemaInfo(String name, SchemaData data) {
+ this.name = name;
+ this.schema = data.getData();
+ this.type = data.getType();
+ this.properties = data.getProps();
+ }
+
+ public SchemaInfo(Schema schema) {
+ this.name = schema.getName();
+ this.schema = schema.getSchemaData().toByteArray();
+ this.type = Commands.getSchemaType(schema.getType());
+ if (schema.getPropertiesCount() == 0) {
+ this.properties = Collections.emptyMap();
+ } else {
+ this.properties = new TreeMap<>();
+ for (int i =0 ; i < schema.getPropertiesCount();i ++ ) {
+ KeyValue kv = schema.getProperties(i);
+ properties.put(kv.getKey(), kv.getValue());
+ }
+ }
+ }
+
+ public SchemaInfo(String name, GetSchemaResponse schema) {
+ this.name = name;
+ this.schema = schema.getData().getBytes();
+ this.type = schema.getType();
+ this.properties = schema.getProperties();
+ }
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 06eca29..779a27b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -512,6 +512,22 @@ message CommandGetTopicsOfNamespaceResponse {
repeated string topics = 2;
}
+message CommandGetSchema {
+ required uint64 request_id = 1;
+ required string topic = 2;
+
+ optional bytes schema_version = 3;
+}
+
+message CommandGetSchemaResponse {
+ required uint64 request_id = 1;
+ optional ServerError error_code = 2;
+ optional string error_message = 3;
+
+ optional Schema schema = 4;
+ optional bytes schema_version = 5;
+}
+
message BaseCommand {
enum Type {
CONNECT = 2;
@@ -564,6 +580,9 @@ message BaseCommand {
GET_TOPICS_OF_NAMESPACE = 32;
GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;
+
+ GET_SCHEMA = 34;
+ GET_SCHEMA_RESPONSE = 35;
}
@@ -614,4 +633,6 @@ message BaseCommand {
optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
optional CommandGetTopicsOfNamespaceResponse
getTopicsOfNamespaceResponse = 33;
+ optional CommandGetSchema getSchema = 34;
+ optional CommandGetSchemaResponse getSchemaResponse = 35;
}