This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e097cb [pulsar-proxy]Add the LookupProxyHandler handle getSchema
request and test (#4083)
1e097cb is described below
commit 1e097cbced91789d8515744cb20d1fecc17b3b41
Author: congbo <[email protected]>
AuthorDate: Sun Apr 21 22:06:34 2019 +0800
[pulsar-proxy]Add the LookupProxyHandler handle getSchema request and test
(#4083)
### Motivation
In order to support #3742 #3876.
Currently, the proxy does not support GetSchema requests while a connection is in the ProxyLookupRequests state.
### Modifications
Add a handleGetSchema implementation to ProxyConnection that delegates to LookupProxyHandler.
### Verifying this change
Add a new test, testGetSchema(), in ProxyTest.
### Dependencies (does it add or upgrade a dependency): (yes / no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (yes)
The admin cli options: (no)
Anything that affects deployment: (no)
### Documentation
Does this pull request introduce a new feature? (no)
If yes, how is the feature documented? (no)
If a feature is not documented yet in this PR, please create a followup
issue for adding the documentation
---
.../pulsar/proxy/server/LookupProxyHandler.java | 125 ++++++++++++++++-----
.../pulsar/proxy/server/ProxyConnection.java | 7 ++
.../org/apache/pulsar/proxy/server/ProxyTest.java | 44 ++++++++
3 files changed, 151 insertions(+), 25 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 0072dc5..548c522 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -24,14 +24,19 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.Commands;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,11 @@ public class LookupProxyHandler {
.create()
.register();
+ private static final Counter getSchemaRequests = Counter
+ .build("pulsar_proxy_get_schema_requests", "Counter of schema
requests")
+ .create()
+ .register();
+
static final Counter rejectedLookupRequests =
Counter.build("pulsar_proxy_rejected_lookup_requests",
"Counter of topic lookup requests rejected due to
throttling").create().register();
@@ -280,26 +290,12 @@ public class LookupProxyHandler {
}
}
-
private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace
commandGetTopicsOfNamespace,
long clientRequestId) {
- String serviceUrl;
- if (isBlank(brokerServiceURL)) {
- ServiceLookupData availableBroker;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}",
clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(Commands.newError(
- clientRequestId, ServerError.ServiceNotReady,
e.getMessage()
- ));
- return;
- }
- serviceUrl = this.connectWithTLS ?
- availableBroker.getPulsarServiceUrlTls() :
availableBroker.getPulsarServiceUrl();
- } else {
- serviceUrl = this.connectWithTLS ?
- service.getConfiguration().getBrokerServiceURLTLS() :
service.getConfiguration().getBrokerServiceURL();
+ String serviceUrl = getServiceUrl(clientRequestId);
+
+ if(!StringUtils.isNotBlank(serviceUrl)) {
+ return;
}
performGetTopicsOfNamespace(clientRequestId,
commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10,
commandGetTopicsOfNamespace.getMode());
@@ -316,16 +312,12 @@ public class LookupProxyHandler {
return;
}
- URI brokerURI;
- try {
- brokerURI = new URI(brokerServiceUrl);
- } catch (URISyntaxException e) {
- proxyConnection.ctx().writeAndFlush(
- Commands.newError(clientRequestId,
ServerError.MetadataError, e.getMessage()));
+ InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
+
+ if(addr == null){
return;
}
- InetSocketAddress addr =
InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for getting
TopicsOfNamespace '{}' with clientReq Id '{}'",
addr, namespaceName, clientRequestId);
@@ -352,5 +344,88 @@ public class LookupProxyHandler {
});
}
+ public void handleGetSchema(CommandGetSchema commandGetSchema) {
+ getSchemaRequests.inc();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received GetSchema", clientAddress);
+ }
+
+ final long clientRequestId = commandGetSchema.getRequestId();
+ String serviceUrl = getServiceUrl(clientRequestId);
+
+ if(!StringUtils.isNotBlank(serviceUrl)) {
+ return;
+ }
+ InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
+
+ if(addr == null){
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Getting connections to '{}' for getting schema of topic
'{}' with clientReq Id '{}'",
+ addr, commandGetSchema.getTopic(), clientRequestId);
+ }
+
+
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx ->
{
+ // Connected to backend broker
+ long requestId = proxyConnection.newRequestId();
+ ByteBuf command;
+ byte[] schemaVersion =
commandGetSchema.getSchemaVersion().toByteArray();
+ command = Commands.newGetSchema(requestId,
commandGetSchema.getTopic(),
+ Optional.ofNullable(BytesSchemaVersion.of(schemaVersion)));
+ clientCnx.sendGetSchema(command,
requestId).thenAccept(optionalSchemaInfo -> {
+ SchemaInfo schemaInfo = optionalSchemaInfo.get();
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newGetSchemaResponse(clientRequestId,
+ schemaInfo,
+ BytesSchemaVersion.of(schemaVersion)));
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to get schema {}: {}", clientAddress,
commandGetSchema.getTopic(), ex.getMessage());
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ // Failed to connect to backend broker
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+
+ }
+
+ private String getServiceUrl(long clientRequestId) {
+ if (isBlank(brokerServiceURL)) {
+ ServiceLookupData availableBroker;
+ try {
+ availableBroker = service.getDiscoveryProvider().nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}",
clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(Commands.newError(
+ clientRequestId, ServerError.ServiceNotReady,
e.getMessage()
+ ));
+ return null;
+ }
+ return this.connectWithTLS ?
+ availableBroker.getPulsarServiceUrlTls() :
availableBroker.getPulsarServiceUrl();
+ } else {
+ return this.connectWithTLS ?
+ service.getConfiguration().getBrokerServiceURLTLS() :
service.getConfiguration().getBrokerServiceURL();
+ }
+
+ }
+
+ private InetSocketAddress getAddr(String brokerServiceUrl, long
clientRequestId) {
+ URI brokerURI;
+ try {
+ brokerURI = new URI(brokerServiceUrl);
+ } catch (URISyntaxException e) {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.MetadataError, e.getMessage()));
+ return null;
+ }
+ return InetSocketAddress.createUnresolved(brokerURI.getHost(),
brokerURI.getPort());
+ }
+
private static final Logger log =
LoggerFactory.getLogger(LookupProxyHandler.class);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index a1147f6..18e4e5d 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -47,6 +47,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.slf4j.Logger;
@@ -371,6 +372,12 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
}
+ @Override
+ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+ checkArgument(state == State.ProxyLookupRequests);
+
+ lookupProxyHandler.handleGetSchema(commandGetSchema);
+ }
/**
* handles discovery request from client ands sends next active broker
address
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index be47d9e..94cb83e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,12 +24,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -47,11 +52,14 @@ import
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -65,6 +73,18 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+ }
+
@Override
@BeforeClass
protected void setup() throws Exception {
@@ -206,6 +226,29 @@ public class ProxyTest extends MockedPulsarServiceBaseTest
{
}
@Test
+ private void testGetSchema() throws Exception {
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
+ .build();
+ Producer<Foo> producer;
+ Schema schema = Schema.AVRO(Foo.class);
+ try {
+ producer =
client.newProducer(schema).topic("persistent://sample/test/local/get-schema")
+ .create();
+ } catch (Exception ex) {
+ Assert.fail("Should not have failed since can acquire
LookupRequestSemaphore");
+ }
+ byte[] schemaVersion = new byte[8];
+ byte b = new Long(0l).byteValue();
+ for (int i = 0; i<8; i++){
+ schemaVersion[i] = b;
+ }
+ SchemaInfo schemaInfo = ((PulsarClientImpl)client).getLookup()
+
.getSchema(TopicName.get("persistent://sample/test/local/get-schema"),
schemaVersion).get().orElse(null);
+ Assert.assertEquals(new String(schemaInfo.getSchema()), new
String(schema.getSchemaInfo().getSchema()));
+ client.close();
+ }
+
+ @Test
private void testProtocolVersionAdvertisement() throws Exception {
final String url = "pulsar://localhost:" +
proxyConfig.getServicePort().get();
final String topic =
"persistent://sample/test/local/protocol-version-advertisement";
@@ -234,6 +277,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
client.close();
}
+
private static PulsarClient
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());