jai1 closed pull request #1453: Added Throttling mechanism to Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1453
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5d0647d30..f73124038 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -69,6 +69,13 @@ superUserRoles=
 # make sure authentication is enabled for this to take effect
 forwardAuthorizationCredentials=false
 
+# --- RateLimiting ----
+# Max concurrent inbound Connections, proxy will reject requests beyond that. 
Default value is 10,000 
+maxConcurrentInboundConnections=10000
+
+# Max concurrent outbound Connections, proxy will error out requests beyond 
that. Default value is 10,000 
+maxConcurrentLookupRequests=10000
+
 ##### --- TLS --- #####
 
 # Enable TLS in the proxy
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 92ff1074a..8b224f618 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -58,7 +58,6 @@
     private String originalPrincipal;
     private String clientAuthData;
     private String clientAuthMethod;
-    private boolean forwardAuthData;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
@@ -70,7 +69,6 @@ public DirectProxyHandler(ProxyService service, 
ProxyConnection proxyConnection,
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         ProxyConfiguration config = service.getConfiguration();
-        this.forwardAuthData = 
service.getConfiguration().forwardAuthorizationCredentials();
 
         // Start the connection attempt.
         Bootstrap b = new Bootstrap();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 6da5a8973..aad42dfaa 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -65,28 +65,37 @@ public void handleLookup(CommandLookupTopic lookup) {
         if (log.isDebugEnabled()) {
             log.debug("Received Lookup from {}", clientAddress);
         }
-
-        lookupRequests.inc();
         long clientRequestId = lookup.getRequestId();
-        String topic = lookup.getTopic();
-        String serviceUrl;
-        if (isBlank(brokerServiceURL)) {
-            ServiceLookupData availableBroker = null;
-            try {
-                availableBroker = service.getDiscoveryProvider().nextBroker();
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
-                proxyConnection.ctx().writeAndFlush(
-                        
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), 
clientRequestId));
-                return;
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            lookupRequests.inc();
+            String topic = lookup.getTopic();
+            String serviceUrl;
+            if (isBlank(brokerServiceURL)) {
+                ServiceLookupData availableBroker = null;
+                try {
+                    availableBroker = 
service.getDiscoveryProvider().nextBroker();
+                } catch (Exception e) {
+                    log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
+                    
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                            e.getMessage(), clientRequestId));
+                    return;
+                }
+                serviceUrl = this.connectWithTLS ? 
availableBroker.getPulsarServiceUrlTls()
+                        : availableBroker.getPulsarServiceUrl();
+            } else {
+                serviceUrl = this.connectWithTLS ? 
service.getConfiguration().getBrokerServiceURLTLS()
+                        : service.getConfiguration().getBrokerServiceURL();
             }
-            serviceUrl = this.connectWithTLS ? 
availableBroker.getPulsarServiceUrlTls()
-                    : availableBroker.getPulsarServiceUrl();
+            performLookup(clientRequestId, topic, serviceUrl, false, 10);
+            this.service.getLookupRequestSemaphore().release();
         } else {
-            serviceUrl = this.connectWithTLS ? 
service.getConfiguration().getBrokerServiceURLTLS()
-                    : service.getConfiguration().getBrokerServiceURL();
+            if (log.isDebugEnabled()) {
+                log.debug("Request ID {} from {} rejected - Too many 
concurrent lookup requests.", clientRequestId, clientAddress);
+            }
+            
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                    "Too many concurrent lookup requests", clientRequestId));
         }
