This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new edf85fd Support max-connection and max-connection-per-IP (#10754)
edf85fd is described below
commit edf85fd8c6198c267ebbe58de8812504e3b9efb0
Author: feynmanlin <[email protected]>
AuthorDate: Wed Jul 7 12:02:47 2021 +0800
Support max-connection and max-connection-per-IP (#10754)
### Motivation
Pulsar is multi-tenant, so a single user can end up occupying all of the
available connections.
This can happen when a novice user misuses the SDK, or when the broker is under attack.
To keep a single user from affecting the entire cluster, it is important
to limit the number of connections.
For simplicity, this change does not use a radix tree to store IPs.
(cherry picked from commit 45caffa57d2ae6505621603ac252d21a4822aa2d)
---
conf/broker.conf | 6 +
.../apache/pulsar/broker/ServiceConfiguration.java | 12 ++
.../broker/service/ConnectionController.java | 137 +++++++++++++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 12 ++
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 +
.../pulsar/broker/service/BrokerServiceTest.java | 120 ++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 4 +
7 files changed, 293 insertions(+)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0fe68a0..6b20908 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -258,6 +258,12 @@ maxNamespacesPerTenant=0
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0
+# The maximum number of connections in the broker. Once the limit is reached, new connections are rejected. Using a value of 0 disables the limit.
+brokerMaxConnections=0
+
+# The maximum number of connections per IP. Once the limit is reached, new connections from that IP are rejected. Using a value of 0 disables the limit.
+brokerMaxConnectionsPerIp=0
+
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d0ca117..1043a8a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -541,6 +541,18 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private int maxTopicsPerNamespace = 0;
@FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "The maximum number of connections in the broker. If it exceeds,
new connections are rejected."
+ )
+ private int brokerMaxConnections = 0;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "The maximum number of connections per IP. If it exceeds, new
connections are rejected."
+ )
+ private int brokerMaxConnectionsPerIp = 0;
+
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Enable check for minimum allowed client library version"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
new file mode 100644
index 0000000..51540e1
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.tls.InetAddressUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface ConnectionController { // Admission control for connections: counts them and reports when a configured limit is exceeded.
+
+ /**
+ * Count a newly opened connection against the configured limits.
+ * @param remoteAddress address of the connecting peer
+ * @return {@link Sate#OK} if the connection is accepted, otherwise the limit that was exceeded
+ */
+ Sate increaseConnection(SocketAddress remoteAddress);
+
+ /**
+ * Release the slot that was counted for a connection.
+ * @param remoteAddress address of the peer whose connection is being released
+ */
+ void decreaseConnection(SocketAddress remoteAddress);
+
+ enum Sate { // outcome of increaseConnection; NOTE(review): the name looks like a typo of State
+ OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
+ }
+
+
+ class DefaultConnectionController implements ConnectionController { // counter-based implementation guarded by a single lock
+ private static final Logger log =
LoggerFactory.getLogger(DefaultConnectionController.class);
+ private static final Map<String, MutableInt> CONNECTIONS = new
HashMap<>(); // connection count per IP string; static, so shared by every controller instance
+ private static final ReentrantLock lock = new ReentrantLock(); // guards CONNECTIONS and totalConnectionNum
+ private static int totalConnectionNum = 0; // connections counted while the broker-wide limit is enabled; static as well
+
+ private final int maxConnections;
+ private final int maxConnectionPerIp;
+ private final boolean maxConnectionsLimitEnabled;
+ private final boolean maxConnectionsLimitPerIpEnabled;
+
+ public DefaultConnectionController(ServiceConfiguration configuration)
{
+ this.maxConnections = configuration.getBrokerMaxConnections();
+ this.maxConnectionPerIp =
configuration.getBrokerMaxConnectionsPerIp();
+ this.maxConnectionsLimitEnabled =
configuration.getBrokerMaxConnections() > 0; // a non-positive value disables the broker-wide limit
+ this.maxConnectionsLimitPerIpEnabled =
configuration.getBrokerMaxConnectionsPerIp() > 0; // a non-positive value disables the per-IP limit
+ }
+
+ @Override
+ public Sate increaseConnection(SocketAddress remoteAddress) {
+ if (!maxConnectionsLimitEnabled &&
!maxConnectionsLimitPerIpEnabled) {
+ return Sate.OK; // both limits disabled: nothing is counted
+ }
+ if (!(remoteAddress instanceof InetSocketAddress)
+ || !isLegalIpAddress(((InetSocketAddress)
remoteAddress).getHostString())) {
+ return Sate.OK; // peers without an IPv4/IPv6 literal host string are neither counted nor limited
+ }
+ lock.lock();
+ try {
+ String ip = ((InetSocketAddress)
remoteAddress).getHostString();
+ if (maxConnectionsLimitPerIpEnabled) {
+ CONNECTIONS.computeIfAbsent(ip, (x) -> new
MutableInt(0)).increment();
+ }
+ if (maxConnectionsLimitEnabled) {
+ totalConnectionNum++;
+ }
+ if (maxConnectionsLimitEnabled && totalConnectionNum >
maxConnections) { // checked after incrementing, so the new connection itself is included
+ log.info("Reject connect request from {}, because reached
the maximum number of connections {}",
+ remoteAddress, totalConnectionNum);
+ return Sate.REACH_MAX_CONNECTION; // the increments above are not rolled back here
+ }
+ if (maxConnectionsLimitPerIpEnabled &&
CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
+ log.info("Reject connect request from {}, because reached
the maximum number "
+ + "of connections per Ip {}",
+ remoteAddress, CONNECTIONS.get(ip).getValue());
+ return Sate.REACH_MAX_CONNECTION_PER_IP; // the increments above are not rolled back here
+ }
+ } catch (Exception e) {
+ log.error("increase connection failed", e); // falls through to the OK result below
+ } finally {
+ lock.unlock();
+ }
+ return Sate.OK;
+ }
+
+ @Override
+ public void decreaseConnection(SocketAddress remoteAddress) {
+ if (!maxConnectionsLimitEnabled &&
!maxConnectionsLimitPerIpEnabled) {
+ return;
+ }
+ if (!(remoteAddress instanceof InetSocketAddress)
+ || !isLegalIpAddress(((InetSocketAddress)
remoteAddress).getHostString())) {
+ return; // same filter as increaseConnection, so peers that were never counted are skipped
+ }
+ lock.lock();
+ try {
+ String ip = ((InetSocketAddress)
remoteAddress).getHostString();
+ MutableInt mutableInt = CONNECTIONS.get(ip);
+ if (maxConnectionsLimitPerIpEnabled && mutableInt != null &&
mutableInt.decrementAndGet() <= 0) {
+ CONNECTIONS.remove(ip); // drop the entry once its count is no longer positive
+ }
+ if (maxConnectionsLimitEnabled) {
+ totalConnectionNum--; // NOTE(review): no floor at zero; assumes every call pairs with an earlier increaseConnection
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean isLegalIpAddress(String address) { // true only for IPv4 or IPv6 literals
+ return InetAddressUtils.isIPv4Address(address) ||
InetAddressUtils.isIPv6Address(address);
+ }
+ }
+
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7787382..02a06ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -191,6 +191,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private FeatureFlags features;
private PulsarCommandSender commandSender;
+ private final ConnectionController connectionController;
private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.AUTO_SPLIT);
@@ -245,11 +246,21 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.maxPendingBytesPerThread =
conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
/ conf.getNumIOThreads();
this.resumeThresholdPendingBytesPerThread =
this.maxPendingBytesPerThread / 2;
+ this.connectionController = new
ConnectionController.DefaultConnectionController(conf);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
+ ConnectionController.Sate sate =
connectionController.increaseConnection(remoteAddress);
+ if (!sate.equals(ConnectionController.Sate.OK)) {
+ ctx.channel().writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
+ sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of connections"
+ : "Reached the maximum number of connections on
address" + remoteAddress));
+ ctx.channel().close();
+ return;
+ }
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
this.commandSender = new
PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
@@ -260,6 +271,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
+ connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor =
getBrokerService().getInterceptor();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c6ab18d..0f512c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -466,6 +466,8 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
configuration.setWebServicePortTls(Optional.of(0));
configuration.setBookkeeperClientExposeStatsToPrometheus(true);
configuration.setNumExecutorThreadPoolSize(5);
+ configuration.setBrokerMaxConnections(0);
+ configuration.setBrokerMaxConnectionsPerIp(0);
return configuration;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index ffd474f..d9223c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -38,12 +38,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -69,11 +71,13 @@ import
org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -107,6 +111,7 @@ public class BrokerServiceTest extends BrokerTestBase {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
+ resetConfig();
}
// method for resetting state explicitly
@@ -236,6 +241,121 @@ public class BrokerServiceTest extends BrokerTestBase {
}
+ @Test
+ public void testConnectionController() throws Exception { // both limits enabled; each half makes a different limit the tighter one
+ cleanup();
+ conf.setBrokerMaxConnections(3);
+ conf.setBrokerMaxConnectionsPerIp(2); // first half: the per-IP limit (2) is the tighter one
+ setup(); // presumably restarts the test broker with the new conf; confirm in the base class
+ final String topicName = "persistent://prop/ns-abc/connection" +
UUID.randomUUID();
+ List<PulsarClient> clients = new ArrayList<>();
+ ClientBuilder clientBuilder =
+ PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
+ .connectionTimeout(1, TimeUnit.DAYS)
+ .serviceUrl(brokerUrl.toString());
+ long startTime = System.currentTimeMillis();
+ clients.add(createNewConnection(topicName, clientBuilder));
+ clients.add(createNewConnection(topicName, clientBuilder));
+ createNewConnectionAndCheckFail(topicName, clientBuilder); // the third client is expected to be rejected
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000); // the rejection must surface quickly, not after the 1-day timeouts
+ cleanClient(clients);
+ clients.clear();
+
+ cleanup();
+ conf.setBrokerMaxConnections(2); // second half: the broker-wide limit (2) is the tighter one
+ conf.setBrokerMaxConnectionsPerIp(3);
+ setup();
+ startTime = System.currentTimeMillis();
+ clientBuilder.serviceUrl(brokerUrl.toString()); // brokerUrl is re-read after setup() in case it changed
+ clients.add(createNewConnection(topicName, clientBuilder));
+ clients.add(createNewConnection(topicName, clientBuilder));
+ createNewConnectionAndCheckFail(topicName, clientBuilder); // the third client is expected to be rejected
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+ cleanClient(clients);
+ clients.clear();
+ }
+
+ @Test
+ public void testConnectionController2() throws Exception { // covers each limit alone, both at 1, and both disabled
+ cleanup();
+ conf.setBrokerMaxConnections(0);
+ conf.setBrokerMaxConnectionsPerIp(1); // scenario 1: only the per-IP limit is enabled
+ setup();
+ final String topicName = "persistent://prop/ns-abc/connection" +
UUID.randomUUID();
+ List<PulsarClient> clients = new ArrayList<>();
+ ClientBuilder clientBuilder =
+ PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
+ .connectionTimeout(1, TimeUnit.DAYS)
+ .serviceUrl(brokerUrl.toString());
+ long startTime = System.currentTimeMillis();
+ clients.add(createNewConnection(topicName, clientBuilder));
+ createNewConnectionAndCheckFail(topicName, clientBuilder); // the second client is expected to be rejected
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000); // the rejection must surface quickly, not after the 1-day timeouts
+ cleanClient(clients);
+ clients.clear();
+
+ cleanup();
+ conf.setBrokerMaxConnections(1); // scenario 2: only the broker-wide limit is enabled
+ conf.setBrokerMaxConnectionsPerIp(0);
+ setup();
+ startTime = System.currentTimeMillis();
+ clientBuilder.serviceUrl(brokerUrl.toString());
+ clients.add(createNewConnection(topicName, clientBuilder));
+ createNewConnectionAndCheckFail(topicName, clientBuilder);
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+ cleanClient(clients);
+ clients.clear();
+
+ cleanup();
+ conf.setBrokerMaxConnections(1); // scenario 3: both limits enabled at 1
+ conf.setBrokerMaxConnectionsPerIp(1);
+ setup();
+ startTime = System.currentTimeMillis();
+ clientBuilder.serviceUrl(brokerUrl.toString());
+ clients.add(createNewConnection(topicName, clientBuilder));
+ createNewConnectionAndCheckFail(topicName, clientBuilder);
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+ cleanClient(clients);
+ clients.clear();
+
+ cleanup();
+ conf.setBrokerMaxConnections(0); // scenario 4: both limits disabled
+ conf.setBrokerMaxConnectionsPerIp(0);
+ setup();
+ clientBuilder.serviceUrl(brokerUrl.toString());
+ startTime = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++) { // with no limits, all ten clients must connect
+ clients.add(createNewConnection(topicName, clientBuilder));
+ }
+ assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+ cleanClient(clients);
+ clients.clear();
+
+ }
+
+ private void createNewConnectionAndCheckFail(String topicName,
ClientBuilder builder) throws Exception { // asserts that a new client is rejected by a connection limit
+ try {
+ createNewConnection(topicName, builder);
+ fail("should fail"); // reached only when the connection was accepted
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Reached the maximum number of
connections")); // both broker rejection messages start with this text
+ }
+ }
+
+ private PulsarClient createNewConnection(String topicName, ClientBuilder
clientBuilder) throws PulsarClientException { // builds a client, creates and closes one producer, returns the open client
+ PulsarClient client1 = clientBuilder.build();
+ client1.newProducer().topic(topicName).create().close(); // NOTE(review): client1 is not closed if this line throws
+ return client1; // the caller owns the client and must close it
+ }
+
+ private void cleanClient(List<PulsarClient> clients) throws Exception { // closes every non-null client in the list
+ for (PulsarClient client : clients) {
+ if (client != null) {
+ client.close(); // an exception here stops the loop, leaving later clients open
+ }
+ }
+ }
+
+ @Test
public void testStatsOfStorageSizeWithSubscription() throws Exception {
final String topicName = "persistent://prop/ns-abc/no-subscription";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f4d600b..3820497 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -685,6 +685,10 @@ public class ClientCnx extends PulsarHandler {
connectionFuture.completeExceptionally(new
PulsarClientException.AuthenticationException(error.getMessage()));
log.error("{} Failed to authenticate the client", ctx.channel());
}
+ if (error.getError() == ServerError.NotAllowedError) {
+ log.error("Get not allowed error, {}", error.getMessage());
+ connectionFuture.completeExceptionally(new
PulsarClientException.NotAllowedException(error.getMessage()));
+ }
CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.completeExceptionally(getPulsarClientException(error.getError(),
error.getMessage()));