nicoloboschi commented on code in PR #17167:
URL: https://github.com/apache/pulsar/pull/17167#discussion_r954848182
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -144,22 +146,29 @@ public ProxyConnection(ProxyService proxyService,
DnsAddressResolverGroup dnsAdd
this.dnsAddressResolverGroup = dnsAddressResolverGroup;
this.state = State.Init;
this.brokerProxyValidator = service.getBrokerProxyValidator();
+ this.connectionController = proxyService.getConnectionController();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
ProxyService.ACTIVE_CONNECTIONS.inc();
- if (ProxyService.ACTIVE_CONNECTIONS.get() >
service.getConfiguration().getMaxConcurrentInboundConnections()) {
- state = State.Closing;
- ctx.close();
+ SocketAddress rmAddress = ctx.channel().remoteAddress();
+ ConnectionController.State state =
connectionController.increaseConnection(rmAddress);
+ if (!state.equals(ConnectionController.State.OK)) {
+ ctx.channel().writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
+
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of connections"
+ : "Reached the maximum number of connections on
address" + rmAddress));
+ ctx.channel().close();
Review Comment:
should we close it in the writeAndFlush callback ?
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java:
##########
@@ -60,11 +60,11 @@ class DefaultConnectionController implements
ConnectionController {
private final boolean maxConnectionsLimitEnabled;
private final boolean maxConnectionsLimitPerIpEnabled;
- public DefaultConnectionController(ServiceConfiguration configuration)
{
- this.maxConnections = configuration.getBrokerMaxConnections();
- this.maxConnectionPerIp =
configuration.getBrokerMaxConnectionsPerIp();
- this.maxConnectionsLimitEnabled =
configuration.getBrokerMaxConnections() > 0;
- this.maxConnectionsLimitPerIpEnabled =
configuration.getBrokerMaxConnectionsPerIp() > 0;
+ public DefaultConnectionController(int maximumConnections, int
maximumConnectionsPerId) {
Review Comment:
typo: maximumConnectionsPerIp
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java:
##########
@@ -385,6 +385,13 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int maxConcurrentInboundConnections = 10000;
+ @FieldContext(
+ category = CATEGORY_RATE_LIMITING,
+ doc = "Max concurrent inbound connections per IP, The proxy will
reject requests beyond that"
Review Comment:
I suggest to use the same doc description as the broker one
```suggestion
doc = "The maximum number of connections per IP. If it exceeds,
new connections are rejected."
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]