-        performLookup(clientRequestId, topic, serviceUrl, false, 10);
+
     }
 
     private void performLookup(long clientRequestId, String topic, String 
brokerServiceUrl, boolean authoritative,
@@ -121,27 +130,26 @@ private void performLookup(long clientRequestId, String 
topic, String brokerServ
             } else {
                 command = Commands.newLookup(topic, authoritative, requestId);
             }
-            clientCnx.newLookup(command,
-                    requestId).thenAccept(result -> {
-                        String brokerUrl = connectWithTLS ? 
result.brokerUrlTls : result.brokerUrl;
-                        if (result.redirect) {
-                            // Need to try the lookup again on a different 
broker
-                            performLookup(clientRequestId, topic, brokerUrl, 
result.authoritative, numberOfRetries - 1);
-                        } else {
-                            // Reply the same address for both TLS non-TLS. 
The reason is that whether we use TLS
-                            // between proxy
-                            // and broker is independent of whether the client 
itself uses TLS, but we need to force the
-                            // client
-                            // to use the appropriate target broker (and port) 
when it will connect back.
-                            
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, 
brokerUrl, true,
-                                    LookupType.Connect, clientRequestId, true 
/* this is coming from proxy */));
-                        }
-                    }).exceptionally(ex -> {
-                        log.warn("[{}] Failed to lookup topic {}: {}", 
clientAddress, topic, ex.getMessage());
-                        
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                                ex.getMessage(), clientRequestId));
-                        return null;
-                    });
+            clientCnx.newLookup(command, requestId).thenAccept(result -> {
+                String brokerUrl = connectWithTLS ? result.brokerUrlTls : 
result.brokerUrl;
+                if (result.redirect) {
+                    // Need to try the lookup again on a different broker
+                    performLookup(clientRequestId, topic, brokerUrl, 
result.authoritative, numberOfRetries - 1);
+                } else {
+                    // Reply the same address for both TLS non-TLS. The reason 
is that whether we use TLS
+                    // between proxy
+                    // and broker is independent of whether the client itself 
uses TLS, but we need to force the
+                    // client
+                    // to use the appropriate target broker (and port) when it 
will connect back.
+                    
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, 
brokerUrl, true,
+                            LookupType.Connect, clientRequestId, true /* this 
is coming from proxy */));
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, 
topic, ex.getMessage());
+                proxyConnection.ctx().writeAndFlush(
+                        
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), 
clientRequestId));
+                return null;
+            });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
             proxyConnection.ctx().writeAndFlush(
@@ -155,13 +163,22 @@ public void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
-
         final long clientRequestId = partitionMetadata.getRequestId();
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            handlePartitionMetadataResponse(partitionMetadata, 
clientRequestId);
+            this.service.getLookupRequestSemaphore().release();
+        } else {
+            
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    "Too many concurrent lookup requests", clientRequestId));
+        }
+    }
+
+    private void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata 
partitionMetadata,
+            long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
         if (isBlank(brokerServiceURL)) {
-            
service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName, 
proxyConnection.clientAuthRole,
-                    proxyConnection.authenticationData)
-                    .thenAccept(metadata -> {
+            
service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
+                    proxyConnection.clientAuthRole, 
proxyConnection.authenticationData).thenAccept(metadata -> {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Total number of partitions for 
topic {} is {}",
                                     proxyConnection.clientAuthRole, topicName, 
metadata.partitions);
@@ -202,18 +219,16 @@ public void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
                 } else {
                     command = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
                 }
-                clientCnx.newLookup(
-                        command,
-                        requestId).thenAccept(lookupDataResult -> {
-                            proxyConnection.ctx().writeAndFlush(Commands
-                                    
.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
-                        }).exceptionally((ex) -> {
-                            log.warn("[{}] failed to get Partitioned metadata 
: {}", topicName.toString(),
-                                    ex.getCause().getMessage(), ex);
-                            
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
-                                    ServerError.ServiceNotReady, 
ex.getMessage(), clientRequestId));
-                            return null;
-                        });
+                clientCnx.newLookup(command, 
requestId).thenAccept(lookupDataResult -> {
+                    proxyConnection.ctx().writeAndFlush(
+                            
Commands.newPartitionMetadataResponse(lookupDataResult.partitions, 
clientRequestId));
+                }).exceptionally((ex) -> {
+                    log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName.toString(),
+                            ex.getCause().getMessage(), ex);
+                    
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                            ex.getMessage(), clientRequestId));
+                    return null;
+                });
             }).exceptionally(ex -> {
                 // Failed to connect to backend broker
                 
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index a8d3855cb..43b8d56e8 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -70,6 +70,12 @@
     // make sure authentication is enabled for this to take effect
     private boolean forwardAuthorizationCredentials = false;
 
+    // Max concurrent inbound Connections
+    private int maxConcurrentInboundConnections = 10000;
+
+    // Max concurrent outbound Connections
+    private int maxConcurrentLookupRequests = 10000;
+
     // Authentication settings of the proxy itself. Used to connect to brokers
     private String brokerClientAuthenticationPlugin;
     private String brokerClientAuthenticationParameters;
@@ -335,7 +341,23 @@ public void setTlsProtocols(Set<String> tlsProtocols) {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
-    
+
+    public int getMaxConcurrentInboundConnections() {
+        return maxConcurrentInboundConnections;
+    }
+
+    public void setMaxConcurrentInboundConnections(int 
maxConcurrentInboundConnections) {
+        this.maxConcurrentInboundConnections = maxConcurrentInboundConnections;
+    }
+
+    public int getMaxConcurrentLookupRequests() {
+        return maxConcurrentLookupRequests;
+    }
+
+    public void setMaxConcurrentLookupRequests(int 
maxConcurrentLookupRequests) {
+        this.maxConcurrentLookupRequests = maxConcurrentLookupRequests;
+    }
+
     public boolean getTlsRequireTrustedClientCertOnConnect() {
         return tlsRequireTrustedClientCertOnConnect;
     }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 89d737d1a..e65461a3b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,6 +40,8 @@
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -91,10 +93,28 @@ public ProxyConnection(ProxyService proxyService) {
         this.state = State.Init;
     }
 
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        super.channelRegistered(ctx);
+        activeConnections.inc();
+        if (activeConnections.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Too many connection opened {}", remoteAddress, 
activeConnections.get());
+            }
+            ctx.close();
+            return;
+        }
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
+        super.channelUnregistered(ctx);
+        activeConnections.dec();
+    }
+    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        activeConnections.inc();
         newConnections.inc();
         LOG.info("[{}] New connection opened", remoteAddress);
     }
