This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce522cdbe18 [feat][proxy] Support proxy limit maximum connections per 
IP (#17167)
ce522cdbe18 is described below

commit ce522cdbe18403048780e2b82188526136f52f9c
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 29 19:14:38 2022 +0800

    [feat][proxy] Support proxy limit maximum connections per IP (#17167)
---
 conf/proxy.conf                                    |  3 +
 .../broker/limiter}/ConnectionController.java      | 25 +++++---
 .../apache/pulsar/broker/limiter/package-info.java | 22 +++++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    |  7 +++
 .../pulsar/proxy/server/ProxyConnection.java       | 15 ++++-
 .../apache/pulsar/proxy/server/ProxyService.java   |  7 +++
 .../server/ProxyConnectionThrottlingTest.java      | 72 +++++++++++++++-------
 8 files changed, 123 insertions(+), 33 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 3767bb52eaa..abec2c2c8b4 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -193,6 +193,9 @@ tlsCertRefreshCheckDurationSec=300
 # Max concurrent inbound connections. The proxy will reject requests beyond 
that.
 maxConcurrentInboundConnections=10000
 
+# Max concurrent inbound connections per IP, The proxy will reject requests 
beyond that.
+maxConcurrentInboundConnectionsPerIp=0
+
 # Max concurrent outbound connections. The proxy will error out requests 
beyond that.
 maxConcurrentLookupRequests=50000
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
similarity index 87%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
rename to 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
index 65c3a6c4f2a..609d0cc1907 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.broker.limiter;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.tls.InetAddressUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,11 +60,11 @@ public interface ConnectionController {
         private final boolean maxConnectionsLimitEnabled;
         private final boolean maxConnectionsLimitPerIpEnabled;
 
-        public DefaultConnectionController(ServiceConfiguration configuration) 
{
-            this.maxConnections = configuration.getBrokerMaxConnections();
-            this.maxConnectionPerIp = 
configuration.getBrokerMaxConnectionsPerIp();
-            this.maxConnectionsLimitEnabled = 
configuration.getBrokerMaxConnections() > 0;
-            this.maxConnectionsLimitPerIpEnabled = 
configuration.getBrokerMaxConnectionsPerIp() > 0;
+        public DefaultConnectionController(int maxConnections, int 
maxConnectionPerIp) {
+            this.maxConnections = maxConnections;
+            this.maxConnectionPerIp = maxConnectionPerIp;
+            this.maxConnectionsLimitEnabled = maxConnections > 0;
+            this.maxConnectionsLimitPerIpEnabled = maxConnectionPerIp > 0;
         }
 
         @Override
@@ -131,6 +131,17 @@ public interface ConnectionController {
         private boolean isLegalIpAddress(String address) {
             return InetAddressUtils.isIPv4Address(address) || 
InetAddressUtils.isIPv6Address(address);
         }
+
+        @VisibleForTesting
+        public static int getTotalConnectionNum() {
+            return totalConnectionNum;
+        }
+
+        @VisibleForTesting
+        public static Map<String, MutableInt> getConnections() {
+            return CONNECTIONS;
+        }
+
     }
 
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
new file mode 100644
index 00000000000..9483f342798
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Pulsar Client API.
+ */
+package org.apache.pulsar.broker.limiter;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 604b824881a..55b82e9639e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -75,6 +75,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.limiter.ConnectionController;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -274,7 +275,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.maxPendingBytesPerThread = 
conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
                 / conf.getNumIOThreads();
         this.resumeThresholdPendingBytesPerThread = 
this.maxPendingBytesPerThread / 2;
-        this.connectionController = new 
ConnectionController.DefaultConnectionController(conf);
+        this.connectionController = new 
ConnectionController.DefaultConnectionController(
+                conf.getBrokerMaxConnections(),
+                conf.getBrokerMaxConnectionsPerIp());
         this.enableSubscriptionPatternEvaluation = 
conf.isEnableBrokerSideSubscriptionPatternEvaluation();
         this.maxSubscriptionPatternLength = 
conf.getSubscriptionPatternMaxLength();
         this.topicListService = new TopicListService(pulsar, this,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index af3e55d1993..9938a1fe307 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -392,6 +392,13 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int maxConcurrentInboundConnections = 10000;
 
+    @FieldContext(
+            category = CATEGORY_RATE_LIMITING,
+            doc = "The maximum number of connections per IP. If it exceeds, 
new connections are rejected."
+    )
+    private int maxConcurrentInboundConnectionsPerIp = 0;
+
+
     @FieldContext(
         category = CATEGORY_RATE_LIMITING,
         doc = "Max concurrent lookup requests. The proxy will reject requests 
beyond that"
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index fe62b606314..8dbfd0844eb 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.limiter.ConnectionController;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -92,6 +93,7 @@ public class ProxyConnection extends PulsarHandler {
     @Getter
     private DirectProxyHandler directProxyHandler = null;
     private final BrokerProxyValidator brokerProxyValidator;
+    private final ConnectionController connectionController;
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
@@ -144,15 +146,21 @@ public class ProxyConnection extends PulsarHandler {
         this.dnsAddressResolverGroup = dnsAddressResolverGroup;
         this.state = State.Init;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
+        this.connectionController = proxyService.getConnectionController();
     }
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
         ProxyService.ACTIVE_CONNECTIONS.inc();
-        if (ProxyService.ACTIVE_CONNECTIONS.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            state = State.Closing;
-            ctx.close();
+        SocketAddress rmAddress = ctx.channel().remoteAddress();
+        ConnectionController.State state = 
connectionController.increaseConnection(rmAddress);
+        if (!state.equals(ConnectionController.State.OK)) {
+            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
+                    
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                            ? "Reached the maximum number of connections"
+                            : "Reached the maximum number of connections on 
address" + rmAddress))
+                            .addListener(result -> ctx.close());
             ProxyService.REJECTED_CONNECTIONS.inc();
         }
     }
@@ -160,6 +168,7 @@ public class ProxyConnection extends PulsarHandler {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
         super.channelUnregistered(ctx);
+        connectionController.decreaseConnection(ctx.channel().remoteAddress());
         ProxyService.ACTIVE_CONNECTIONS.dec();
     }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 2fb3fd67446..8b8b474e5e3 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -56,6 +56,7 @@ import lombok.Setter;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.limiter.ConnectionController;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -146,6 +147,9 @@ public class ProxyService implements Closeable {
     private PrometheusMetricsServlet metricsServlet;
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
 
+    @Getter
+    private final ConnectionController connectionController;
+
     public ProxyService(ProxyConfiguration proxyConfig,
                         AuthenticationService authenticationService) throws 
Exception {
         requireNonNull(proxyConfig);
@@ -202,6 +206,9 @@ public class ProxyService implements Closeable {
         } else {
             proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
         }
+        this.connectionController = new 
ConnectionController.DefaultConnectionController(
+                proxyConfig.getMaxConcurrentInboundConnections(),
+                proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
     }
 
     public void start() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index cb23ad42131..fb4de9a65b0 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -19,31 +19,30 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Mockito.doReturn;
-
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.limiter.ConnectionController;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest 
{
 
     private final int NUM_CONCURRENT_LOOKUP = 3;
-    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 2;
+    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4;
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
@@ -58,8 +57,9 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
         
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -75,35 +75,63 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testInboundConnection() throws Exception {
-        LOG.info("Creating producer 1");
-        @Cleanup
+        log.info("Creating producer 1");
         PulsarClient client1 = PulsarClient.builder()
                 .serviceUrl(proxyService.getServiceUrl())
                 .operationTimeout(1000, TimeUnit.MILLISECONDS)
                 .build();
 
-        @Cleanup
-        Producer<byte[]> producer1 = 
client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
+        Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES)
+                
.topic("persistent://sample/test/local/producer-topic-1").create();
 
-        LOG.info("Creating producer 2");
-        @Cleanup
+        log.info("Creating producer 2");
         PulsarClient client2 = PulsarClient.builder()
                 .serviceUrl(proxyService.getServiceUrl())
                 .operationTimeout(1000, TimeUnit.MILLISECONDS)
                 .build();
 
-        Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 0.0d);
+        Producer<byte[]> producer2 = client2.newProducer(Schema.BYTES)
+                
.topic("persistent://sample/test/local/producer-topic-1").create();
+
+        log.info("Creating producer 3");
+        @Cleanup
+        PulsarClient client3 = PulsarClient.builder()
+                .serviceUrl(proxyService.getServiceUrl())
+                .operationTimeout(1000, TimeUnit.MILLISECONDS)
+                .build();
         try {
-            @Cleanup
-            Producer<byte[]> producer2 = 
client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
-            producer2.send("Message 1".getBytes());
-            Assert.fail("Should have failed since max num of connections is 2 
and the first producer used them all up - one for discovery and other for 
producing.");
+            Producer<byte[]> producer3 = client3.newProducer(Schema.BYTES)
+                    
.topic("persistent://sample/test/local/producer-topic-1").create();
+            producer3.send("Message 1".getBytes());
+            Assert.fail("Should have failed since max num of connections is 2 
and the first" +
+                    " producer used them all up - one for discovery and other 
for producing.");
         } catch (Exception ex) {
             // OK
         }
-        // should add retry count since retry every 100ms and operation 
timeout is set to 1000ms
-        Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 5.0d);
-    }
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
 4);
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
 1);
+        Set<String> keys = 
ConnectionController.DefaultConnectionController.getConnections().keySet();
+        for (String key : keys) {
+            
Assert.assertEquals((int)ConnectionController.DefaultConnectionController
+                    .getConnections().get(key).toInteger(), 4);
+        }
+        Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 4.0d);
+
+        client1.close();
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
 2);
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
 1);
+        keys = 
ConnectionController.DefaultConnectionController.getConnections().keySet();
+        for (String key : keys) {
+            
Assert.assertEquals((int)ConnectionController.DefaultConnectionController
+                    .getConnections().get(key).toInteger(), 2);
+        }
+        Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 2.0d);
+
+        client2.close();
+
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
 0);
+        
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
 0);
+        Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 0.0d);
+    }
 }

Reply via email to