nicoloboschi commented on code in PR #17167:
URL: https://github.com/apache/pulsar/pull/17167#discussion_r954848182


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -144,22 +146,29 @@ public ProxyConnection(ProxyService proxyService, 
DnsAddressResolverGroup dnsAdd
         this.dnsAddressResolverGroup = dnsAddressResolverGroup;
         this.state = State.Init;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
+        this.connectionController = proxyService.getConnectionController();
     }
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
         ProxyService.ACTIVE_CONNECTIONS.inc();
-        if (ProxyService.ACTIVE_CONNECTIONS.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            state = State.Closing;
-            ctx.close();
+        SocketAddress rmAddress = ctx.channel().remoteAddress();
+        ConnectionController.State state = 
connectionController.increaseConnection(rmAddress);
+        if (!state.equals(ConnectionController.State.OK)) {
+            ctx.channel().writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
+                    
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                            ? "Reached the maximum number of connections"
+                            : "Reached the maximum number of connections on 
address" + rmAddress));
+            ctx.channel().close();

Review Comment:
   should we close it in the writeAndFlush callback ? 



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java:
##########
@@ -60,11 +60,11 @@ class DefaultConnectionController implements 
ConnectionController {
         private final boolean maxConnectionsLimitEnabled;
         private final boolean maxConnectionsLimitPerIpEnabled;
 
-        public DefaultConnectionController(ServiceConfiguration configuration) 
{
-            this.maxConnections = configuration.getBrokerMaxConnections();
-            this.maxConnectionPerIp = 
configuration.getBrokerMaxConnectionsPerIp();
-            this.maxConnectionsLimitEnabled = 
configuration.getBrokerMaxConnections() > 0;
-            this.maxConnectionsLimitPerIpEnabled = 
configuration.getBrokerMaxConnectionsPerIp() > 0;
+        public DefaultConnectionController(int maximumConnections, int 
maximumConnectionsPerId) {

Review Comment:
   typo: maximumConnectionsPerIp



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java:
##########
@@ -385,6 +385,13 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int maxConcurrentInboundConnections = 10000;
 
+    @FieldContext(
+            category = CATEGORY_RATE_LIMITING,
+            doc = "Max concurrent inbound connections per IP, The proxy will 
reject requests beyond that"

Review Comment:
   I suggest to use the same doc description as the broker one
   ```suggestion
               doc = "The maximum number of connections per IP. If it exceeds, 
new connections are rejected."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to