@@ -102,7 +122,6 @@ public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
-        activeConnections.dec();
 
         if (directProxyHandler != null && directProxyHandler.outboundChannel 
!= null) {
             directProxyHandler.outboundChannel.close();
@@ -164,7 +183,7 @@ protected void handleConnect(CommandConnect connect) {
             close();
             return;
         }
-        
+
         if (connect.hasProxyToBrokerUrl()) {
             // Client already knows which broker to connect. Let's open a 
connection
             // there and just pass bytes in both directions
@@ -226,8 +245,7 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect 
connect) {
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
             }
             authenticationData = new AuthenticationDataCommand(authData, 
remoteAddress, sslSession);
-            clientAuthRole = service.getAuthenticationService()
-                    .authenticate(authenticationData, authMethod);
+            clientAuthRole = 
service.getAuthenticationService().authenticate(authenticationData, authMethod);
             LOG.info("[{}] Client successfully authenticated with {} role {}", 
remoteAddress, authMethod,
                     clientAuthRole);
             return true;
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 433744273..c77ace2fb 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -25,6 +25,8 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -78,12 +80,17 @@
 
     private LocalZooKeeperConnectionService localZooKeeperConnectionService;
 
+    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
+
     private static final int numThreads = 
Runtime.getRuntime().availableProcessors();
 
     public ProxyService(ProxyConfiguration proxyConfig) throws IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
 
+        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
+                new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), 
false));
+
         String hostname;
         try {
             hostname = InetAddress.getLocalHost().getHostName();
@@ -93,7 +100,7 @@ public ProxyService(ProxyConfiguration proxyConfig) throws 
IOException {
         this.serviceUrl = String.format("pulsar://%s:%d/", hostname, 
proxyConfig.getServicePort());
         this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, 
proxyConfig.getServicePortTls());
 
-        this.acceptorGroup  = EventLoopUtil.newEventLoopGroup(1, 
acceptorThreadFactory);
+        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, 
acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, 
workersThreadFactory);
 
         ClientConfigurationData clientConf = new ClientConfigurationData();
@@ -218,5 +225,9 @@ public void 
setConfigurationCacheService(ConfigurationCacheService configuration
         this.configurationCacheService = configurationCacheService;
     }
 
+    public Semaphore getLookupRequestSemaphore() {
+        return lookupRequestSemaphore.get();
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ProxyService.class);
 }
\ No newline at end of file
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
new file mode 100644
index 000000000..008e751e3
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest 
{
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    private final int NUM_CONCURRENT_LOOKUP = 3;
+    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 2;
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+        
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+        
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyService.close();
+    }
+
+    @Test
+    public void testInboundConnection() throws Exception {
+        LOG.info("Creating producer 1");
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
+                .build();
+        Producer<byte[]> producer1 = 
client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+        
+        LOG.info("Creating producer 2");
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
+                .build();
+        Producer<byte[]> producer2;
+        try {
+            producer2 = 
client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+            producer2.send("Message 1".getBytes());
+            Assert.fail("Should have failed since max num of connections is 2 
and the first producer used them all up - one for discovery and other for 
producing.");
+        } catch (Exception ex) {
+            // OK
+        }
+    }
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
+}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
new file mode 100644
index 000000000..c661caea4
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    private final int NUM_CONCURRENT_LOOKUP = 3;
+    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+        
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+        
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyService.close();
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
+                .connectionsPerBroker(5).ioThreads(5).build();
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        Producer<byte[]> producer1 = 
client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                .create();
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        try {
+            Producer<byte[]> producer2 = 
client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                    .create();
+            Assert.fail("Should have failed since can't acquire 
LookupRequestSemaphore");
+        } catch (Exception ex) {
+            // Ignore
+        }
+
+        proxyService.getLookupRequestSemaphore().release();
+        try {
+            Producer<byte[]> producer3 = 
client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                    .create();
+        } catch (Exception ex) {
+            Assert.fail("Should not have failed since can acquire 
LookupRequestSemaphore");
+        }
+        client.close();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to