This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce522cdbe18 [feat][proxy] Support proxy limit maximum connections per
IP (#17167)
ce522cdbe18 is described below
commit ce522cdbe18403048780e2b82188526136f52f9c
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 29 19:14:38 2022 +0800
[feat][proxy] Support proxy limit maximum connections per IP (#17167)
---
conf/proxy.conf | 3 +
.../broker/limiter}/ConnectionController.java | 25 +++++---
.../apache/pulsar/broker/limiter/package-info.java | 22 +++++++
.../apache/pulsar/broker/service/ServerCnx.java | 5 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 7 +++
.../pulsar/proxy/server/ProxyConnection.java | 15 ++++-
.../apache/pulsar/proxy/server/ProxyService.java | 7 +++
.../server/ProxyConnectionThrottlingTest.java | 72 +++++++++++++++-------
8 files changed, 123 insertions(+), 33 deletions(-)
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 3767bb52eaa..abec2c2c8b4 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -193,6 +193,9 @@ tlsCertRefreshCheckDurationSec=300
# Max concurrent inbound connections. The proxy will reject requests beyond
that.
maxConcurrentInboundConnections=10000
+# Max concurrent inbound connections per IP. The proxy will reject requests
beyond that. A value of 0 (the default) disables the per-IP limit.
+maxConcurrentInboundConnectionsPerIp=0
+
# Max concurrent outbound connections. The proxy will error out requests
beyond that.
maxConcurrentLookupRequests=50000
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
similarity index 87%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
rename to
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
index 65c3a6c4f2a..609d0cc1907 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/ConnectionController.java
@@ -16,15 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.broker.limiter;
+import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.tls.InetAddressUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,11 +60,11 @@ public interface ConnectionController {
private final boolean maxConnectionsLimitEnabled;
private final boolean maxConnectionsLimitPerIpEnabled;
- public DefaultConnectionController(ServiceConfiguration configuration)
{
- this.maxConnections = configuration.getBrokerMaxConnections();
- this.maxConnectionPerIp =
configuration.getBrokerMaxConnectionsPerIp();
- this.maxConnectionsLimitEnabled =
configuration.getBrokerMaxConnections() > 0;
- this.maxConnectionsLimitPerIpEnabled =
configuration.getBrokerMaxConnectionsPerIp() > 0;
+ public DefaultConnectionController(int maxConnections, int
maxConnectionPerIp) {
+ this.maxConnections = maxConnections;
+ this.maxConnectionPerIp = maxConnectionPerIp;
+ this.maxConnectionsLimitEnabled = maxConnections > 0;
+ this.maxConnectionsLimitPerIpEnabled = maxConnectionPerIp > 0;
}
@Override
@@ -131,6 +131,17 @@ public interface ConnectionController {
private boolean isLegalIpAddress(String address) {
return InetAddressUtils.isIPv4Address(address) ||
InetAddressUtils.isIPv6Address(address);
}
+
+ @VisibleForTesting
+ public static int getTotalConnectionNum() {
+ return totalConnectionNum;
+ }
+
+ @VisibleForTesting
+ public static Map<String, MutableInt> getConnections() {
+ return CONNECTIONS;
+ }
+
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
new file mode 100644
index 00000000000..9483f342798
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/limiter/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Connection limiters shared by the Pulsar broker and proxy.
+ */
+package org.apache.pulsar.broker.limiter;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 604b824881a..55b82e9639e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -75,6 +75,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.limiter.ConnectionController;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -274,7 +275,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.maxPendingBytesPerThread =
conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
/ conf.getNumIOThreads();
this.resumeThresholdPendingBytesPerThread =
this.maxPendingBytesPerThread / 2;
- this.connectionController = new
ConnectionController.DefaultConnectionController(conf);
+ this.connectionController = new
ConnectionController.DefaultConnectionController(
+ conf.getBrokerMaxConnections(),
+ conf.getBrokerMaxConnectionsPerIp());
this.enableSubscriptionPatternEvaluation =
conf.isEnableBrokerSideSubscriptionPatternEvaluation();
this.maxSubscriptionPatternLength =
conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index af3e55d1993..9938a1fe307 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -392,6 +392,13 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int maxConcurrentInboundConnections = 10000;
+ @FieldContext(
+ category = CATEGORY_RATE_LIMITING,
+ doc = "The maximum number of connections per IP. If it exceeds,
new connections are rejected."
+ )
+ private int maxConcurrentInboundConnectionsPerIp = 0;
+
+
@FieldContext(
category = CATEGORY_RATE_LIMITING,
doc = "Max concurrent lookup requests. The proxy will reject requests
beyond that"
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index fe62b606314..8dbfd0844eb 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -92,6 +93,7 @@ public class ProxyConnection extends PulsarHandler {
@Getter
private DirectProxyHandler directProxyHandler = null;
private final BrokerProxyValidator brokerProxyValidator;
+ private final ConnectionController connectionController;
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
@@ -144,15 +146,21 @@ public class ProxyConnection extends PulsarHandler {
this.dnsAddressResolverGroup = dnsAddressResolverGroup;
this.state = State.Init;
this.brokerProxyValidator = service.getBrokerProxyValidator();
+ this.connectionController = proxyService.getConnectionController();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
ProxyService.ACTIVE_CONNECTIONS.inc();
- if (ProxyService.ACTIVE_CONNECTIONS.get() >
service.getConfiguration().getMaxConcurrentInboundConnections()) {
- state = State.Closing;
- ctx.close();
+ SocketAddress rmAddress = ctx.channel().remoteAddress();
+ ConnectionController.State state =
connectionController.increaseConnection(rmAddress);
+ if (!state.equals(ConnectionController.State.OK)) {
+ ctx.writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
+
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of connections"
+ : "Reached the maximum number of connections on
address" + rmAddress))
+ .addListener(result -> ctx.close());
ProxyService.REJECTED_CONNECTIONS.inc();
}
}
@@ -160,6 +168,7 @@ public class ProxyConnection extends PulsarHandler {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws
Exception {
super.channelUnregistered(ctx);
+ connectionController.decreaseConnection(ctx.channel().remoteAddress());
ProxyService.ACTIVE_CONNECTIONS.dec();
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 2fb3fd67446..8b8b474e5e3 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -56,6 +56,7 @@ import lombok.Setter;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -146,6 +147,9 @@ public class ProxyService implements Closeable {
private PrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+ @Getter
+ private final ConnectionController connectionController;
+
public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws
Exception {
requireNonNull(proxyConfig);
@@ -202,6 +206,9 @@ public class ProxyService implements Closeable {
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
+ this.connectionController = new
ConnectionController.DefaultConnectionController(
+ proxyConfig.getMaxConcurrentInboundConnections(),
+ proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
}
public void start() throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index cb23ad42131..fb4de9a65b0 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -19,31 +19,30 @@
package org.apache.pulsar.proxy.server;
import static org.mockito.Mockito.doReturn;
-
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+@Slf4j
public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest
{
private final int NUM_CONCURRENT_LOOKUP = 3;
- private final int NUM_CONCURRENT_INBOUND_CONNECTION = 2;
+ private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4;
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -58,8 +57,9 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(new
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
@@ -75,35 +75,63 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
@Test
public void testInboundConnection() throws Exception {
- LOG.info("Creating producer 1");
- @Cleanup
+ log.info("Creating producer 1");
PulsarClient client1 = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrl())
.operationTimeout(1000, TimeUnit.MILLISECONDS)
.build();
- @Cleanup
- Producer<byte[]> producer1 =
client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
+ Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES)
+
.topic("persistent://sample/test/local/producer-topic-1").create();
- LOG.info("Creating producer 2");
- @Cleanup
+ log.info("Creating producer 2");
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrl())
.operationTimeout(1000, TimeUnit.MILLISECONDS)
.build();
- Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 0.0d);
+ Producer<byte[]> producer2 = client2.newProducer(Schema.BYTES)
+
.topic("persistent://sample/test/local/producer-topic-1").create();
+
+ log.info("Creating producer 3");
+ @Cleanup
+ PulsarClient client3 = PulsarClient.builder()
+ .serviceUrl(proxyService.getServiceUrl())
+ .operationTimeout(1000, TimeUnit.MILLISECONDS)
+ .build();
try {
- @Cleanup
- Producer<byte[]> producer2 =
client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
- producer2.send("Message 1".getBytes());
- Assert.fail("Should have failed since max num of connections is 2
and the first producer used them all up - one for discovery and other for
producing.");
+ Producer<byte[]> producer3 = client3.newProducer(Schema.BYTES)
+
.topic("persistent://sample/test/local/producer-topic-1").create();
+ producer3.send("Message 1".getBytes());
+ Assert.fail("Should have failed since max num of connections is 2
and the first" +
+ " producer used them all up - one for discovery and other
for producing.");
} catch (Exception ex) {
// OK
}
- // should add retry count since retry every 100ms and operation
timeout is set to 1000ms
- Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 5.0d);
- }
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
4);
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
1);
+ Set<String> keys =
ConnectionController.DefaultConnectionController.getConnections().keySet();
+ for (String key : keys) {
+
Assert.assertEquals((int)ConnectionController.DefaultConnectionController
+ .getConnections().get(key).toInteger(), 4);
+ }
+ Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 4.0d);
+
+ client1.close();
- private static final Logger LOG =
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
2);
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
1);
+ keys =
ConnectionController.DefaultConnectionController.getConnections().keySet();
+ for (String key : keys) {
+
Assert.assertEquals((int)ConnectionController.DefaultConnectionController
+ .getConnections().get(key).toInteger(), 2);
+ }
+ Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 2.0d);
+
+ client2.close();
+
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getTotalConnectionNum(),
0);
+
Assert.assertEquals(ConnectionController.DefaultConnectionController.getConnections().size(),
0);
+ Assert.assertEquals(ProxyService.ACTIVE_CONNECTIONS.get(), 0.0d);
+ }
}