This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new edf85fd  Support max-connection and max-connection-per-IP (#10754)
edf85fd is described below

commit edf85fd8c6198c267ebbe58de8812504e3b9efb0
Author: feynmanlin <[email protected]>
AuthorDate: Wed Jul 7 12:02:47 2021 +0800

    Support max-connection and max-connection-per-IP (#10754)
    
    ### Motivation
    
    Pulsar supports multi-tenancy, so a single user may end up occupying 
all connections.
    This can happen when a novice user misuses the SDK, or when the broker is under attack.
    To prevent a single user from affecting the entire cluster, I think it is 
important to limit the number of connections.
    For simplicity, I did not use a radix tree to store IPs.
    
    (cherry picked from commit 45caffa57d2ae6505621603ac252d21a4822aa2d)
---
 conf/broker.conf                                   |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 ++
 .../broker/service/ConnectionController.java       | 137 +++++++++++++++++++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  12 ++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   2 +
 .../pulsar/broker/service/BrokerServiceTest.java   | 120 ++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   4 +
 7 files changed, 293 insertions(+)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0fe68a0..6b20908 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -258,6 +258,12 @@ maxNamespacesPerTenant=0
 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
 maxTopicsPerNamespace=0
 
+# The maximum number of connections in the broker. If it exceeds, new 
connections are rejected.
+brokerMaxConnections=0
+
+# The maximum number of connections per IP. If it exceeds, new connections are 
rejected.
+brokerMaxConnectionsPerIp=0
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d0ca117..1043a8a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -541,6 +541,18 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private int maxTopicsPerNamespace = 0;
 
     @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "The maximum number of connections in the broker. If it exceeds, 
new connections are rejected."
+    )
+    private int brokerMaxConnections = 0;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "The maximum number of connections per IP. If it exceeds, new 
connections are rejected."
+    )
+    private int brokerMaxConnectionsPerIp = 0;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Enable check for minimum allowed client library version"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
new file mode 100644
index 0000000..51540e1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.tls.InetAddressUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface ConnectionController {
+
+    /**
+     * Increase the number of connections counter.
+     * @param remoteAddress
+     * @return
+     */
+    Sate increaseConnection(SocketAddress remoteAddress);
+
+    /**
+     * Decrease the number of connections counter.
+     * @param remoteAddress
+     */
+    void decreaseConnection(SocketAddress remoteAddress);
+
+    enum Sate {
+        OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
+    }
+
+
+    class DefaultConnectionController implements ConnectionController {
+        private static final Logger log = 
LoggerFactory.getLogger(DefaultConnectionController.class);
+        private static final Map<String, MutableInt> CONNECTIONS = new 
HashMap<>();
+        private static final ReentrantLock lock = new ReentrantLock();
+        private static int totalConnectionNum = 0;
+
+        private final int maxConnections;
+        private final int maxConnectionPerIp;
+        private final boolean maxConnectionsLimitEnabled;
+        private final boolean maxConnectionsLimitPerIpEnabled;
+
+        public DefaultConnectionController(ServiceConfiguration configuration) 
{
+            this.maxConnections = configuration.getBrokerMaxConnections();
+            this.maxConnectionPerIp = 
configuration.getBrokerMaxConnectionsPerIp();
+            this.maxConnectionsLimitEnabled = 
configuration.getBrokerMaxConnections() > 0;
+            this.maxConnectionsLimitPerIpEnabled = 
configuration.getBrokerMaxConnectionsPerIp() > 0;
+        }
+
+        @Override
+        public Sate increaseConnection(SocketAddress remoteAddress) {
+            if (!maxConnectionsLimitEnabled && 
!maxConnectionsLimitPerIpEnabled) {
+                return Sate.OK;
+            }
+            if (!(remoteAddress instanceof InetSocketAddress)
+                    || !isLegalIpAddress(((InetSocketAddress) 
remoteAddress).getHostString())) {
+                return Sate.OK;
+            }
+            lock.lock();
+            try {
+                String ip = ((InetSocketAddress) 
remoteAddress).getHostString();
+                if (maxConnectionsLimitPerIpEnabled) {
+                    CONNECTIONS.computeIfAbsent(ip, (x) -> new 
MutableInt(0)).increment();
+                }
+                if (maxConnectionsLimitEnabled) {
+                    totalConnectionNum++;
+                }
+                if (maxConnectionsLimitEnabled && totalConnectionNum > 
maxConnections) {
+                    log.info("Reject connect request from {}, because reached 
the maximum number of connections {}",
+                            remoteAddress, totalConnectionNum);
+                    return Sate.REACH_MAX_CONNECTION;
+                }
+                if (maxConnectionsLimitPerIpEnabled && 
CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
+                    log.info("Reject connect request from {}, because reached 
the maximum number "
+                                    + "of connections per Ip {}",
+                            remoteAddress, CONNECTIONS.get(ip).getValue());
+                    return Sate.REACH_MAX_CONNECTION_PER_IP;
+                }
+            } catch (Exception e) {
+                log.error("increase connection failed", e);
+            } finally {
+                lock.unlock();
+            }
+            return Sate.OK;
+        }
+
+        @Override
+        public void decreaseConnection(SocketAddress remoteAddress) {
+            if (!maxConnectionsLimitEnabled && 
!maxConnectionsLimitPerIpEnabled) {
+                return;
+            }
+            if (!(remoteAddress instanceof InetSocketAddress)
+                    || !isLegalIpAddress(((InetSocketAddress) 
remoteAddress).getHostString())) {
+                return;
+            }
+            lock.lock();
+            try {
+                String ip = ((InetSocketAddress) 
remoteAddress).getHostString();
+                MutableInt mutableInt = CONNECTIONS.get(ip);
+                if (maxConnectionsLimitPerIpEnabled && mutableInt != null && 
mutableInt.decrementAndGet() <= 0) {
+                    CONNECTIONS.remove(ip);
+                }
+                if (maxConnectionsLimitEnabled) {
+                    totalConnectionNum--;
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private boolean isLegalIpAddress(String address) {
+            return InetAddressUtils.isIPv4Address(address) || 
InetAddressUtils.isIPv6Address(address);
+        }
+    }
+
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7787382..02a06ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -191,6 +191,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private FeatureFlags features;
 
     private PulsarCommandSender commandSender;
+    private final ConnectionController connectionController;
 
     private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
             .setKeySharedMode(KeySharedMode.AUTO_SPLIT);
@@ -245,11 +246,21 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.maxPendingBytesPerThread = 
conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
                 / conf.getNumIOThreads();
         this.resumeThresholdPendingBytesPerThread = 
this.maxPendingBytesPerThread / 2;
+        this.connectionController = new 
ConnectionController.DefaultConnectionController(conf);
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
+        ConnectionController.Sate sate = 
connectionController.increaseConnection(remoteAddress);
+        if (!sate.equals(ConnectionController.Sate.OK)) {
+            ctx.channel().writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
+                    sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
+                            ? "Reached the maximum number of connections"
+                            : "Reached the maximum number of connections on 
address" + remoteAddress));
+            ctx.channel().close();
+            return;
+        }
         log.info("New connection from {}", remoteAddress);
         this.ctx = ctx;
         this.commandSender = new 
PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
@@ -260,6 +271,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
+        connectionController.decreaseConnection(ctx.channel().remoteAddress());
         isActive = false;
         log.info("Closed connection from {}", remoteAddress);
         BrokerInterceptor brokerInterceptor = 
getBrokerService().getInterceptor();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c6ab18d..0f512c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -466,6 +466,8 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         configuration.setWebServicePortTls(Optional.of(0));
         configuration.setBookkeeperClientExposeStatsToPrometheus(true);
         configuration.setNumExecutorThreadPoolSize(5);
+        configuration.setBrokerMaxConnections(0);
+        configuration.setBrokerMaxConnectionsPerIp(0);
         return configuration;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index ffd474f..d9223c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -38,12 +38,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -69,11 +71,13 @@ import 
org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -107,6 +111,7 @@ public class BrokerServiceTest extends BrokerTestBase {
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
+        resetConfig();
     }
 
     // method for resetting state explicitly
@@ -236,6 +241,121 @@ public class BrokerServiceTest extends BrokerTestBase {
     }
 
     @Test
+    public void testConnectionController() throws Exception {
+        cleanup();
+        conf.setBrokerMaxConnections(3);
+        conf.setBrokerMaxConnectionsPerIp(2);
+        setup();
+        final String topicName = "persistent://prop/ns-abc/connection" + 
UUID.randomUUID();
+        List<PulsarClient> clients = new ArrayList<>();
+        ClientBuilder clientBuilder =
+                PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
+                        .connectionTimeout(1, TimeUnit.DAYS)
+                        .serviceUrl(brokerUrl.toString());
+        long startTime = System.currentTimeMillis();
+        clients.add(createNewConnection(topicName, clientBuilder));
+        clients.add(createNewConnection(topicName, clientBuilder));
+        createNewConnectionAndCheckFail(topicName, clientBuilder);
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+
+        cleanup();
+        conf.setBrokerMaxConnections(2);
+        conf.setBrokerMaxConnectionsPerIp(3);
+        setup();
+        startTime = System.currentTimeMillis();
+        clientBuilder.serviceUrl(brokerUrl.toString());
+        clients.add(createNewConnection(topicName, clientBuilder));
+        clients.add(createNewConnection(topicName, clientBuilder));
+        createNewConnectionAndCheckFail(topicName, clientBuilder);
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+    }
+
+    @Test
+    public void testConnectionController2() throws Exception {
+        cleanup();
+        conf.setBrokerMaxConnections(0);
+        conf.setBrokerMaxConnectionsPerIp(1);
+        setup();
+        final String topicName = "persistent://prop/ns-abc/connection" + 
UUID.randomUUID();
+        List<PulsarClient> clients = new ArrayList<>();
+        ClientBuilder clientBuilder =
+                PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
+                        .connectionTimeout(1, TimeUnit.DAYS)
+                        .serviceUrl(brokerUrl.toString());
+        long startTime = System.currentTimeMillis();
+        clients.add(createNewConnection(topicName, clientBuilder));
+        createNewConnectionAndCheckFail(topicName, clientBuilder);
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+
+        cleanup();
+        conf.setBrokerMaxConnections(1);
+        conf.setBrokerMaxConnectionsPerIp(0);
+        setup();
+        startTime = System.currentTimeMillis();
+        clientBuilder.serviceUrl(brokerUrl.toString());
+        clients.add(createNewConnection(topicName, clientBuilder));
+        createNewConnectionAndCheckFail(topicName, clientBuilder);
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+
+        cleanup();
+        conf.setBrokerMaxConnections(1);
+        conf.setBrokerMaxConnectionsPerIp(1);
+        setup();
+        startTime = System.currentTimeMillis();
+        clientBuilder.serviceUrl(brokerUrl.toString());
+        clients.add(createNewConnection(topicName, clientBuilder));
+        createNewConnectionAndCheckFail(topicName, clientBuilder);
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+
+        cleanup();
+        conf.setBrokerMaxConnections(0);
+        conf.setBrokerMaxConnectionsPerIp(0);
+        setup();
+        clientBuilder.serviceUrl(brokerUrl.toString());
+        startTime = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++) {
+            clients.add(createNewConnection(topicName, clientBuilder));
+        }
+        assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
+        cleanClient(clients);
+        clients.clear();
+
+    }
+
+    private void createNewConnectionAndCheckFail(String topicName, 
ClientBuilder builder) throws Exception {
+        try {
+            createNewConnection(topicName, builder);
+            fail("should fail");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Reached the maximum number of 
connections"));
+        }
+    }
+
+    private PulsarClient createNewConnection(String topicName, ClientBuilder 
clientBuilder) throws PulsarClientException {
+        PulsarClient client1 = clientBuilder.build();
+        client1.newProducer().topic(topicName).create().close();
+        return client1;
+    }
+
+    private void cleanClient(List<PulsarClient> clients) throws Exception {
+        for (PulsarClient client : clients) {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    @Test
     public void testStatsOfStorageSizeWithSubscription() throws Exception {
         final String topicName = "persistent://prop/ns-abc/no-subscription";
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f4d600b..3820497 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -685,6 +685,10 @@ public class ClientCnx extends PulsarHandler {
             connectionFuture.completeExceptionally(new 
PulsarClientException.AuthenticationException(error.getMessage()));
             log.error("{} Failed to authenticate the client", ctx.channel());
         }
+        if (error.getError() == ServerError.NotAllowedError) {
+            log.error("Get not allowed error, {}", error.getMessage());
+            connectionFuture.completeExceptionally(new 
PulsarClientException.NotAllowedException(error.getMessage()));
+        }
         CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
         if (requestFuture != null) {
             
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), 
error.getMessage()));

Reply via email to