alex-rufous commented on a change in pull request #76:
URL: https://github.com/apache/qpid-broker-j/pull/76#discussion_r563219790



##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
##########
@@ -64,6 +72,7 @@
     String QPID_DOCUMENTATION_URL = "qpid.helpURL";
     String BROKER_SHUTDOWN_TIMEOUT = "broker.shutdownTimeout";
     String BROKER_STATISTICS_REPORING_PERIOD = 
"broker.statisticsReportingPeriod";
+    String CONNECTION_FREQUENCY_PERIOD = 
"qpid.broker.connectionFrequencyPeriodInMillis";

Review comment:
        This is a ConnectionLimitProvider-specific setting, so it needs to be 
moved into ConnectionLimitProvider.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostUserConnectionLimitProvider.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.model;
+
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
+
+@ManagedObject
+public interface VirtualHostUserConnectionLimitProvider<X extends 
VirtualHostUserConnectionLimitProvider<X>>

Review comment:
       I would like to suggest renaming VirtualHostUserConnectionLimitProvider 
to VirtualHostConnectionLimitProvider, as it is slightly shorter.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ResourceAvailability;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+import java.security.AccessControlContext;
+
+public interface UserConnectionLimiter
+{
+    ResourceAvailability register(AMQPConnection<?> connection, String userId);

Review comment:
       Why is the userId parameter required? Why can't 
connection#getAuthenticatedUser() be called instead?
   As noted elsewhere in this review, I am not sure about returning 
ResourceAvailability. I would prefer throwing a dedicated runtime exception.
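
   To illustrate the idea, here is a minimal sketch. It assumes a hypothetical `ConnectionLimitException` and a stand-in `Connection` interface exposing only `getAuthenticatedUser()`; neither is taken from the PR, and the real AMQPConnection API may differ.

```java
import java.util.HashMap;
import java.util.Map;

final class ExceptionBasedLimiterSketch
{
    /** Hypothetical unchecked exception signalling that a limit was reached. */
    static final class ConnectionLimitException extends RuntimeException
    {
        ConnectionLimitException(final String message)
        {
            super(message);
        }
    }

    /** Minimal stand-in for AMQPConnection (an assumption, not the real interface). */
    interface Connection
    {
        String getAuthenticatedUser();
    }

    /** Counts registered connections per user and throws once the limit is reached. */
    static final class Limiter
    {
        private final int _limit;
        private final Map<String, Integer> _counts = new HashMap<>();

        Limiter(final int limit)
        {
            _limit = limit;
        }

        // No userId parameter and no return value: the user comes from the connection
        synchronized void register(final Connection connection)
        {
            final String userId = connection.getAuthenticatedUser();
            final int current = _counts.getOrDefault(userId, 0);
            if (current >= _limit)
            {
                throw new ConnectionLimitException(
                        String.format("User '%s' reached the limit of %d connections", userId, _limit));
            }
            _counts.put(userId, current + 1);
        }
    }

    public static void main(final String[] args)
    {
        final Limiter limiter = new Limiter(1);
        final Connection connection = () -> "alice";
        limiter.register(connection);
        try
        {
            limiter.register(connection);
        }
        catch (ConnectionLimitException e)
        {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

   With this shape the caller does not have to inspect a result object; the reject reason travels in the exception message.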

##########
File path: 
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
##########
@@ -20,33 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import static 
org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSED;

Review comment:
       The import order is incorrect. Please revert.

##########
File path: 
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
##########
@@ -53,6 +53,7 @@
 import com.google.common.collect.PeekingIterator;

Review comment:
       As per the comments on the 0-10 and 0-9 connections, the changes here 
could be reverted. We just need a simple try-catch block around 
registerConnection on the virtual host.
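
   Roughly, the call site could look like the sketch below. `VirtualHost`, `ProtocolConnection` and `ConnectionLimitException` are simplified, hypothetical stand-ins used only for illustration; they are not the broker's real types.

```java
final class RegisterCallSiteSketch
{
    /** Hypothetical exception thrown by the virtual host when a limit is reached. */
    static final class ConnectionLimitException extends RuntimeException
    {
        ConnectionLimitException(final String message)
        {
            super(message);
        }
    }

    /** Stand-in for the virtual host (assumption). */
    interface VirtualHost
    {
        void registerConnection(String userId);
    }

    /** Stand-in for a protocol-specific connection implementation (assumption). */
    static final class ProtocolConnection
    {
        private boolean _open = true;
        private String _closeReason;

        void attach(final VirtualHost virtualHost, final String userId)
        {
            try
            {
                virtualHost.registerConnection(userId);
            }
            catch (ConnectionLimitException e)
            {
                // The reject reason becomes the close reason of the connection
                close(e.getMessage());
            }
        }

        void close(final String reason)
        {
            _open = false;
            _closeReason = reason;
        }

        boolean isOpen()
        {
            return _open;
        }

        String getCloseReason()
        {
            return _closeReason;
        }
    }

    public static void main(final String[] args)
    {
        final VirtualHost rejecting = userId ->
        {
            throw new ConnectionLimitException("Limit reached for " + userId);
        };
        final ProtocolConnection connection = new ProtocolConnection();
        connection.attach(rejecting, "bob");
        System.out.println(connection.isOpen() + " / " + connection.getCloseReason());
    }
}
```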

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
##########
@@ -130,6 +139,10 @@
     @ManagedContextDefault(name = BROKER_STATISTICS_REPORING_PERIOD)
     int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
 
+    @ManagedContextDefault(name = CONNECTION_FREQUENCY_PERIOD, description = 
"Interval (in milliseconds) to evaluate connection frequency")

Review comment:
       This context variable is specific to ConnectionLimitProvider, so it 
needs to be moved into ConnectionLimitProvider.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
##########
@@ -394,6 +407,11 @@ void purgeUser(@Param(name="origin", description="The 
AuthenticationProvider the
 
     Collection<AccessControlProvider<?>> getAccessControlProviders();
 
+    default UserConnectionLimitProvider getUserConnectionLimitProvider()

Review comment:
       Please move this into ConnectionLimitProvider. I am not sure that we 
need a default implementation.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/logging/messages/UserResourceLimit_logmessages.properties
##########
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# User Resource Limit logging message i18n strings.
+ACCEPTED = URL-1001 : Accepted : {0} {1} by {2} : {3}
+REJECTED = URL-1002 : Rejected : {0} {1} by {2} : {3}
+DEFERRED = URL-1003 : Deferred : {0} {1} by {2}

Review comment:
       Why is the DEFERRED operational log required?
   I personally do not see much value in DEFERRED. It looks to me like an 
intermediate state before the final decision (ACCEPTED or REJECTED) is made. Do 
you have specific requirements to log DEFERRED?
   I think that we only need REJECTED operational logs. ACCEPTED and DEFERRED 
could be safely deleted. Actually, if the reject message is reported as part of 
connection CLOSE, we do not need REJECTED either :)
   
   Let's delete this and report the reject reason as part of the connection 
close operational log - CON-1002 : Close[ : {0}]

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/BrokerUserConnectionLimitProvider.java
##########
@@ -16,18 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.qpid.server.model;
 
-package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
 
-import org.apache.qpid.server.transport.AMQPConnection;
-
-public interface ConnectionPrincipalStatisticsRegistry
+@ManagedObject
+public interface BrokerUserConnectionLimitProvider<X extends 
BrokerUserConnectionLimitProvider<X>>

Review comment:
       I think we can drop User from the class names to make them shorter. Thus 
UserConnectionLimitProvider can be renamed to ConnectionLimitProvider and 
BrokerUserConnectionLimitProvider to BrokerConnectionLimitProvider.
   
   Moreover, using User in a class name could be misleading, as connection 
limits are defined for both users and groups. I expect that setting limits per 
user group will be the dominant use case.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
##########
@@ -856,6 +858,17 @@ public boolean isManagementMode()
         return children;
     }
 
+    @Override
+    public UserConnectionLimitProvider getUserConnectionLimitProvider()
+    {
+        final Collection<BrokerUserConnectionLimitProvider> children = 
getChildren(BrokerUserConnectionLimitProvider.class);

Review comment:
       Is there any specific reason to limit the number of 
ConnectionLimitProviders to 1? What if someone would like to split connection 
limits by organization and manage each organization separately? It seems that 
managing many small subsets of data would be easier than managing one huge 
aggregated set. Additionally, I would imagine that connection limits might come 
from different sources. For instance, a predefined subset of connection limits 
can be kept on the broker side whilst the rest of the limits can be retrieved 
from an external REST service. Thus, it seems that a combination of different 
implementations could be used.
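
   A rough sketch of how several providers could be combined is below. The `Limiter` interface, `CountLimiter` and `CompositeLimiter` are hypothetical names invented for this illustration; the sketch assumes a connection is accepted only if every provider accepts it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompositeLimiterSketch
{
    /** Hypothetical minimal contract shared by all limit providers. */
    interface Limiter
    {
        boolean tryRegister(String userId);

        void deregister(String userId);
    }

    /** Provider holding its own small subset of per-user count limits. */
    static final class CountLimiter implements Limiter
    {
        private final Map<String, Integer> _limits;
        private final Map<String, Integer> _counts = new HashMap<>();

        CountLimiter(final Map<String, Integer> limits)
        {
            _limits = new HashMap<>(limits);
        }

        @Override
        public synchronized boolean tryRegister(final String userId)
        {
            final int current = _counts.getOrDefault(userId, 0);
            final Integer limit = _limits.get(userId);
            // Users unknown to this provider are not restricted by it
            if (limit != null && current >= limit)
            {
                return false;
            }
            _counts.put(userId, current + 1);
            return true;
        }

        @Override
        public synchronized void deregister(final String userId)
        {
            _counts.computeIfPresent(userId, (user, count) -> count > 1 ? count - 1 : null);
        }
    }

    /** Accepts a connection only if every delegate accepts it. */
    static final class CompositeLimiter implements Limiter
    {
        private final List<Limiter> _delegates;

        CompositeLimiter(final Limiter... delegates)
        {
            _delegates = new ArrayList<>(Arrays.asList(delegates));
        }

        @Override
        public boolean tryRegister(final String userId)
        {
            final List<Limiter> registered = new ArrayList<>();
            for (final Limiter delegate : _delegates)
            {
                if (!delegate.tryRegister(userId))
                {
                    // Roll back the delegates that already accepted
                    registered.forEach(limiter -> limiter.deregister(userId));
                    return false;
                }
                registered.add(delegate);
            }
            return true;
        }

        @Override
        public void deregister(final String userId)
        {
            _delegates.forEach(limiter -> limiter.deregister(userId));
        }
    }

    public static void main(final String[] args)
    {
        final Map<String, Integer> local = new HashMap<>();
        local.put("alice", 1);
        final Map<String, Integer> external = new HashMap<>();
        external.put("bob", 1);
        final Limiter limiter = new CompositeLimiter(new CountLimiter(local), new CountLimiter(external));
        System.out.println(limiter.tryRegister("alice") + " " + limiter.tryRegister("alice"));
    }
}
```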

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/BrokerUserConnectionLimitProvider.java
##########
@@ -16,18 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.qpid.server.model;
 
-package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
 
-import org.apache.qpid.server.transport.AMQPConnection;
-
-public interface ConnectionPrincipalStatisticsRegistry
+@ManagedObject
+public interface BrokerUserConnectionLimitProvider<X extends 
BrokerUserConnectionLimitProvider<X>>
+        extends ConfiguredObject<X>, UserConnectionLimitProvider
 {
-    ConnectionPrincipalStatistics connectionOpened(AMQPConnection<?> 
connection);
-

Review comment:
       IMHO, it would be beneficial to define some common attributes. For 
example, one I can think of is a list of the current limit settings, something 
like
   `Collection<ConnectionLimit> getConnectionLimits()`
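
   For illustration only, such an attribute could be shaped as in the sketch below. The `ConnectionLimit` value type and its fields are assumptions of mine and are not defined anywhere in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class ConnectionLimitAttributesSketch
{
    /** Hypothetical read-only view of a single limit setting. */
    static final class ConnectionLimit
    {
        private final String _identity;
        private final Integer _countLimit;
        private final Integer _frequencyLimit;

        ConnectionLimit(final String identity, final Integer countLimit, final Integer frequencyLimit)
        {
            _identity = identity;
            _countLimit = countLimit;
            _frequencyLimit = frequencyLimit;
        }

        String getIdentity()
        {
            return _identity;
        }

        // A null limit means "not restricted" in this sketch
        Integer getCountLimit()
        {
            return _countLimit;
        }

        Integer getFrequencyLimit()
        {
            return _frequencyLimit;
        }
    }

    /** The common attribute shared by broker and virtual host providers. */
    interface ConnectionLimitProvider
    {
        Collection<ConnectionLimit> getConnectionLimits();
    }

    /** Simple provider exposing a fixed, unmodifiable list of limits. */
    static final class StaticProvider implements ConnectionLimitProvider
    {
        private final List<ConnectionLimit> _limits;

        StaticProvider(final ConnectionLimit... limits)
        {
            _limits = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(limits)));
        }

        @Override
        public Collection<ConnectionLimit> getConnectionLimits()
        {
            return _limits;
        }
    }

    public static void main(final String[] args)
    {
        final ConnectionLimitProvider provider = new StaticProvider(
                new ConnectionLimit("alice", 10, null),
                new ConnectionLimit("operators", 50, 100));
        for (final ConnectionLimit limit : provider.getConnectionLimits())
        {
            System.out.println(limit.getIdentity() + ": " + limit.getCountLimit());
        }
    }
}
```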
   

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/ResourceAvailability.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.model;
+
+@FunctionalInterface
+public interface ResourceAvailability

Review comment:
       As per the comment above, an exception thrown on reaching the limit 
would work better. IMHO, we can delete this interface.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostUserConnectionLimitProvider.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.model;
+
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
+
+@ManagedObject
+public interface VirtualHostUserConnectionLimitProvider<X extends 
VirtualHostUserConnectionLimitProvider<X>>
+        extends ConfiguredObject<X>, UserConnectionLimitProvider
+{

Review comment:
       Some common attributes, such as the list of existing connection limits, 
can be defined here.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
##########
@@ -20,6 +20,14 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.configuration.CommonProperties;

Review comment:
       Import statements should be sorted with the most fundamental packages 
first, grouped with associated packages together, with one blank line between 
groups. Thus the expected order is:
   
   * java classes
   * javax classes
   * third party dependencies
   * own project classes
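
   As an example of the expected layout, here is a small self-contained sketch. The third-party and project imports are shown as comments only so that the snippet compiles on its own; `ImportOrderSketch` and `NamedPrincipal` are made-up names.

```java
import java.security.Principal;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import javax.security.auth.Subject;

// Third-party dependencies would form the next group, for example:
// import com.google.common.collect.PeekingIterator;

// The project's own classes come last, for example:
// import org.apache.qpid.server.security.group.GroupPrincipal;

final class ImportOrderSketch
{
    /** Trivial Principal implementation used only by this example. */
    static final class NamedPrincipal implements Principal
    {
        private final String _name;

        NamedPrincipal(final String name)
        {
            _name = name;
        }

        @Override
        public String getName()
        {
            return _name;
        }
    }

    /** Collects the names of all principals of the subject. */
    static Set<String> principalNames(final Subject subject)
    {
        if (subject == null)
        {
            return Collections.emptySet();
        }
        final Set<String> names = new HashSet<>();
        for (final Principal principal : subject.getPrincipals())
        {
            names.add(principal.getName());
        }
        return names;
    }

    public static void main(final String[] args)
    {
        final Subject subject = new Subject();
        subject.getPrincipals().add(new NamedPrincipal("operators"));
        System.out.println(principalNames(subject));
    }
}
```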

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been 
registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)
+        {
+            return Collections.emptySet();
+        }
+        final Set<String> principalNames = new HashSet<>();
+        for (final Principal principal : 
subject.getPrincipals(GroupPrincipal.class))
+        {
+            principalNames.add(principal.getName());
+        }
+        return principalNames;
+    }
+
+    static Builder newBuilder(Duration frequencyPeriod)
+    {
+        if (frequencyPeriod != null)
+        {
+            return new BuilderImpl(frequencyPeriod);
+        }
+        return new BuilderWithoutFrequencyImpl();
+    }
+
+    public interface Builder
+    {
+        Builder add(Rule rule);
+
+        Builder addAll(Collection<? extends Rule> rule);
+
+        PortConnectionCounter build();
+    }
+
+    private interface ConnectionCounter
+    {
+        ResourceAvailabilityLogger registerConnection(String userId, 
Set<String> groups, String port);
+
+        void deregister();
+
+        void maintain();
+    }
+
+    abstract static class AbstractBuilder<T extends CombinableLimit<T>> 
implements Builder
+    {
+        final Map<String, T> _userLimits = new HashMap<>();
+        T _defaultUserLimits;
+
+        abstract T newLimits(Rule rule);
+
+        abstract Function<String, ConnectionCounter> 
getConnectionCounterFactory();
+
+        AbstractBuilder(T defaultUserLimits)
+        {
+            super();
+            _defaultUserLimits = defaultUserLimits;
+        }
+
+        @Override
+        public Builder add(Rule rule)
+        {
+            addImpl(rule);
+            return this;
+        }
+
+        @Override
+        public Builder addAll(Collection<? extends Rule> rule)
+        {
+            if (rule != null)
+            {
+                rule.forEach(this::addImpl);
+            }
+            return this;
+        }
+
+        @Override
+        public PortConnectionCounter build()
+        {
+            return new PortConnectionCounter(this);
+        }
+
+        private void addImpl(Rule rule)
+        {
+            if (rule == null)
+            {
+                return;
+            }
+            final T newLimits = newLimits(rule);
+            if (newLimits.isEmpty())
+            {
+                return;
+            }
+            final String id = rule.getIdentity();
+            if (UclRulePredicates.isAllUser(id))
+            {
+                _defaultUserLimits = newLimits.mergeWith(_defaultUserLimits);
+            }
+            else
+            {
+                _userLimits.merge(id, newLimits, CombinableLimit::mergeWith);
+            }
+        }
+    }
+
+    private static final class BuilderImpl extends 
AbstractBuilder<UserConnectionLimits>
+    {
+        private final Duration _frequencyPeriod;
+
+        BuilderImpl(Duration frequencyPeriod)
+        {
+            super(UserConnectionLimits.noLimits());
+            _frequencyPeriod = Objects.requireNonNull(frequencyPeriod);
+        }
+
+        @Override
+        UserConnectionLimits newLimits(Rule rule)
+        {
+            return rule;
+        }
+
+        Function<String, ConnectionCounter> getConnectionCounterFactory()
+        {
+            return userId -> new CombinedConnectionCounterImpl(
+                    new LimitCompiler<>(_userLimits, _defaultUserLimits, 
UserConnectionLimits::noLimits),
+                    _frequencyPeriod);
+        }
+    }
+
+    private static final class BuilderWithoutFrequencyImpl extends 
AbstractBuilder<UserConnectionCountLimit>
+    {
+        BuilderWithoutFrequencyImpl()
+        {
+            super(UserConnectionCountLimit.noLimits());
+        }
+
+        @Override
+        UserConnectionCountLimit newLimits(Rule rule)
+        {
+            return UserConnectionCountLimit.newInstance(rule);
+        }
+
+        @Override
+        Function<String, ConnectionCounter> getConnectionCounterFactory()
+        {
+            return userId -> new ConnectionCounterImpl(
+                    new LimitCompiler<>(_userLimits, _defaultUserLimits, 
UserConnectionCountLimit::noLimits));
+        }
+    }
+
+    private static final class CombinedConnectionCounterImpl implements 
ConnectionCounter
+    {
+        private final LimitCompiler<UserConnectionLimits> _limits;
+
+        private final Duration _frequencyPeriod;
+        private final Queue<Instant> _registrationTime = new LinkedList<>();
+        private long _counter = 0L;
+
+        CombinedConnectionCounterImpl(LimitCompiler<UserConnectionLimits> 
limits, Duration frequencyPeriod)
+        {
+            super();
+            _limits = limits;
+            _frequencyPeriod = frequencyPeriod;
+        }
+
+        @Override
+        public ResourceAvailabilityLogger registerConnection(String userId, 
Set<String> groups, String port)
+        {
+            final UserConnectionLimits limits = _limits.compileLimits(userId, 
groups);
+            if (limits.isUserBlocked())
+            {
+                return RejectRegistration.blockedUser(userId, port);
+            }
+            final Integer connectionCountLimit = 
limits.getConnectionCountLimit();
+            final Integer connectionFrequencyLimit = 
limits.getConnectionFrequencyLimit();
+
+            if (connectionFrequencyLimit == null)
+            {
+                if (connectionCountLimit == null)
+                {
+                    return noLimits(userId, port);
+                }
+                return countLimit(userId, port, connectionCountLimit);
+            }
+            if (connectionCountLimit == null)
+            {
+                return frequencyLimit(userId, port, connectionFrequencyLimit);
+            }
+            return bothLimits(userId, port, connectionCountLimit, 
connectionFrequencyLimit);
+        }
+
+        private synchronized ResourceAvailabilityLogger noLimits(String id, 
String port)

Review comment:
       If we register connections in the configuration thread, the synchronized 
blocks become redundant. I think we should be registering connections in the 
configuration thread.
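
   To show why the locks become unnecessary, here is a minimal sketch in which a single-threaded executor stands in for the configuration thread. That executor, the class name and the `register` signature are assumptions for illustration; the broker's actual task executor API is not used here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConfigThreadCounterSketch
{
    // Stand-in for the broker's configuration thread (assumption); daemon so the JVM can exit
    private final ExecutorService _configurationThread = Executors.newSingleThreadExecutor(runnable ->
    {
        final Thread thread = new Thread(runnable, "configuration-thread-sketch");
        thread.setDaemon(true);
        return thread;
    });

    // Plain HashMap without locks: it is only ever touched from the single thread above
    private final Map<String, Integer> _counters = new HashMap<>();
    private final int _limit;

    ConfigThreadCounterSketch(final int limit)
    {
        _limit = limit;
    }

    /** Submits the registration to the single thread and waits for the decision. */
    boolean register(final String userId)
    {
        try
        {
            return _configurationThread.submit(() ->
            {
                final int current = _counters.getOrDefault(userId, 0);
                if (current >= _limit)
                {
                    return false;
                }
                _counters.put(userId, current + 1);
                return true;
            }).get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while registering connection", e);
        }
        catch (ExecutionException e)
        {
            throw new IllegalStateException("Registration failed", e.getCause());
        }
    }

    public static void main(final String[] args)
    {
        final ConfigThreadCounterSketch counter = new ConfigThreadCounterSketch(2);
        System.out.println(counter.register("alice"));
        System.out.println(counter.register("alice"));
        System.out.println(counter.register("alice"));
    }
}
```

   All reads and writes of the counter state are serialized by the single thread, so the check and the update cannot interleave with another registration.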

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
##########
@@ -197,9 +197,7 @@
 
     boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
 
-    int incrementConnectionCount();

Review comment:
       We still need a method whose name says that the connection count is 
incremented. If you are removing this method, you need to rename the other 
method where the count is now incremented.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
##########
@@ -37,6 +37,8 @@
 import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
+import javax.security.auth.Subject;

Review comment:
       Wrong import order. It also looks like this import is unused.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
##########
@@ -542,69 +539,64 @@ public static String getInstalledProtocolsAsString()
     }
 
     @Override
-    public int incrementConnectionCount()
-    {
-        int openConnections = _connectionCount.incrementAndGet();
-        _totalConnectionCount.incrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
-        if(maxOpenConnections > 0
-           && openConnections > (maxOpenConnections * _connectionWarnCount) / 
100
-           && _connectionCountWarningGiven.compareAndSet(false, true))
-        {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                
PortMessages.CONNECTION_COUNT_WARN(openConnections,
-                                                                               
 _connectionWarnCount,
-                                                                               
 maxOpenConnections));
-        }
-        return openConnections;
-    }
-
-    @Override
-    public int decrementConnectionCount()
+    public long decrementConnectionCount()
     {
-        int openConnections = _connectionCount.decrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
+        final long openConnections = _connectionCount.decrementAndGet();
+        final long maxOpenConnections = getMaxOpenConnections();
 
-        if(maxOpenConnections > 0
-           && openConnections < (maxOpenConnections * 
square(_connectionWarnCount)) / 10000)
+        if (maxOpenConnections > 0
+                && openConnections < (maxOpenConnections * 
square(_connectionWarnCount)) / 10000L)
         {
-           _connectionCountWarningGiven.compareAndSet(true,false);
+            _connectionCountWarningGiven.compareAndSet(true, false);
         }
-
-
         return openConnections;
     }
 
-    private static int square(int val)
+    private static long square(long val)
     {
         return val * val;
     }
 
     @Override
     public boolean canAcceptNewConnection(final SocketAddress 
remoteSocketAddress)

Review comment:
       The method now has the responsibility of incrementing the connection 
counter. I think that should be reflected in its name.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
##########
@@ -542,69 +539,64 @@ public static String getInstalledProtocolsAsString()
     }
 
     @Override
-    public int incrementConnectionCount()
-    {
-        int openConnections = _connectionCount.incrementAndGet();
-        _totalConnectionCount.incrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
-        if(maxOpenConnections > 0
-           && openConnections > (maxOpenConnections * _connectionWarnCount) / 
100
-           && _connectionCountWarningGiven.compareAndSet(false, true))
-        {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                
PortMessages.CONNECTION_COUNT_WARN(openConnections,
-                                                                               
 _connectionWarnCount,
-                                                                               
 maxOpenConnections));
-        }
-        return openConnections;
-    }
-
-    @Override
-    public int decrementConnectionCount()
+    public long decrementConnectionCount()
     {
-        int openConnections = _connectionCount.decrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
+        final long openConnections = _connectionCount.decrementAndGet();
+        final long maxOpenConnections = getMaxOpenConnections();
 
-        if(maxOpenConnections > 0
-           && openConnections < (maxOpenConnections * 
square(_connectionWarnCount)) / 10000)
+        if (maxOpenConnections > 0
+                && openConnections < (maxOpenConnections * 
square(_connectionWarnCount)) / 10000L)
         {
-           _connectionCountWarningGiven.compareAndSet(true,false);
+            _connectionCountWarningGiven.compareAndSet(true, false);
         }
-
-
         return openConnections;
     }
 
-    private static int square(int val)
+    private static long square(long val)
     {
         return val * val;
     }
 
     @Override
     public boolean canAcceptNewConnection(final SocketAddress 
remoteSocketAddress)
     {
-        String addressString = remoteSocketAddress.toString();
+        final String addressString = remoteSocketAddress.toString();
         if (_closingOrDeleting.get())
         {
             _container.getEventLogger().message(new PortLogSubject(this),
-                                                
PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+                    PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
             return false;
         }
-        else if (_maxOpenConnections > 0 && _connectionCount.get() >= 
_maxOpenConnections)
+
+        final long maxOpenConnections = getMaxOpenConnections();
+        if (maxOpenConnections > 0)
         {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                
PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
-                                                                               
        _maxOpenConnections));
-            return false;
+            final long currentCount = _connectionCount.getAndUpdate(count -> 
count < maxOpenConnections ? count + 1L : count) + 1L;

Review comment:
       This looks racy to me. Adding +1 outside of getAndUpdate can end up in 
an unexpected connection rejection.
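
   One possible alternative, sketched under the assumption that the accept decision can be derived from the value returned by getAndUpdate alone. The class and the method name `tryIncrementConnectionCount` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class BoundedConnectionCounterSketch
{
    private final AtomicLong _connectionCount = new AtomicLong();
    private final long _maxOpenConnections;

    BoundedConnectionCounterSketch(final long maxOpenConnections)
    {
        _maxOpenConnections = maxOpenConnections;
    }

    /** Increments the counter only while it is below the maximum; returns whether it did. */
    boolean tryIncrementConnectionCount()
    {
        // getAndUpdate returns the value seen by the successful update,
        // so comparing it with the maximum tells us whether the increment happened
        final long previous = _connectionCount.getAndUpdate(
                count -> count < _maxOpenConnections ? count + 1L : count);
        return previous < _maxOpenConnections;
    }

    long decrementConnectionCount()
    {
        return _connectionCount.decrementAndGet();
    }

    long getConnectionCount()
    {
        return _connectionCount.get();
    }

    public static void main(final String[] args)
    {
        final BoundedConnectionCounterSketch counter = new BoundedConnectionCounterSketch(2L);
        System.out.println(counter.tryIncrementConnectionCount());
        System.out.println(counter.tryIncrementConnectionCount());
        System.out.println(counter.tryIncrementConnectionCount());
    }
}
```

   Here no arithmetic is done on the counter value after the atomic call, and the method name states that the counter may be incremented.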

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends 
CombinableLimit<UserConnectionLimits>
+{
+    Integer getConnectionCountLimit();
+
+    Integer getConnectionFrequencyLimit();
+
+    boolean isUserBlocked();
+
+    @Override
+    default UserConnectionLimits then(UserConnectionLimits other)
+    {
+        if (other != null && isEmpty())
+        {
+            return other;
+        }
+        return this;
+    }
+
+    @Override
+    default UserConnectionLimits mergeWith(UserConnectionLimits second)

Review comment:
       I would remove this default implementation.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/AcceptRegistration.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+import java.time.Duration;
+
+public final class AcceptRegistration implements ResourceAvailabilityLogger

Review comment:
       I think this is not required

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
##########
@@ -19,32 +19,9 @@
  */
 package org.apache.qpid.server.model.port;
 
-import java.io.IOException;

Review comment:
       wrong import order

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSetCreator.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+public final class RuleSetCreator extends ArrayList<Rule> implements 
UserConnectionLimitProvider
+{
+    private static final long serialVersionUID = -7;
+
+    public static final long DEFAULT_FREQUENCY_PERIOD = 60L * 1000L;
+
+    private Long _frequencyPeriod = null;
+
+    private boolean _logAllMessages = false;
+
+    public RuleSetCreator()
+    {
+        super(new ArrayList<>());
+    }
+
+    public void setFrequencyPeriod(long frequencyPeriod)

Review comment:
       I am not sure that _frequencyPeriod is needed when every rule is 
expressed using its own frequency 

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimitProvider.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+
+@FunctionalInterface
+public interface UserConnectionLimitProvider

Review comment:
       Using ConnectionLimitProvider as class name should be sufficient

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimitProvider.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+
+@FunctionalInterface
+public interface UserConnectionLimitProvider
+{
+    UserConnectionLimiter getLimiter(EventLoggerProvider eventLoggerProvider);

Review comment:
       Why do we need the parameter EventLoggerProvider eventLoggerProvider?
   The method getLimiter would possibly be better renamed to 
getConnectionLimiter.
   Why is (User)ConnectionLimiter required? What about implementing all of the 
ConnectionLimiter functionality directly in ConnectionLimitProvider?

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ResourceAvailability;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+import java.security.AccessControlContext;
+
+public interface UserConnectionLimiter

Review comment:
       The name could be ConnectionLimiter.
   I am not sure we need a special ConnectionLimiter. A ConnectionLimitProvider 
can implement all its responsibilities. Though...I am not sure

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ResourceAvailability;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+import java.security.AccessControlContext;
+
+public interface UserConnectionLimiter
+{
+    ResourceAvailability register(AMQPConnection<?> connection, String userId);
+
+    void deregister(Connection<?> connection, String userId);

Review comment:
       userId seems redundant

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
##########
@@ -48,8 +50,17 @@
 
     boolean registerConnection(AMQPConnection<?> connection,
                                final ConnectionEstablishmentPolicy 
connectionEstablishmentPolicy);
+
     void deregisterConnection(AMQPConnection<?> connection);
 
+    default ResourceAvailability requestConnectionSlot(AMQPConnection<?> 
connection, String userId)

Review comment:
       Why are the methods requestConnectionSlot and freeConnectionSlot part of 
the VH? Why can't they be moved into ConnectionLimitProvider? It feels as though 
they belong to the ConnectionLimitProvider domain.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/RejectLogger.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.logging.messages.UserResourceLimitMessages;
+
+import java.util.Objects;
+
+public class RejectLogger implements EventLogger

Review comment:
       Let's delete this class

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/RejectRegistration.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+import java.time.Duration;
+
+public final class RejectRegistration implements ResourceAvailabilityLogger

Review comment:
       I think the error messages can be moved into corresponding exception 
class. Let's delete this class

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ResourceAvailability;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+import java.security.AccessControlContext;
+
+public interface UserConnectionLimiter
+{
+    ResourceAvailability register(AMQPConnection<?> connection, String userId);
+
+    void deregister(Connection<?> connection, String userId);
+
+    void scheduleTask(QueueManagingVirtualHost<?> virtualHost, 
AccessControlContext controlContext);

Review comment:
       I am wondering why we need to pass QueueManagingVirtualHost. I think 
that we need to pass an Executor implementation here instead. Also, why can't 
the Broker housekeeping executor be used for BrokerConnectionLimitProviders?
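
A minimal sketch of what passing an executor could look like, under stated 
assumptions: `SketchFrequencyLimiter` and its methods are hypothetical, and 
`ScheduledExecutorService` is swapped in for a plain Executor because the task 
is periodic. Only the java.util.concurrent types are real APIs here.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical limiter used for illustration; not a class from the PR.
final class SketchFrequencyLimiter
{
    private final AtomicInteger _connectionsInPeriod = new AtomicInteger();

    void connectionOpened()
    {
        _connectionsInPeriod.incrementAndGet();
    }

    int getConnectionsInPeriod()
    {
        return _connectionsInPeriod.get();
    }

    // The caller chooses the executor, so the limiter does not need to know
    // whether it runs on behalf of a virtual host or of the broker.
    ScheduledFuture<?> scheduleTask(final ScheduledExecutorService executor, final long periodMillis)
    {
        return executor.scheduleAtFixedRate(() -> _connectionsInPeriod.set(0),
                                            periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

With this shape the same limiter could be scheduled on either a virtual host 
or a broker housekeeping pool, assuming both expose a scheduled executor.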

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/UclFileBrokerUserConnectionLimitProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.plugins;
+
+import org.apache.qpid.server.model.BrokerUserConnectionLimitProvider;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedOperation;
+
+@ManagedObject(category = false,
+        type = 
AbstractUclFileUserConnectionLimitProvider.UCL_FILE_PROVIDER_TYPE)
+public interface UclFileBrokerUserConnectionLimitProvider<C extends 
UclFileBrokerUserConnectionLimitProvider<C>>
+        extends BrokerUserConnectionLimitProvider<C>

Review comment:
       As per earlier comments we need an attribute to list existing rules

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -352,6 +357,39 @@ else if(prov.getState() == State.ACTIVE)
         }
     }
 
+    void updateUserConnectionLimiter()
+    {
+        if (!((SystemConfig) _broker.getParent()).isManagementMode())
+        {
+            final Collection<VirtualHostUserConnectionLimitProvider> children =
+                    getChildren(VirtualHostUserConnectionLimitProvider.class);
+            LOGGER.debug("Updating user connection limiter");
+
+            if (children.size() > 1)
+            {
+                throw new IllegalConfigurationException(String.format(

Review comment:
       Why is the number of connection limit providers limited to 1? It does 
not look right.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -2691,6 +2727,18 @@ public String getArguments()
         });
     }
 
+    @Override
+    public ResourceAvailability requestConnectionSlot(AMQPConnection<?> 
connection, String userId)

Review comment:
       This should go into ConnectionLimitProvider

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -2691,6 +2727,18 @@ public String getArguments()
         });
     }
 
+    @Override
+    public ResourceAvailability requestConnectionSlot(AMQPConnection<?> 
connection, String userId)
+    {
+        return _userConnectionLimiter.register(connection, userId);
+    }
+
+    @Override
+    public void freeConnectionSlot(AMQPConnection<?> connection, String 
userIds)

Review comment:
       This should go into ConnectionLimitProvider

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -320,6 +323,8 @@ public AbstractVirtualHost(final Map<String, Object> 
attributes, VirtualHostNode
         _fileSystemSpaceCheckerJobContext = 
getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", 
_principal);
 
         _fileSystemSpaceChecker = new FileSystemSpaceChecker();
+        _userConnectionLimiter = UserConnectionLimiter.noLimits();
+        _userConnectionLimitChangeListener = new 
UserConnectionLimitChangeListener(this);

Review comment:
       This is a leak. You cannot pass "this" before the object finishes 
construction.
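
One possible way to avoid the escape, sketched under assumptions: keep the 
constructor free of references to "this" and create the listener in a 
post-construction step. `SketchHost`, `SketchChangeListener` and `create()` are 
hypothetical stand-ins, not Qpid classes; in the broker the equivalent step 
would presumably sit in a lifecycle callback rather than a factory method.

```java
// Hypothetical stand-ins for illustration; not the Qpid classes.
final class SketchChangeListener
{
    private final SketchHost _host;

    SketchChangeListener(final SketchHost host)
    {
        _host = host;
    }

    SketchHost getHost()
    {
        return _host;
    }
}

final class SketchHost
{
    private final String _name;
    private SketchChangeListener _listener; // assigned only after construction completes

    private SketchHost(final String name)
    {
        _name = name; // no reference to "this" leaves the constructor
    }

    // Factory method: the listener receives a fully constructed host.
    static SketchHost create(final String name)
    {
        final SketchHost host = new SketchHost(name);
        host._listener = new SketchChangeListener(host);
        return host;
    }

    String getName()
    {
        return _name;
    }

    SketchChangeListener getListener()
    {
        return _listener;
    }
}
```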

##########
File path: 
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
##########
@@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, 
ConnectionOpen open)
 
             try
             {
+                final AMQPConnection_0_10 amqpConnection = 
sconn.getAmqpConnection();
+                final Principal principal = sconn.getAuthorizedPrincipal();
+                if (principal != null)
+                {
+                    amqpConnection.freeConnectionSlots();
+                    final String userId = principal.getName();
+                    final ResourceAvailability resource = 
addressSpace.requestConnectionSlot(amqpConnection, userId);

Review comment:
       The slot registration functionality could be moved into 
VH#registerConnection. There the code can iterate through the existing VH and 
Broker ConnectionLimitProviders. Here, we can simply try to catch any 
RejectException thrown from the ConnectionLimitProviders and close the 
connection accordingly.
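
The control flow suggested above can be sketched roughly as follows. All names 
are hypothetical (`SketchRejectException`, `SketchLimitProvider`, 
`SketchVirtualHost`, `SketchDelegate`); this illustrates the idea and is not 
the PR's API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical exception raised by a provider that refuses a connection.
final class SketchRejectException extends RuntimeException
{
    SketchRejectException(final String message)
    {
        super(message);
    }
}

interface SketchLimitProvider
{
    // Throws SketchRejectException when the user may not open another connection.
    void register(String userId);
}

final class SketchVirtualHost
{
    private final List<SketchLimitProvider> _providers;

    SketchVirtualHost(final SketchLimitProvider... providers)
    {
        _providers = Arrays.asList(providers);
    }

    // Consults every provider in order; the first rejection propagates to the caller.
    void registerConnection(final String userId)
    {
        for (final SketchLimitProvider provider : _providers)
        {
            provider.register(userId);
        }
    }
}

final class SketchDelegate
{
    // Returns true if the connection was accepted, false if it was rejected.
    static boolean connectionOpen(final SketchVirtualHost host, final String userId)
    {
        try
        {
            host.registerConnection(userId);
            return true;
        }
        catch (SketchRejectException e)
        {
            // A protocol layer would close the connection here, with e.getMessage() as the reason.
            return false;
        }
    }
}
```

One caveat with this shape: if a later provider rejects, slots already taken 
from earlier providers would have to be released, which the sketch leaves out.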

##########
File path: 
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
##########
@@ -412,13 +410,24 @@ public void closed()
         }
         finally
         {
-            NamedAddressSpace addressSpace = getAddressSpace();

Review comment:
       These changes can be reverted once a delete task is used to free up the slot.

##########
File path: 
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
##########
@@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, 
ConnectionOpen open)
 
             try
             {
+                final AMQPConnection_0_10 amqpConnection = 
sconn.getAmqpConnection();
+                final Principal principal = sconn.getAuthorizedPrincipal();
+                if (principal != null)

Review comment:
       If the principal is null, it is a security issue. The principal should 
exist. If it is null, at least a ConnectionScopeException should be thrown.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/UserConnectionLimitChangeListener.java
##########
@@ -0,0 +1,78 @@
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostUserConnectionLimitProvider;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+final class UserConnectionLimitChangeListener extends 
AbstractConfigurationChangeListener

Review comment:
       After moving functionality for connection registration/deregistration 
into ConnectionLimitProvider this will no longer be needed

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/ResourceAvailabilityLogger.java
##########
@@ -0,0 +1,26 @@
+/*

Review comment:
       I think we can delete this interface

##########
File path: 
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
##########
@@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, 
ConnectionOpen open)
 
             try
             {
+                final AMQPConnection_0_10 amqpConnection = 
sconn.getAmqpConnection();
+                final Principal principal = sconn.getAuthorizedPrincipal();
+                if (principal != null)
+                {
+                    amqpConnection.freeConnectionSlots();

Review comment:
       Why is this call required?

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/AbstractRuleBasedUserConnectionLimitProvider.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.plugins;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Content;
+import org.apache.qpid.server.model.CustomRestHeaders;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.RestContentHeader;
+import org.apache.qpid.server.user.connection.limits.config.Rule;
+import org.apache.qpid.server.user.connection.limits.config.RuleSetCreator;
+import org.apache.qpid.server.user.connection.limits.config.UclFileParser;
+import 
org.apache.qpid.server.user.connection.limits.config.UclRulePredicates.Property;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+abstract class AbstractRuleBasedUserConnectionLimitProvider<C extends 
AbstractRuleBasedUserConnectionLimitProvider<C>>
+        extends AbstractUserConnectionLimitProvider<C>
+{
+    private static final String FREQUENCY_PERIOD = "frequencyPeriod";
+    private static final String RULES = "rules";
+
+    static final String RULE_BASED_TYPE = "RuleBased";
+
+    @ManagedAttributeField
+    private Long _frequencyPeriod;
+
+    @ManagedAttributeField
+    private List<UclRule> _rules = new ArrayList<>();
+
+    public AbstractRuleBasedUserConnectionLimitProvider(ConfiguredObject<?> 
parent, Map<String, Object> attributes)
+    {
+        super(parent, attributes);
+    }
+
+    public List<UclRule> getRules()
+    {
+        return Collections.unmodifiableList(_rules);
+    }
+
+    public Long getFrequencyPeriod()
+    {
+        return Optional.ofNullable(_frequencyPeriod).orElseGet(
+                () -> getContextValue(Long.class, 
Broker.CONNECTION_FREQUENCY_PERIOD));
+    }
+
+    public Content extractRules()
+    {
+        return new StringContent(getName(), getFrequencyPeriod(), getRules());
+    }
+
+    public void loadFromFile(String path)
+    {
+        final RuleSetCreator ruleList = UclFileParser.parse(path);
+        final List<UclRule> uclRules = new ArrayList<>();
+        for (final Rule rule : ruleList)
+        {
+            uclRules.add(new UclRuleImpl(rule));
+        }
+        final Map<String, Object> attrs = new HashMap<>();
+        attrs.put(RULES, uclRules);
+        if (ruleList.isFrequencyPeriodSet())
+        {
+            attrs.put(FREQUENCY_PERIOD, ruleList.getFrequencyPeriod());
+        }
+        setAttributes(attrs);
+    }
+
+    public void clearRules()
+    {
+        final Map<String, Object> attrs = new HashMap<>();
+        attrs.put(RULES, new ArrayList<UclRule>());
+        setAttributes(attrs);
+    }
+
+    @Override
+    protected void postSetAttributes(final Set<String> actualUpdatedAttributes)
+    {
+        super.postSetAttributes(actualUpdatedAttributes);
+        forceNewUserConnectionLimitProvider();
+    }
+
+    @Override
+    protected RuleSetCreator newUserConnectionLimitProvider()
+    {
+        final RuleSetCreator creator = new RuleSetCreator();
+        creator.setFrequencyPeriod(getFrequencyPeriod());
+        for (final UclRule rule : getRules())
+        {
+            creator.add(Rule.newInstance(rule));
+        }
+        return creator;
+    }
+
+    private static final class StringContent implements Content, 
CustomRestHeaders

Review comment:
       I would prefer to use JSON format

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -149,7 +148,8 @@
     private long _maxUncommittedInMemorySize;
 
     private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = 
new ConcurrentHashMap<>();
-    private volatile ConnectionPrincipalStatistics 
_connectionPrincipalStatistics;
+
+    private final Queue<ConnectionSlot> _registeredConnectionSlots = new 
ConcurrentLinkedQueue<>();

Review comment:
       I am not sure that this is needed

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+final class Limits implements UserConnectionLimits
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    Limits(final UserConnectionLimits first, final UserConnectionLimits second)
+    {
+        super();
+        this.connectionCount = min(first.getConnectionCountLimit(), 
second.getConnectionCountLimit());
+        this.connectionFrequency = min(first.getConnectionFrequencyLimit(), 
second.getConnectionFrequencyLimit());
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;
+    }
+
+    @Override
+    public Integer getConnectionCountLimit()
+    {
+        return connectionCount;
+    }
+
+    @Override
+    public Integer getConnectionFrequencyLimit()

Review comment:
       This could be a string expressed as a **number of connections** per 
period of time, for example 1/s or 10/h. I believe that an integer could be 
confusing.
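
A minimal sketch of parsing such a string, assuming a `<count>/<unit>` syntax 
with units s, m and h. The class name `SketchConnectionFrequency` and the exact 
syntax are assumptions for illustration and not part of the PR.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value type for a limit such as "10/h".
final class SketchConnectionFrequency
{
    private static final Pattern FORMAT = Pattern.compile("(\\d+)/([smh])");

    private final int _count;
    private final Duration _period;

    private SketchConnectionFrequency(final int count, final Duration period)
    {
        _count = count;
        _period = period;
    }

    // Parses "<count>/<unit>", where unit is s, m or h (an assumed syntax).
    static SketchConnectionFrequency parse(final String text)
    {
        final Matcher matcher = FORMAT.matcher(text.trim());
        if (!matcher.matches())
        {
            throw new IllegalArgumentException("Invalid frequency limit: " + text);
        }
        final int count = Integer.parseInt(matcher.group(1));
        final Duration period;
        switch (matcher.group(2))
        {
            case "s":
                period = Duration.ofSeconds(1);
                break;
            case "m":
                period = Duration.ofMinutes(1);
                break;
            default: // "h" is the only remaining unit the pattern admits
                period = Duration.ofHours(1);
                break;
        }
        return new SketchConnectionFrequency(count, period);
    }

    int getCount()
    {
        return _count;
    }

    Duration getPeriod()
    {
        return _period;
    }
}
```

Keeping the count and the period together in one value would also fit the 
earlier remark that each rule carries its own frequency.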

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+final class Limits implements UserConnectionLimits

Review comment:
       This class deserves a better name. It represents combined connection 
limits

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -1161,28 +1161,35 @@ public void incrementTransactionBeginCounter()
     }
 
     @Override
-    public void registered(final ConnectionPrincipalStatistics 
connectionPrincipalStatistics)
+    public void freeConnectionSlots()
     {
-        _connectionPrincipalStatistics = connectionPrincipalStatistics;
+        ConnectionSlot slot;
+        while ((slot = _registeredConnectionSlots.poll()) != null)
+        {
+            slot.free();
+        }
     }
 
     @Override
-    public int getAuthenticatedPrincipalConnectionCount()
+    public void addConnectionSlot(NamedAddressSpace addressSpace, String 
userId)
     {
-        if (_connectionPrincipalStatistics == null)
-        {
-            return 0;
-        }
-        return _connectionPrincipalStatistics.getConnectionCount();
+        _registeredConnectionSlots.add(new ConnectionSlot(addressSpace, 
userId));
     }
 
-    @Override
-    public int getAuthenticatedPrincipalConnectionFrequency()
+    private final class ConnectionSlot

Review comment:
       As per comments above, a delete task Action<?> can do the clean-up

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -1161,28 +1161,35 @@ public void incrementTransactionBeginCounter()
     }
 
     @Override
-    public void registered(final ConnectionPrincipalStatistics 
connectionPrincipalStatistics)
+    public void freeConnectionSlots()

Review comment:
       This should be part of a DeleteTask. Please see 
org.apache.qpid.server.util.Deletable#addDeleteTask
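
A rough sketch of the delete-task idea. `SketchAction`, `SketchConnection` and 
`SketchLimiter` are simplified stand-ins that only approximate the shape of the 
Action and Deletable types; their exact signatures here are assumptions, not 
the Qpid API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for a callback that runs when an object is deleted.
interface SketchAction<T>
{
    void performAction(T object);
}

// Simplified stand-in for a connection that supports delete tasks.
final class SketchConnection
{
    private final List<SketchAction<? super SketchConnection>> _deleteTasks = new ArrayList<>();

    void addDeleteTask(final SketchAction<? super SketchConnection> task)
    {
        _deleteTasks.add(task);
    }

    // Runs each registered task once; later calls find no tasks left.
    void close()
    {
        for (final SketchAction<? super SketchConnection> task : _deleteTasks)
        {
            task.performAction(this);
        }
        _deleteTasks.clear();
    }
}

final class SketchLimiter
{
    private final AtomicInteger _openConnections = new AtomicInteger();

    // Takes a slot and arranges for it to be released when the connection is deleted.
    void register(final SketchConnection connection)
    {
        _openConnections.incrementAndGet();
        connection.addDeleteTask(closed -> _openConnections.decrementAndGet());
    }

    int getOpenConnections()
    {
        return _openConnections.get();
    }
}
```

With this arrangement the connection class needs no limiter-specific 
bookkeeping such as a queue of slots; the limiter owns both halves of the 
register/release pair.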

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -20,34 +20,9 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.net.InetSocketAddress;

Review comment:
       Wrong import order, please revert.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+final class Limits implements UserConnectionLimits
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    Limits(final UserConnectionLimits first, final UserConnectionLimits second)
+    {
+        super();
+        this.connectionCount = min(first.getConnectionCountLimit(), 
second.getConnectionCountLimit());
+        this.connectionFrequency = min(first.getConnectionFrequencyLimit(), 
second.getConnectionFrequencyLimit());
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;
+    }
+
+    @Override
+    public Integer getConnectionCountLimit()
+    {
+        return connectionCount;
+    }
+
+    @Override
+    public Integer getConnectionFrequencyLimit()
+    {
+        return connectionFrequency;
+    }
+
+    @Override
+    public boolean isUserBlocked()

Review comment:
       Redundant method. It needs to be removed

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+final class Limits implements UserConnectionLimits
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    Limits(final UserConnectionLimits first, final UserConnectionLimits second)
+    {
+        super();
+        this.connectionCount = min(first.getConnectionCountLimit(), 
second.getConnectionCountLimit());
+        this.connectionFrequency = min(first.getConnectionFrequencyLimit(), 
second.getConnectionFrequencyLimit());
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;
+    }
+
+    @Override
+    public Integer getConnectionCountLimit()
+    {
+        return connectionCount;
+    }
+
+    @Override
+    public Integer getConnectionFrequencyLimit()
+    {
+        return connectionFrequency;
+    }
+
+    @Override
+    public boolean isUserBlocked()
+    {
+        return false;
+    }
+
+    static Integer min(final Integer first, final Integer second)
+    {
+        if (first == null)

Review comment:
       If either parameter is null, it is a mistake. An exception has to be 
thrown.
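
   One possible reading of this suggestion, as a sketch only: SketchLimitMath 
is a hypothetical holder class, and the choice of NullPointerException (via 
Objects.requireNonNull) is an assumption; the comment does not say which 
exception type is wanted.

```java
import java.util.Objects;

// Hypothetical holder for the null-rejecting variant of min.
final class SketchLimitMath
{
    private SketchLimitMath()
    {
    }

    static Integer min(final Integer first, final Integer second)
    {
        // A null argument is treated as a programming error rather than as "no limit".
        Objects.requireNonNull(first, "first limit must not be null");
        Objects.requireNonNull(second, "second limit must not be null");
        return Math.min(first, second);
    }
}
```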

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
##########
@@ -20,26 +20,25 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.net.SocketAddress;

Review comment:
       Wrong import order. Please revert the change.

##########
File path: 
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
##########
@@ -29,6 +29,7 @@
 import java.nio.BufferUnderflowException;

Review comment:
       As per the comments for the 0-10 protocol connection, the changes here 
can be reverted. We just need to add a try-catch block around the code that 
registers the connection on the VH.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
##########
@@ -148,10 +147,7 @@ void registerTransactionTickers(ServerTransaction 
serverTransaction,
     @Override
     AmqpPort<?> getPort();
 
-    void registered(ConnectionPrincipalStatistics 
connectionPrincipalStatistics);
-
-    int getAuthenticatedPrincipalConnectionCount();
-
-    int getAuthenticatedPrincipalConnectionFrequency();
+    void addConnectionSlot(NamedAddressSpace addressSpace, String userId);

Review comment:
       I would like to remove addConnectionSlot and freeConnectionSlots. IMHO, 
a DeleteTask should be registered on the connection for every identity to free 
the slot.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/NonBlockingRule.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.user.connection.limits.plugins.UclRule;
+
+final class NonBlockingRule extends AbstractRule
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    NonBlockingRule(UclRule rule)
+    {
+        this(rule.getPort(), rule.getIdentity(), rule.getCountLimit(), 
rule.getFrequencyLimit());
+    }
+
+    NonBlockingRule(String port, String identity, Integer connectionCount, 
Integer connectionFrequency)
+    {
+        super(port, identity);
+        this.connectionCount = connectionCount;
+        this.connectionFrequency = connectionFrequency;
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;

Review comment:
       If both values are null, an exception should be thrown from the 
constructor.
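
   A sketch of failing fast in the constructor, assuming a hypothetical 
SketchRule class and IllegalArgumentException as the exception type (the 
project may prefer its own configuration exception). With this check an empty 
rule can never be built, so no isEmpty method is needed.

```java
// Hypothetical rule holding the two optional limits; at least one must be set.
final class SketchRule
{
    private final Integer _connectionCount;

    private final Integer _connectionFrequency;

    SketchRule(final Integer connectionCount, final Integer connectionFrequency)
    {
        if (connectionCount == null && connectionFrequency == null)
        {
            throw new IllegalArgumentException(
                    "A rule must define a connection count limit or a connection frequency limit");
        }
        _connectionCount = connectionCount;
        _connectionFrequency = connectionFrequency;
    }

    Integer getConnectionCountLimit()
    {
        return _connectionCount;
    }

    Integer getConnectionFrequencyLimit()
    {
        return _connectionFrequency;
    }
}
```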

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)

Review comment:
       It is hard to understand why the builder is passed into the constructor. 
Why can't the connection counter factory be passed directly?

##########
File path: 
broker-plugins/management-http/src/main/java/resources/js/qpid/management/userconnectionlimitprovider/rulebased/show.js
##########
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+define(["dojo/_base/connect",
+        "dojo/_base/event",
+        "dojo/parser",
+        "dojo/query",
+        "dojo/dom-construct",
+        "dijit/registry",
+        "dojox/html/entities",
+        "dojox/grid/EnhancedGrid",

Review comment:
       EnhancedGrid should not be used. Please use dgrid instead.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)

Review comment:
       If userId is null, it is a mistake. An exception should be thrown.
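
   A sketch of how the null check could look, assuming a hypothetical 
SketchUserRegistry that counts connections per user; it is not the PR's 
PortConnectionCounter. The lookup for an unknown user uses get rather than 
computeIfAbsent, which is a choice made for this sketch.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-user connection registry.
final class SketchUserRegistry
{
    private final Map<String, AtomicInteger> _counters = new ConcurrentHashMap<>();

    void register(final String userId)
    {
        Objects.requireNonNull(userId, "userId must not be null");
        _counters.computeIfAbsent(userId, user -> new AtomicInteger()).incrementAndGet();
    }

    void deregister(final String userId)
    {
        // A null userId is a caller bug, so it is reported instead of ignored.
        Objects.requireNonNull(userId, "userId must not be null");
        final AtomicInteger counter = _counters.get(userId);
        if (counter == null)
        {
            throw new IllegalArgumentException(
                    String.format("The user '%s' has not been registered yet", userId));
        }
        counter.decrementAndGet();
    }

    int count(final String userId)
    {
        final AtomicInteger counter = _counters.get(userId);
        return counter == null ? 0 : counter.get();
    }
}
```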

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/NonBlockingRule.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.user.connection.limits.plugins.UclRule;
+
+final class NonBlockingRule extends AbstractRule
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    NonBlockingRule(UclRule rule)
+    {
+        this(rule.getPort(), rule.getIdentity(), rule.getCountLimit(), 
rule.getFrequencyLimit());
+    }
+
+    NonBlockingRule(String port, String identity, Integer connectionCount, 
Integer connectionFrequency)
+    {
+        super(port, identity);
+        this.connectionCount = connectionCount;
+        this.connectionFrequency = connectionFrequency;
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;
+    }
+
+    @Override
+    public boolean isUserBlocked()
+    {
+        return false;
+    }
+
+    @Override
+    public Integer getConnectionCountLimit()
+    {
+        return connectionCount;
+    }
+
+    @Override
+    public Integer getConnectionFrequencyLimit()

Review comment:
       I am not sure about the Integer type. We need a dedicated frequency type.
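
   One way such a type could look, as a sketch under assumptions: the name 
SketchConnectionFrequency and the idea of pairing a connection count with a 
java.time.Duration are mine and are not taken from the PR.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical value type: at most `count` new connections within `period`.
final class SketchConnectionFrequency
{
    private final int _count;

    private final Duration _period;

    SketchConnectionFrequency(final int count, final Duration period)
    {
        if (count < 0)
        {
            throw new IllegalArgumentException("count must not be negative");
        }
        Objects.requireNonNull(period, "period must not be null");
        if (period.isZero() || period.isNegative())
        {
            throw new IllegalArgumentException("period must be positive");
        }
        _count = count;
        _period = period;
    }

    int getCount()
    {
        return _count;
    }

    Duration getPeriod()
    {
        return _period;
    }

    @Override
    public String toString()
    {
        return _count + " connections per " + _period;
    }
}
```

   Keeping the period inside the type means a limit can no longer be read 
without knowing the time window it applies to.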

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;

Review comment:
       This is confusing. A factory should not be a Function. A factory class 
usually has a create method, and using apply for that would be misleading. 
Please introduce a dedicated interface if required.
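
   A sketch of a dedicated factory interface, assuming hypothetical names 
(SketchCounter, SketchCounterFactory, SketchPortCounter) in place of the PR's 
classes. The factory is passed to the constructor directly, and a method 
reference adapts it where the map API expects a Function.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PR's ConnectionCounter.
final class SketchCounter
{
    private final String _userId;

    SketchCounter(final String userId)
    {
        _userId = userId;
    }

    String getUserId()
    {
        return _userId;
    }
}

// Dedicated factory interface with an intention-revealing method name.
@FunctionalInterface
interface SketchCounterFactory
{
    SketchCounter create(String userId);
}

final class SketchPortCounter
{
    private final SketchCounterFactory _counterFactory;

    private final Map<String, SketchCounter> _counters = new ConcurrentHashMap<>();

    // The factory is injected directly; no builder is needed here.
    SketchPortCounter(final SketchCounterFactory counterFactory)
    {
        _counterFactory = counterFactory;
    }

    SketchCounter counterFor(final String userId)
    {
        // The method reference adapts create() to the Function that computeIfAbsent expects.
        return _counters.computeIfAbsent(userId, _counterFactory::create);
    }
}
```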

##########
File path: 
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
##########
@@ -950,57 +960,67 @@ public void receiveConnectionOpen(AMQShortString 
virtualHostName,
             virtualHostStr = virtualHostStr.substring(1);
         }
 
-        NamedAddressSpace addressSpace = 
((AmqpPort)getPort()).getAddressSpace(virtualHostStr);
+        final NamedAddressSpace addressSpace = 
((AmqpPort)getPort()).getAddressSpace(virtualHostStr);
 
         if (addressSpace == null)
         {
             sendConnectionClose(ErrorCodes.NOT_FOUND,
                     "Unknown virtual host: '" + virtualHostName + "'", 0);
+            return;
+        }
 
+        // Check virtualhost access
+        if (!addressSpace.isActive())
+        {
+            final String redirectHost = 
addressSpace.getRedirectHost(getPort());
+            if (redirectHost != null)
+            {
+                sendConnectionClose(0, new AMQFrame(0, new 
ConnectionRedirectBody(getProtocolVersion(), 
AMQShortString.valueOf(redirectHost), null)));
+            }
+            else
+            {
+                sendConnectionClose(ErrorCodes.CONNECTION_FORCED,
+                        "Virtual host '" + addressSpace.getName() + "' is not 
active", 0);
+            }
+            return;
         }
-        else
+
+        try
         {
-            // Check virtualhost access
-            if (!addressSpace.isActive())
+            final Principal authenticatedPrincipal = getAuthorizedPrincipal();
+            if (authenticatedPrincipal != null)
             {
-                String redirectHost = addressSpace.getRedirectHost(getPort());
-                if(redirectHost != null)
+                freeConnectionSlots();
+                final String userId = authenticatedPrincipal.getName();
+                final ResourceAvailability resource = 
addressSpace.requestConnectionSlot(this, userId);
+                if (resource != null && !resource.isAvailable())
                 {
-                    sendConnectionClose(0, new AMQFrame(0, new 
ConnectionRedirectBody(getProtocolVersion(), 
AMQShortString.valueOf(redirectHost), null)));
-                }
-                else
-                {
-                    sendConnectionClose(ErrorCodes.CONNECTION_FORCED,
-                            "Virtual host '" + addressSpace.getName() + "' is 
not active", 0);
+                    sendConnectionClose(ErrorCodes.RESOURCE_ERROR, 
resource.getCause(), 0);
+                    return;
                 }
+                addConnectionSlot(addressSpace, userId);
+            }
+
+            addressSpace.registerConnection(this, new 
NoopConnectionEstablishmentPolicy());

Review comment:
       Here we can add a try-catch block to catch the runtime exception thrown 
on rejection. That should simplify the implementation.
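
   A sketch of the try-catch approach with stand-in types: 
SketchLimitException, SketchAddressSpace and SketchOpenHandler are 
hypothetical, and returning the close reason as a String stands in for the 
sendConnectionClose call in the real handler.

```java
// Hypothetical runtime exception signalling that a limit rejected the connection.
final class SketchLimitException extends RuntimeException
{
    SketchLimitException(final String message)
    {
        super(message);
    }
}

// Hypothetical stand-in for the address space; registration throws on rejection.
interface SketchAddressSpace
{
    void registerConnection(String userId);
}

final class SketchOpenHandler
{
    private SketchOpenHandler()
    {
    }

    // Returns null when the connection was registered, otherwise the reason
    // that would be sent to the client in the connection close.
    static String open(final SketchAddressSpace addressSpace, final String userId)
    {
        try
        {
            addressSpace.registerConnection(userId);
            return null;
        }
        catch (SketchLimitException e)
        {
            return e.getMessage();
        }
    }
}
```

   With this shape the protocol code has a single registration call and no 
separate slot request before it.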

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+final class Limits implements UserConnectionLimits
+{
+    private final Integer connectionCount;
+
+    private final Integer connectionFrequency;
+
+    Limits(final UserConnectionLimits first, final UserConnectionLimits second)
+    {
+        super();
+        this.connectionCount = min(first.getConnectionCountLimit(), 
second.getConnectionCountLimit());
+        this.connectionFrequency = min(first.getConnectionFrequencyLimit(), 
second.getConnectionFrequencyLimit());
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return connectionCount == null && connectionFrequency == null;

Review comment:
       If neither is set, it is a user mistake and we should not allow it. 
Validation should detect that and throw a ConfigurationException. The isEmpty 
method can then be deleted.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -229,6 +233,7 @@ public Result getResult(final Subject subject)
         }
     }, Result.DEFER);
 
+    private volatile UserConnectionLimiter _userConnectionLimiter;

Review comment:
       This should live on ConnectionLimitProvider. VHs are already overloaded 
with responsibilities. IMHO, it would be cleaner to do connection limiting on 
the ConnectionLimitProvider.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)

Review comment:
       If userId is null, it is a mistake. An exception should be thrown.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)

Review comment:
       This is very confusing...
   userId is passed as a parameter, while the groups are taken from the 
connection. Why can't the authenticated principal be taken from the connection 
as well?

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been 
registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)
+        {
+            return Collections.emptySet();
+        }
+        final Set<String> principalNames = new HashSet<>();
+        for (final Principal principal : 
subject.getPrincipals(GroupPrincipal.class))
+        {
+            principalNames.add(principal.getName());
+        }
+        return principalNames;
+    }
+
+    static Builder newBuilder(Duration frequencyPeriod)
+    {
+        if (frequencyPeriod != null)
+        {
+            return new BuilderImpl(frequencyPeriod);
+        }
+        return new BuilderWithoutFrequencyImpl();
+    }
+
+    public interface Builder
+    {
+        Builder add(Rule rule);
+
+        Builder addAll(Collection<? extends Rule> rule);
+
+        PortConnectionCounter build();
+    }
+
+    private interface ConnectionCounter
+    {
+        ResourceAvailabilityLogger registerConnection(String userId, 
Set<String> groups, String port);

Review comment:
       ResourceAvailabilityLogger does not sound right.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends 
CombinableLimit<UserConnectionLimits>
+{
+    Integer getConnectionCountLimit();
+
+    Integer getConnectionFrequencyLimit();
+
+    boolean isUserBlocked();

Review comment:
       This is redundant. A connection limit of 0 or a frequency of 0 should 
mean the same thing.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been 
registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)
+        {
+            return Collections.emptySet();
+        }
+        final Set<String> principalNames = new HashSet<>();
+        for (final Principal principal : 
subject.getPrincipals(GroupPrincipal.class))
+        {
+            principalNames.add(principal.getName());
+        }
+        return principalNames;
+    }
+
+    static Builder newBuilder(Duration frequencyPeriod)
+    {
+        if (frequencyPeriod != null)
+        {
+            return new BuilderImpl(frequencyPeriod);
+        }
+        return new BuilderWithoutFrequencyImpl();
+    }
+
+    public interface Builder
+    {
+        Builder add(Rule rule);
+
+        Builder addAll(Collection<? extends Rule> rule);
+
+        PortConnectionCounter build();
+    }
+
+    private interface ConnectionCounter
+    {
+        ResourceAvailabilityLogger registerConnection(String userId, 
Set<String> groups, String port);
+
+        void deregister();
+
+        void maintain();
+    }
+
+    abstract static class AbstractBuilder<T extends CombinableLimit<T>> 
implements Builder

Review comment:
       I got lost in the builders. I will review this later.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(

Review comment:
       This might crash the broker, which is possibly the right thing to do 
in this case. However, if multiple providers were allowed, only some of them 
would have the user registered. We need to look into this from the 
multiple-provider perspective. That would depend on how delete tasks are 
registered on a connection.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends 
CombinableLimit<UserConnectionLimits>
+{
+    Integer getConnectionCountLimit();
+
+    Integer getConnectionFrequencyLimit();

Review comment:
       I think this should be either a String or a special Frequency type. 
Qpid users should be allowed to set the frequency using user-friendly 
terminology, something like "1/s", "2/h", "10/d". I think that units like 
seconds, hours and days should be sufficient.
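
A rough sketch of what such a type could look like, assuming the format is strictly `<count>/<unit>` with the units `s`, `h` and `d`. The class name `ConnectionFrequency` and its methods are hypothetical and do not exist in the pull request.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value type, for illustration only.
final class ConnectionFrequency
{
    // Up to nine digits so that the count always fits into an int.
    private static final Pattern FORMAT = Pattern.compile("(\\d{1,9})/([shd])");

    private final int _count;
    private final Duration _period;

    private ConnectionFrequency(int count, Duration period)
    {
        _count = count;
        _period = period;
    }

    // Parses strings such as "1/s", "2/h" or "10/d".
    static ConnectionFrequency parse(String text)
    {
        if (text == null)
        {
            throw new IllegalArgumentException("Frequency must not be null");
        }
        final Matcher matcher = FORMAT.matcher(text.trim());
        if (!matcher.matches())
        {
            throw new IllegalArgumentException(
                    "Invalid frequency '" + text + "', expected something like 10/d");
        }
        final int count = Integer.parseInt(matcher.group(1));
        final Duration period;
        switch (matcher.group(2))
        {
            case "s":
                period = Duration.ofSeconds(1);
                break;
            case "h":
                period = Duration.ofHours(1);
                break;
            default:
                // The pattern only admits s, h and d, so this branch is "d".
                period = Duration.ofDays(1);
                break;
        }
        return new ConnectionFrequency(count, period);
    }

    int getCount()
    {
        return _count;
    }

    Duration getPeriod()
    {
        return _period;
    }
}
```

For example, `ConnectionFrequency.parse("2/h")` yields a count of 2 over a period of one hour, and an unsupported unit such as `"5/w"` is rejected with an `IllegalArgumentException`.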

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been 
registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)
+        {
+            return Collections.emptySet();
+        }
+        final Set<String> principalNames = new HashSet<>();

Review comment:
       I could be wrong, but there should be some utility methods to filter 
the right principals. Please investigate.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);

Review comment:
       I think it would read better if registerConnection were called for 
every group and for the authenticated principal.

##########
File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.qpid.server.model.Connection;

Review comment:
       wrong import order

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionCountLimit.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+@FunctionalInterface
+public interface UserConnectionCountLimit extends 
CombinableLimit<UserConnectionCountLimit>

Review comment:
       I am not sure that this class is really needed. I think having a single 
implementation would work better. At least it would make the code simpler to 
follow. Also, at the moment there is a lot of code duplication between the 
different implementations.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import 
org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import 
org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> 
_connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new 
ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, 
String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, 
_connectionCounterFactory)
+                .registerConnection(userId, 
collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been 
registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)

Review comment:
       This would be an error. An exception has to be thrown.
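
A minimal sketch of the fail-fast variant, assuming a `NullPointerException` from `Objects.requireNonNull` is an acceptable exception type here. `GroupNames` and `SketchGroupPrincipal` are hypothetical stand-ins; the real code would keep using Qpid's `GroupPrincipal`.

```java
import java.security.Principal;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import javax.security.auth.Subject;

// Hypothetical helper, for illustration only.
final class GroupNames
{
    // Stand-in for Qpid's GroupPrincipal.
    static final class SketchGroupPrincipal implements Principal
    {
        private final String _name;

        SketchGroupPrincipal(String name)
        {
            _name = name;
        }

        @Override
        public String getName()
        {
            return _name;
        }
    }

    // Throws instead of silently returning an empty set for a missing subject.
    static Set<String> collect(Subject subject)
    {
        Objects.requireNonNull(subject, "The connection has no subject");
        final Set<String> names = new HashSet<>();
        for (final Principal principal : subject.getPrincipals(SketchGroupPrincipal.class))
        {
            names.add(principal.getName());
        }
        return names;
    }
}
```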

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSet.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.security.UserConnectionLimiter;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+
+import java.security.AccessControlContext;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Objects;
+
+public interface RuleSet extends UserConnectionLimiter
+{
+    void maintain();
+
+    default MaintenanceTask newMaintenanceTask(VirtualHost<?> vhost, 
AccessControlContext context)
+    {
+        return new MaintenanceTask(this, vhost, context);
+    }
+
+    static Builder newBuilder(EventLoggerProvider logger, Duration 
frequencyPeriod)
+    {
+        return new RuleSetImpl.RuleSetBuilderImpl(logger, frequencyPeriod);
+    }
+
+    interface Builder
+    {
+        Builder logAllMessages(boolean all);
+
+        Builder addRule(Rule rule);
+
+        Builder addRules(Collection<? extends Rule> rules);
+
+        RuleSet build();
+    }
+
+    final class MaintenanceTask extends HouseKeepingTask
+    {
+        private final RuleSet _rulesSet;
+
+        MaintenanceTask(RuleSet ruleSet, VirtualHost<?> vhost, 
AccessControlContext context)

Review comment:
       I think we do not need the virtual host (VH) here. I suppose a new 
constructor for HouseKeepingTask can be added to avoid dragging the VH around. 
A broker housekeeping executor could be utilized for Broker 
ConnectionLimitProviders.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/BlockingRule.java
##########
@@ -16,29 +16,43 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.qpid.server.user.connection.limits.config;
 
-package org.apache.qpid.server.security.access.config.connection;
+import org.apache.qpid.server.user.connection.limits.plugins.UclRule;
 
-import org.apache.qpid.server.security.access.config.ObjectProperties;
-import org.apache.qpid.server.transport.AMQPConnection;
-
-public class ConnectionPrincipalLimitRule extends 
ConnectionPrincipalStatisticsRule
+final class BlockingRule extends AbstractRule

Review comment:
       I do not think that BlockingRule is needed. It is redundant. 
NonBlockingRule with a connection limit of 0 or a connection frequency of 0 can 
be used instead of BlockingRule.
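
As a sketch of that idea, the blocked state can be derived from the limits rather than stored separately. `SketchConnectionLimits` is a hypothetical class, and treating a `null` limit as "no limit" is an assumption suggested only by the `Integer` return types in the pull request.

```java
// Hypothetical limits holder, for illustration only.
final class SketchConnectionLimits
{
    // Assumption: null means that no limit is configured.
    private final Integer _countLimit;
    private final Integer _frequencyLimit;

    SketchConnectionLimits(Integer countLimit, Integer frequencyLimit)
    {
        _countLimit = countLimit;
        _frequencyLimit = frequencyLimit;
    }

    Integer getConnectionCountLimit()
    {
        return _countLimit;
    }

    Integer getConnectionFrequencyLimit()
    {
        return _frequencyLimit;
    }

    // A user is blocked exactly when one of the limits is zero,
    // so no separate blocking rule or flag is required.
    boolean isUserBlocked()
    {
        return isZero(_countLimit) || isZero(_frequencyLimit);
    }

    private static boolean isZero(Integer limit)
    {
        return limit != null && limit == 0;
    }
}
```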

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends 
CombinableLimit<UserConnectionLimits>

Review comment:
       ConnectionLimits can be used as a name

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSetCreator.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.security.UserConnectionLimitProvider;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+public final class RuleSetCreator extends ArrayList<Rule> implements UserConnectionLimitProvider

Review comment:
       Please use composition instead of extending ArrayList
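
A minimal sketch of what composition could look like here, assuming a simplified stand-in for `Rule`. `SketchRule` and `RuleSetCreatorSketch` are hypothetical names, and the real class would still need to implement `UserConnectionLimitProvider`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Rule type from the pull request.
interface SketchRule
{
    String getIdentity();
}

// Holds the rules in a private field rather than inheriting from ArrayList,
// so callers only see the operations the creator is meant to expose.
final class RuleSetCreatorSketch
{
    private final List<SketchRule> _rules = new ArrayList<>();

    // Null rules are ignored; returns true if the rule was stored.
    boolean add(final SketchRule rule)
    {
        if (rule == null)
        {
            return false;
        }
        return _rules.add(rule);
    }

    boolean addAll(final Collection<? extends SketchRule> rules)
    {
        boolean changed = false;
        for (final SketchRule rule : rules)
        {
            changed |= add(rule);
        }
        return changed;
    }

    // Read-only view; the internal list cannot be modified through it.
    List<SketchRule> getRules()
    {
        return Collections.unmodifiableList(_rules);
    }
}
```

The benefit is that inherited `ArrayList` methods such as `remove` or `set` no longer leak into the public surface of the creator.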

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclRulePredicates.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+public final class UclRulePredicates

Review comment:
       I would rename it to ConnectionLimitRulePredicates. However, I would 
prefer to replace the bespoke rule format with JSON or YAML.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclFileParser.java
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StreamTokenizer;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UclFileParser

Review comment:
       Why do you need a bespoke rule format? Would it not be simpler to use 
JSON or YAML? That should save time and effort on implementing and 
maintaining a parser for a bespoke format.
   IMHO, I would prefer to have rules defined as below:
   
   [
   {
     "identity": "foo",
     "port": "bar",
     "connection_limit": 100,
     "connection_frequency_limit": "10/h"
   },
   ....
   ]
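
With a structured format, the only value in the example that still needs custom parsing is the frequency limit string such as "10/h". Below is a minimal sketch of how it might be read. The `<count>/<unit>` syntax, the unit letters `s`, `m`, `h`, `d`, and the class name `FrequencyLimitSketch` are all assumptions, not something defined by this pull request.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the "<count>/<unit>" syntax and unit letters are assumptions.
final class FrequencyLimitSketch
{
    private static final Pattern FORMAT = Pattern.compile("(\\d+)/([smhd])");

    private final int _count;
    private final Duration _period;

    private FrequencyLimitSketch(final int count, final Duration period)
    {
        _count = count;
        _period = period;
    }

    // Parses values such as "10/h" into a count and the period it applies to.
    static FrequencyLimitSketch parse(final String value)
    {
        final Matcher matcher = FORMAT.matcher(value.trim());
        if (!matcher.matches())
        {
            throw new IllegalArgumentException("Invalid frequency limit: " + value);
        }
        final int count = Integer.parseInt(matcher.group(1));
        switch (matcher.group(2))
        {
            case "s":
                return new FrequencyLimitSketch(count, Duration.ofSeconds(1));
            case "m":
                return new FrequencyLimitSketch(count, Duration.ofMinutes(1));
            case "h":
                return new FrequencyLimitSketch(count, Duration.ofHours(1));
            default: // "d" is the only remaining unit the pattern accepts
                return new FrequencyLimitSketch(count, Duration.ofDays(1));
        }
    }

    int getCount()
    {
        return _count;
    }

    Duration getPeriod()
    {
        return _period;
    }
}
```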
   

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits>
+{
+    Integer getConnectionCountLimit();
+
+    Integer getConnectionFrequencyLimit();
+
+    boolean isUserBlocked();
+
+    @Override
+    default UserConnectionLimits then(UserConnectionLimits other)
+    {
+        if (other != null && isEmpty())

Review comment:
       I am not sure that this check is needed

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import java.util.Optional;
+
+public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits>
+{
+    Integer getConnectionCountLimit();
+
+    Integer getConnectionFrequencyLimit();
+
+    boolean isUserBlocked();
+
+    @Override
+    default UserConnectionLimits then(UserConnectionLimits other)
+    {
+        if (other != null && isEmpty())
+        {
+            return other;
+        }
+        return this;
+    }
+
+    @Override
+    default UserConnectionLimits mergeWith(UserConnectionLimits second)
+    {
+        if (second == null || isUserBlocked())
+        {
+            return this;
+        }
+        if (second.isUserBlocked())
+        {
+            return second;
+        }
+        return new Limits(this, second);

Review comment:
       This belongs in the Limits class. It does not look clean when an 
interface depends on its implementation.
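
One possible shape, sketched with simplified hypothetical types (`ConnectionLimitsSketch` and `LimitsSketch` are not the classes from this pull request): the interface only declares `mergeWith`, and the concrete class owns the construction of the merged result. The merge rule shown, keeping the smaller of two limits and treating `null` as "no limit", is an assumption for illustration.

```java
// Hypothetical, simplified versions of the types under review.
interface ConnectionLimitsSketch
{
    Integer getConnectionCountLimit();

    // Declared only; the interface no longer constructs a concrete class.
    ConnectionLimitsSketch mergeWith(ConnectionLimitsSketch second);
}

final class LimitsSketch implements ConnectionLimitsSketch
{
    private final Integer _connectionCountLimit;

    LimitsSketch(final Integer connectionCountLimit)
    {
        _connectionCountLimit = connectionCountLimit;
    }

    @Override
    public Integer getConnectionCountLimit()
    {
        return _connectionCountLimit;
    }

    // Assumption: merging keeps the stricter (smaller) limit; null means "no limit".
    @Override
    public ConnectionLimitsSketch mergeWith(final ConnectionLimitsSketch second)
    {
        if (second == null || second.getConnectionCountLimit() == null)
        {
            return this;
        }
        if (_connectionCountLimit == null)
        {
            return new LimitsSketch(second.getConnectionCountLimit());
        }
        return new LimitsSketch(Math.min(_connectionCountLimit, second.getConnectionCountLimit()));
    }
}
```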

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclRulePredicates.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.slf4j.Logger;

Review comment:
       wrong import order

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/DeferRegistration.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+public final class DeferRegistration implements ResourceAvailabilityLogger

Review comment:
       This can be deleted

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/AllLogger.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.logging.messages.UserResourceLimitMessages;
+
+public final class AllLogger extends RejectLogger

Review comment:
       This can be safely deleted

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/UclFileBrokerUserConnectionLimitProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.plugins;
+
+import org.apache.qpid.server.model.BrokerUserConnectionLimitProvider;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedOperation;
+
+@ManagedObject(category = false,
+        type = AbstractUclFileUserConnectionLimitProvider.UCL_FILE_PROVIDER_TYPE)
+public interface UclFileBrokerUserConnectionLimitProvider<C extends UclFileBrokerUserConnectionLimitProvider<C>>

Review comment:
       Why do you need to keep the rules in a separate file?
   What about keeping the rules in the broker and VH configuration only?

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/EventLogger.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.outcome;
+
+@FunctionalInterface
+public interface EventLogger

Review comment:
       We already have an EventLogger in a different package, so I would 
prefer a different name. That said, I do not see much value in keeping this 
interface. Let's delete it.

##########
File path: 
broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.user.connection.limits.config;
+
+import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration;
+import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+final class PortConnectionCounter
+{
+    private final Function<String, ConnectionCounter> _connectionCounterFactory;
+
+    private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>();
+
+    PortConnectionCounter(AbstractBuilder<?> builder)
+    {
+        super();
+        _connectionCounterFactory = builder.getConnectionCounterFactory();
+    }
+
+    public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId)
+    {
+        if (userId == null)
+        {
+            return DeferRegistration.defer();
+        }
+        final String port = connection.getPort().getName();
+        return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory)
+                .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port);
+    }
+
+    public void deregister(String userId)
+    {
+        if (userId == null)
+        {
+            return;
+        }
+        _connectionCounters.computeIfAbsent(userId, user ->
+                {
+                    throw new IllegalArgumentException(
+                            String.format("The user '%s' has not been registered yet", user));
+                }
+        ).deregister();
+    }
+
+    public void maintain()
+    {
+        _connectionCounters.values().forEach(ConnectionCounter::maintain);
+    }
+
+    private Set<String> collectGroupPrincipals(Subject subject)
+    {
+        if (subject == null)
+        {
+            return Collections.emptySet();
+        }
+        final Set<String> principalNames = new HashSet<>();
+        for (final Principal principal : subject.getPrincipals(GroupPrincipal.class))
+        {
+            principalNames.add(principal.getName());
+        }
+        return principalNames;
+    }
+
+    static Builder newBuilder(Duration frequencyPeriod)
+    {
+        if (frequencyPeriod != null)
+        {
+            return new BuilderImpl(frequencyPeriod);
+        }
+        return new BuilderWithoutFrequencyImpl();
+    }
+
+    public interface Builder
+    {
+        Builder add(Rule rule);
+
+        Builder addAll(Collection<? extends Rule> rule);
+
+        PortConnectionCounter build();
+    }
+
+    private interface ConnectionCounter
+    {
+        ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port);
+
+        void deregister();
+
+        void maintain();
+    }
+
+    abstract static class AbstractBuilder<T extends CombinableLimit<T>> implements Builder
+    {
+        final Map<String, T> _userLimits = new HashMap<>();
+        T _defaultUserLimits;
+
+        abstract T newLimits(Rule rule);
+
+        abstract Function<String, ConnectionCounter> getConnectionCounterFactory();

Review comment:
       It is confusing to use Function as a factory
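
One alternative, sketched below with hypothetical simplified types (`CounterSketch`, `CounterFactorySketch` and `PortCounterSketch` are illustrative names only), is a small named factory interface. It can still be passed to `ConcurrentHashMap.computeIfAbsent` through a method reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified counter type.
interface CounterSketch
{
    String getUserId();
}

// A named factory type states its purpose in signatures,
// unlike a bare Function<String, CounterSketch>.
@FunctionalInterface
interface CounterFactorySketch
{
    CounterSketch create(String userId);
}

final class PortCounterSketch
{
    private final Map<String, CounterSketch> _counters = new ConcurrentHashMap<>();
    private final CounterFactorySketch _factory;

    PortCounterSketch(final CounterFactorySketch factory)
    {
        _factory = factory;
    }

    CounterSketch counterFor(final String userId)
    {
        // The method reference adapts the factory to the Function that computeIfAbsent expects.
        return _counters.computeIfAbsent(userId, _factory::create);
    }
}
```

The behaviour is the same as with `Function`; the gain is purely in readability of the builder and constructor signatures.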






