alex-rufous commented on a change in pull request #76: URL: https://github.com/apache/qpid-broker-j/pull/76#discussion_r563219790
########## File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ########## @@ -64,6 +72,7 @@ String QPID_DOCUMENTATION_URL = "qpid.helpURL"; String BROKER_SHUTDOWN_TIMEOUT = "broker.shutdownTimeout"; String BROKER_STATISTICS_REPORING_PERIOD = "broker.statisticsReportingPeriod"; + String CONNECTION_FREQUENCY_PERIOD = "qpid.broker.connectionFrequencyPeriodInMillis"; Review comment: it is a ConnectionLimitProvider specific setting. Thus it needs to be moved into a ConnectionLimitProvider ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostUserConnectionLimitProvider.java ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.model; + +import org.apache.qpid.server.security.UserConnectionLimitProvider; + +@ManagedObject +public interface VirtualHostUserConnectionLimitProvider<X extends VirtualHostUserConnectionLimitProvider<X>> Review comment: I would like to suggest renaming VirtualHostUserConnectionLimitProvider into VirtualHostConnectionLimitProvider as it is slightly shorter. 
########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.ResourceAvailability; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; + +import java.security.AccessControlContext; + +public interface UserConnectionLimiter +{ + ResourceAvailability register(AMQPConnection<?> connection, String userId); Review comment: Why is the userId parameter required? Why can't connection#getAuthenticatedUser() be called instead? As per above, I am not sure about returning ResourceAvailability. I would prefer throwing a special runtime exception. ########## File path: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java ########## @@ -20,33 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_10; -import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSED; Review comment: import order is incorrect. 
Please revert. ########## File path: broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java ########## @@ -53,6 +53,7 @@ import com.google.common.collect.PeekingIterator; Review comment: As per comments on 0-10 and 0-9 connections, the changes here could be reverted. We just need a simple try-catch block around registerConnection on the virtual host (VH). ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ########## @@ -130,6 +139,10 @@ @ManagedContextDefault(name = BROKER_STATISTICS_REPORING_PERIOD) int DEFAULT_STATISTICS_REPORTING_PERIOD = 0; + @ManagedContextDefault(name = CONNECTION_FREQUENCY_PERIOD, description = "Interval (in milliseconds) to evaluate connection frequency") Review comment: This context variable is specific to ConnectionLimitProvider. Thus it needs to be moved into a ConnectionLimitProvider. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ########## @@ -394,6 +407,11 @@ void purgeUser(@Param(name="origin", description="The AuthenticationProvider the Collection<AccessControlProvider<?>> getAccessControlProviders(); + default UserConnectionLimitProvider getUserConnectionLimitProvider() Review comment: Please move this into ConnectionLimitProvider. I am not sure that we need a default implementation. ########## File path: broker-core/src/main/java/org/apache/qpid/server/logging/messages/UserResourceLimit_logmessages.properties ########## @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# User Resource Limit logging message i18n strings. +ACCEPTED = URL-1001 : Accepted : {0} {1} by {2} : {3} +REJECTED = URL-1002 : Rejected : {0} {1} by {2} : {3} +DEFERRED = URL-1003 : Deferred : {0} {1} by {2} Review comment: Why is a DEFERRED operational log required? I personally do not see much value in DEFERRED. It looks to me like an intermediate state before the final decision (ACCEPTED or REJECTED) is made. Do you have a specific requirement to log DEFERRED? I think that we only need REJECTED operational logs. ACCEPTED and DEFERRED could be safely deleted. Actually, if a reject message is reported as part of connection CLOSE, we do not need REJECTED either :) Let's delete this and report the reject reason as part of the connection close operational log - CON-1002 : Close[ : {0}] ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/BrokerUserConnectionLimitProvider.java ########## @@ -16,18 +16,12 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.qpid.server.model; -package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.security.UserConnectionLimitProvider; -import org.apache.qpid.server.transport.AMQPConnection; - -public interface ConnectionPrincipalStatisticsRegistry +@ManagedObject +public interface BrokerUserConnectionLimitProvider<X extends BrokerUserConnectionLimitProvider<X>> Review comment: I think we can drop User in a class name to make it shorter. 
Thus UserConnectionLimitProvider can be renamed to ConnectionLimitProvider and BrokerUserConnectionLimitProvider to BrokerConnectionLimitProvider. Moreover, using User in a class name could be misleading, as connection limits are defined for both users and groups. I think that setting limits per user group will dominate. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java ########## @@ -856,6 +858,17 @@ public boolean isManagementMode() return children; } + @Override + public UserConnectionLimitProvider getUserConnectionLimitProvider() + { + final Collection<BrokerUserConnectionLimitProvider> children = getChildren(BrokerUserConnectionLimitProvider.class); Review comment: Is there any specific reason to limit the number of ConnectionLimitProviders to 1? What if someone would like to split connection limits by organization and manage each organization separately? It seems that managing many small subsets of data would be easier than managing one huge aggregated set. Additionally, I would imagine that connection limits might come from different sources. For instance, a predefined subset of connection limits can be kept on the broker side whilst the rest of the limits can be retrieved from an external REST service. Thus, it seems a combination of different implementations can be used. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/BrokerUserConnectionLimitProvider.java ########## @@ -16,18 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.qpid.server.model; -package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.security.UserConnectionLimitProvider; -import org.apache.qpid.server.transport.AMQPConnection; - -public interface ConnectionPrincipalStatisticsRegistry +@ManagedObject +public interface BrokerUserConnectionLimitProvider<X extends BrokerUserConnectionLimitProvider<X>> + extends ConfiguredObject<X>, UserConnectionLimitProvider { - ConnectionPrincipalStatistics connectionOpened(AMQPConnection<?> connection); - Review comment: IMHO, it would be beneficial to define some common attributes. For example, the one I can think of is a list of current limit settings. Something like `Collection<ConnectionLimit> getConnectionLimits()` ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/ResourceAvailability.java ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.model; + +@FunctionalInterface +public interface ResourceAvailability Review comment: As per comment above, an exception thrown on reaching the limit would work better. 
IMHO we can delete this interface ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostUserConnectionLimitProvider.java ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.model; + +import org.apache.qpid.server.security.UserConnectionLimitProvider; + +@ManagedObject +public interface VirtualHostUserConnectionLimitProvider<X extends VirtualHostUserConnectionLimitProvider<X>> + extends ConfiguredObject<X>, UserConnectionLimitProvider +{ Review comment: Some common attributes like list of existing connection limits can be defined here. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ########## @@ -20,6 +20,14 @@ */ package org.apache.qpid.server.model; +import org.apache.qpid.server.configuration.CommonProperties; Review comment: import statements should be sorted with the most fundamental packages first, and grouped with associated packages together and one blank line between groups. 
Thus the expected order should be * java classes * javax classes * third party dependencies * own project classes ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) + { + return Collections.emptySet(); + } + final Set<String> principalNames = new HashSet<>(); + for (final Principal principal : subject.getPrincipals(GroupPrincipal.class)) + { + principalNames.add(principal.getName()); + } + return principalNames; + } + + static Builder newBuilder(Duration frequencyPeriod) + { + if (frequencyPeriod != null) + { + return new BuilderImpl(frequencyPeriod); + } + return new BuilderWithoutFrequencyImpl(); + } + + public interface Builder + { + Builder add(Rule rule); + + Builder addAll(Collection<? extends Rule> rule); + + PortConnectionCounter build(); + } + + private interface ConnectionCounter + { + ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port); + + void deregister(); + + void maintain(); + } + + abstract static class AbstractBuilder<T extends CombinableLimit<T>> implements Builder + { + final Map<String, T> _userLimits = new HashMap<>(); + T _defaultUserLimits; + + abstract T newLimits(Rule rule); + + abstract Function<String, ConnectionCounter> getConnectionCounterFactory(); + + AbstractBuilder(T defaultUserLimits) + { + super(); + _defaultUserLimits = defaultUserLimits; + } + + @Override + public Builder add(Rule rule) + { + addImpl(rule); + return this; + } + + @Override + public Builder addAll(Collection<? 
extends Rule> rule) + { + if (rule != null) + { + rule.forEach(this::addImpl); + } + return this; + } + + @Override + public PortConnectionCounter build() + { + return new PortConnectionCounter(this); + } + + private void addImpl(Rule rule) + { + if (rule == null) + { + return; + } + final T newLimits = newLimits(rule); + if (newLimits.isEmpty()) + { + return; + } + final String id = rule.getIdentity(); + if (UclRulePredicates.isAllUser(id)) + { + _defaultUserLimits = newLimits.mergeWith(_defaultUserLimits); + } + else + { + _userLimits.merge(id, newLimits, CombinableLimit::mergeWith); + } + } + } + + private static final class BuilderImpl extends AbstractBuilder<UserConnectionLimits> + { + private final Duration _frequencyPeriod; + + BuilderImpl(Duration frequencyPeriod) + { + super(UserConnectionLimits.noLimits()); + _frequencyPeriod = Objects.requireNonNull(frequencyPeriod); + } + + @Override + UserConnectionLimits newLimits(Rule rule) + { + return rule; + } + + Function<String, ConnectionCounter> getConnectionCounterFactory() + { + return userId -> new CombinedConnectionCounterImpl( + new LimitCompiler<>(_userLimits, _defaultUserLimits, UserConnectionLimits::noLimits), + _frequencyPeriod); + } + } + + private static final class BuilderWithoutFrequencyImpl extends AbstractBuilder<UserConnectionCountLimit> + { + BuilderWithoutFrequencyImpl() + { + super(UserConnectionCountLimit.noLimits()); + } + + @Override + UserConnectionCountLimit newLimits(Rule rule) + { + return UserConnectionCountLimit.newInstance(rule); + } + + @Override + Function<String, ConnectionCounter> getConnectionCounterFactory() + { + return userId -> new ConnectionCounterImpl( + new LimitCompiler<>(_userLimits, _defaultUserLimits, UserConnectionCountLimit::noLimits)); + } + } + + private static final class CombinedConnectionCounterImpl implements ConnectionCounter + { + private final LimitCompiler<UserConnectionLimits> _limits; + + private final Duration _frequencyPeriod; + private final 
Queue<Instant> _registrationTime = new LinkedList<>(); + private long _counter = 0L; + + CombinedConnectionCounterImpl(LimitCompiler<UserConnectionLimits> limits, Duration frequencyPeriod) + { + super(); + _limits = limits; + _frequencyPeriod = frequencyPeriod; + } + + @Override + public ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port) + { + final UserConnectionLimits limits = _limits.compileLimits(userId, groups); + if (limits.isUserBlocked()) + { + return RejectRegistration.blockedUser(userId, port); + } + final Integer connectionCountLimit = limits.getConnectionCountLimit(); + final Integer connectionFrequencyLimit = limits.getConnectionFrequencyLimit(); + + if (connectionFrequencyLimit == null) + { + if (connectionCountLimit == null) + { + return noLimits(userId, port); + } + return countLimit(userId, port, connectionCountLimit); + } + if (connectionCountLimit == null) + { + return frequencyLimit(userId, port, connectionFrequencyLimit); + } + return bothLimits(userId, port, connectionCountLimit, connectionFrequencyLimit); + } + + private synchronized ResourceAvailabilityLogger noLimits(String id, String port) Review comment: If we register connections in the configuration thread, the synchronized blocks would be redundant. I think we should be registering connections in the configuration thread. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java ########## @@ -197,9 +197,7 @@ boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress); - int incrementConnectionCount(); Review comment: We need incrementConnectionCount in a method name. If you are removing this method, you need to rename the other method where the count is incremented. 
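To illustrate the naming point, here is a minimal sketch, assuming the limit check and the increment end up in a single method. `ConnectionCountGate` and `tryIncrementConnectionCount` are hypothetical names used only for illustration; this is not the actual AmqpPortImpl code, and treating a non-positive maximum as "unlimited" is an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the port's connection accounting (not the real AmqpPortImpl).
final class ConnectionCountGate
{
    private final AtomicLong _connectionCount = new AtomicLong();
    // Assumption of this sketch: a value <= 0 means "no limit".
    private final long _maxOpenConnections;

    ConnectionCountGate(final long maxOpenConnections)
    {
        _maxOpenConnections = maxOpenConnections;
    }

    // The name states both effects: the limit is checked and, on success, the count is incremented.
    boolean tryIncrementConnectionCount()
    {
        if (_maxOpenConnections <= 0)
        {
            _connectionCount.incrementAndGet();
            return true;
        }
        while (true)
        {
            final long current = _connectionCount.get();
            if (current >= _maxOpenConnections)
            {
                // Limit reached: the count is left untouched.
                return false;
            }
            // Check and increment happen as one atomic step; retry if another thread got in first.
            if (_connectionCount.compareAndSet(current, current + 1L))
            {
                return true;
            }
        }
    }

    long decrementConnectionCount()
    {
        return _connectionCount.decrementAndGet();
    }

    long getConnectionCount()
    {
        return _connectionCount.get();
    }
}
```

With a name like this, a caller reading `if (gate.tryIncrementConnectionCount())` can see that a successful call has already consumed a connection slot and must later be paired with `decrementConnectionCount()`.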
########## File path: broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java ########## @@ -37,6 +37,8 @@ import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.LinkRegistryModel; +import javax.security.auth.Subject; Review comment: wrong import order. Though, it looks like this one is unused ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java ########## @@ -542,69 +539,64 @@ public static String getInstalledProtocolsAsString() } @Override - public int incrementConnectionCount() - { - int openConnections = _connectionCount.incrementAndGet(); - _totalConnectionCount.incrementAndGet(); - int maxOpenConnections = getMaxOpenConnections(); - if(maxOpenConnections > 0 - && openConnections > (maxOpenConnections * _connectionWarnCount) / 100 - && _connectionCountWarningGiven.compareAndSet(false, true)) - { - _container.getEventLogger().message(new PortLogSubject(this), - PortMessages.CONNECTION_COUNT_WARN(openConnections, - _connectionWarnCount, - maxOpenConnections)); - } - return openConnections; - } - - @Override - public int decrementConnectionCount() + public long decrementConnectionCount() { - int openConnections = _connectionCount.decrementAndGet(); - int maxOpenConnections = getMaxOpenConnections(); + final long openConnections = _connectionCount.decrementAndGet(); + final long maxOpenConnections = getMaxOpenConnections(); - if(maxOpenConnections > 0 - && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000) + if (maxOpenConnections > 0 + && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000L) { - _connectionCountWarningGiven.compareAndSet(true,false); + _connectionCountWarningGiven.compareAndSet(true, false); } - - return openConnections; } - private static int square(int val) + private static long square(long val) { return val * val; } @Override public boolean 
canAcceptNewConnection(final SocketAddress remoteSocketAddress) Review comment: The method now also has the responsibility to increment the connection counter. I think that should be reflected in the name. ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java ########## @@ -542,69 +539,64 @@ public static String getInstalledProtocolsAsString() } @Override - public int incrementConnectionCount() - { - int openConnections = _connectionCount.incrementAndGet(); - _totalConnectionCount.incrementAndGet(); - int maxOpenConnections = getMaxOpenConnections(); - if(maxOpenConnections > 0 - && openConnections > (maxOpenConnections * _connectionWarnCount) / 100 - && _connectionCountWarningGiven.compareAndSet(false, true)) - { - _container.getEventLogger().message(new PortLogSubject(this), - PortMessages.CONNECTION_COUNT_WARN(openConnections, - _connectionWarnCount, - maxOpenConnections)); - } - return openConnections; - } - - @Override - public int decrementConnectionCount() + public long decrementConnectionCount() { - int openConnections = _connectionCount.decrementAndGet(); - int maxOpenConnections = getMaxOpenConnections(); + final long openConnections = _connectionCount.decrementAndGet(); + final long maxOpenConnections = getMaxOpenConnections(); - if(maxOpenConnections > 0 - && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000) + if (maxOpenConnections > 0 + && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000L) { - _connectionCountWarningGiven.compareAndSet(true,false); + _connectionCountWarningGiven.compareAndSet(true, false); } - - return openConnections; } - private static int square(int val) + private static long square(long val) { return val * val; } @Override public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) { - String addressString = remoteSocketAddress.toString(); + final String addressString = remoteSocketAddress.toString(); if 
(_closingOrDeleting.get()) { _container.getEventLogger().message(new PortLogSubject(this), - PortMessages.CONNECTION_REJECTED_CLOSED(addressString)); + PortMessages.CONNECTION_REJECTED_CLOSED(addressString)); return false; } - else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections) + + final long maxOpenConnections = getMaxOpenConnections(); + if (maxOpenConnections > 0) { - _container.getEventLogger().message(new PortLogSubject(this), - PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString, - _maxOpenConnections)); - return false; + final long currentCount = _connectionCount.getAndUpdate(count -> count < maxOpenConnections ? count + 1L : count) + 1L; Review comment: This looks racy to me. Adding +1 outside of getAndUpdate can end up in an unexpected connection reject. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> +{ + Integer getConnectionCountLimit(); + + Integer getConnectionFrequencyLimit(); + + boolean isUserBlocked(); + + @Override + default UserConnectionLimits then(UserConnectionLimits other) + { + if (other != null && isEmpty()) + { + return other; + } + return this; + } + + @Override + default UserConnectionLimits mergeWith(UserConnectionLimits second) Review comment: I would remove this default implementation ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/AcceptRegistration.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.outcome; + +import java.time.Duration; + +public final class AcceptRegistration implements ResourceAvailabilityLogger Review comment: I think this is not required ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java ########## @@ -19,32 +19,9 @@ */ package org.apache.qpid.server.model.port; -import java.io.IOException; Review comment: wrong import order ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSetCreator.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.security.UserConnectionLimitProvider; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; + +public final class RuleSetCreator extends ArrayList<Rule> implements UserConnectionLimitProvider +{ + private static final long serialVersionUID = -7; + + public static final long DEFAULT_FREQUENCY_PERIOD = 60L * 1000L; + + private Long _frequencyPeriod = null; + + private boolean _logAllMessages = false; + + public RuleSetCreator() + { + super(new ArrayList<>()); + } + + public void setFrequencyPeriod(long frequencyPeriod) Review comment: I am not sure that _frequencyPeriod is needed when every rule is expressed using its own frequency ########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimitProvider.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.logging.EventLoggerProvider; + +@FunctionalInterface +public interface UserConnectionLimitProvider Review comment: Using ConnectionLimitProvider as the class name should be sufficient ########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimitProvider.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.logging.EventLoggerProvider; + +@FunctionalInterface +public interface UserConnectionLimitProvider +{ + UserConnectionLimiter getLimiter(EventLoggerProvider eventLoggerProvider); Review comment: Why do we need the parameter EventLoggerProvider eventLoggerProvider? It might be better to rename the method getLimiter to getConnectionLimiter. Why is (User)ConnectionLimiter required? What about implementing all of the ConnectionLimiter functionality directly in ConnectionLimitProvider?
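To make the last question concrete, here is a minimal sketch of a provider that does the limiting itself instead of handing out a separate limiter. It is not code from this PR: ConnectionLimitSketch, ConnectionLimitException, CountingProvider and the String-based register/deregister signatures are all hypothetical, and a plain user id stands in for AMQPConnection so that the example compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only: none of these types exist in Qpid Broker-J.
final class ConnectionLimitSketch
{
    /** Thrown when accepting a connection would exceed a limit (assumed name). */
    static final class ConnectionLimitException extends RuntimeException
    {
        ConnectionLimitException(String message)
        {
            super(message);
        }
    }

    /** Provider and limiter merged: the provider registers connections directly. */
    interface ConnectionLimitProvider
    {
        void register(String userId);

        void deregister(String userId);
    }

    /** Simplest possible rule: at most 'limit' open connections per user. */
    static final class CountingProvider implements ConnectionLimitProvider
    {
        private final int _limit;
        private final Map<String, AtomicInteger> _counts = new ConcurrentHashMap<>();

        CountingProvider(int limit)
        {
            _limit = limit;
        }

        @Override
        public void register(String userId)
        {
            final AtomicInteger count = _counts.computeIfAbsent(userId, k -> new AtomicInteger());
            if (count.incrementAndGet() > _limit)
            {
                // Undo the optimistic increment before rejecting.
                count.decrementAndGet();
                throw new ConnectionLimitException("Connection limit reached for user " + userId);
            }
        }

        @Override
        public void deregister(String userId)
        {
            final AtomicInteger count = _counts.get(userId);
            if (count != null)
            {
                // Never drop below zero, even if deregister is called twice.
                count.updateAndGet(c -> c > 0 ? c - 1 : 0);
            }
        }

        int count(String userId)
        {
            final AtomicInteger count = _counts.get(userId);
            return count == null ? 0 : count.get();
        }
    }
}
```

With this shape the caller would only need to catch the exception and close the connection; there is no separate limiter object and no EventLoggerProvider parameter. Whether that still covers the plugin's logging needs is an open question.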
########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.ResourceAvailability; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; + +import java.security.AccessControlContext; + +public interface UserConnectionLimiter Review comment: The name could be ConnectionLimiter. I am not sure we need a special ConnectionLimiter. A ConnectionLimitProvider can implement all its responsibilities. Though...I am not sure ########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.ResourceAvailability; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; + +import java.security.AccessControlContext; + +public interface UserConnectionLimiter +{ + ResourceAvailability register(AMQPConnection<?> connection, String userId); + + void deregister(Connection<?> connection, String userId); Review comment: userId seems redundant ########## File path: broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java ########## @@ -48,8 +50,17 @@ boolean registerConnection(AMQPConnection<?> connection, final ConnectionEstablishmentPolicy connectionEstablishmentPolicy); + void deregisterConnection(AMQPConnection<?> connection); + default ResourceAvailability requestConnectionSlot(AMQPConnection<?> connection, String userId) Review comment: Why methods requestConnectionSlot and freeConnectionSlot are part of VH? Why they cannot be moved into ConnectionLimitProvider? 
It feels that they belong to ConnectionLimitProvider domain ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/RejectLogger.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.outcome; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.logging.messages.UserResourceLimitMessages; + +import java.util.Objects; + +public class RejectLogger implements EventLogger Review comment: Let's delete this class ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/RejectRegistration.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.outcome; + +import java.time.Duration; + +public final class RejectRegistration implements ResourceAvailabilityLogger Review comment: I think the error messages can be moved into corresponding exception class. Let's delete this class ########## File path: broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.ResourceAvailability; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; + +import java.security.AccessControlContext; + +public interface UserConnectionLimiter +{ + ResourceAvailability register(AMQPConnection<?> connection, String userId); + + void deregister(Connection<?> connection, String userId); + + void scheduleTask(QueueManagingVirtualHost<?> virtualHost, AccessControlContext controlContext); Review comment: I am wondering why we need to pass QueueManagingVirtualHost. I think that we need to pass an Executor implementation here. Also, why Broker housekeeping executor cannot be used for BrokerConnectionLimitProviders? ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/UclFileBrokerUserConnectionLimitProvider.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.plugins; + +import org.apache.qpid.server.model.BrokerUserConnectionLimitProvider; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedOperation; + +@ManagedObject(category = false, + type = AbstractUclFileUserConnectionLimitProvider.UCL_FILE_PROVIDER_TYPE) +public interface UclFileBrokerUserConnectionLimitProvider<C extends UclFileBrokerUserConnectionLimitProvider<C>> + extends BrokerUserConnectionLimitProvider<C> Review comment: As per earlier comments we need an attribute to list existing rules ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ########## @@ -352,6 +357,39 @@ else if(prov.getState() == State.ACTIVE) } } + void updateUserConnectionLimiter() + { + if (!((SystemConfig) _broker.getParent()).isManagementMode()) + { + final Collection<VirtualHostUserConnectionLimitProvider> children = + getChildren(VirtualHostUserConnectionLimitProvider.class); + LOGGER.debug("Updating user connection limiter"); + + if (children.size() > 1) + { + throw new IllegalConfigurationException(String.format( Review comment: Why a number of connection limit provider is limited to 1? 
It does not look right ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ########## @@ -2691,6 +2727,18 @@ public String getArguments() }); } + @Override + public ResourceAvailability requestConnectionSlot(AMQPConnection<?> connection, String userId) Review comment: This should go into ConnectionLimitProvider ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ########## @@ -2691,6 +2727,18 @@ public String getArguments() }); } + @Override + public ResourceAvailability requestConnectionSlot(AMQPConnection<?> connection, String userId) + { + return _userConnectionLimiter.register(connection, userId); + } + + @Override + public void freeConnectionSlot(AMQPConnection<?> connection, String userIds) Review comment: This should go into ConnectionLimitProvider ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ########## @@ -320,6 +323,8 @@ public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode _fileSystemSpaceCheckerJobContext = getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", _principal); _fileSystemSpaceChecker = new FileSystemSpaceChecker(); + _userConnectionLimiter = UserConnectionLimiter.noLimits(); + _userConnectionLimitChangeListener = new UserConnectionLimitChangeListener(this); Review comment: This is a leak. 
You cannot pass "this" before the object finishes construction ########## File path: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java ########## @@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, ConnectionOpen open) try { + final AMQPConnection_0_10 amqpConnection = sconn.getAmqpConnection(); + final Principal principal = sconn.getAuthorizedPrincipal(); + if (principal != null) + { + amqpConnection.freeConnectionSlots(); + final String userId = principal.getName(); + final ResourceAvailability resource = addressSpace.requestConnectionSlot(amqpConnection, userId); Review comment: The slot registration functionality could be moved into VH#registerConnection. There the code can iterate through the existing VH and Broker ConnectionLimitProviders. Here, we can simply catch any RejectException thrown from the ConnectionLimitProviders and close the connection accordingly ########## File path: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java ########## @@ -412,13 +410,24 @@ public void closed() } finally { - NamedAddressSpace addressSpace = getAddressSpace(); Review comment: These changes can be reverted once a delete task is used to free the slot. ########## File path: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java ########## @@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, ConnectionOpen open) try { + final AMQPConnection_0_10 amqpConnection = sconn.getAmqpConnection(); + final Principal principal = sconn.getAuthorizedPrincipal(); + if (principal != null) Review comment: If the principal is null, it is a security issue. The principal should exist.
If it is null, at least a ConnectionScopeException should be thrown ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/UserConnectionLimitChangeListener.java ########## @@ -0,0 +1,78 @@ +package org.apache.qpid.server.virtualhost; + +import org.apache.qpid.server.model.AbstractConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostUserConnectionLimitProvider; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +final class UserConnectionLimitChangeListener extends AbstractConfigurationChangeListener Review comment: After the connection registration/deregistration functionality is moved into ConnectionLimitProvider, this will no longer be needed ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/ResourceAvailabilityLogger.java ########## @@ -0,0 +1,26 @@ +/* Review comment: I think we can delete this interface ########## File path: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java ########## @@ -269,8 +270,22 @@ public void connectionOpen(ServerConnection sconn, ConnectionOpen open) try { + final AMQPConnection_0_10 amqpConnection = sconn.getAmqpConnection(); + final Principal principal = sconn.getAuthorizedPrincipal(); + if (principal != null) + { + amqpConnection.freeConnectionSlots(); Review comment: Why is this call required? ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/AbstractRuleBasedUserConnectionLimitProvider.java ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.plugins; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Content; +import org.apache.qpid.server.model.CustomRestHeaders; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.RestContentHeader; +import org.apache.qpid.server.user.connection.limits.config.Rule; +import org.apache.qpid.server.user.connection.limits.config.RuleSetCreator; +import org.apache.qpid.server.user.connection.limits.config.UclFileParser; +import org.apache.qpid.server.user.connection.limits.config.UclRulePredicates.Property; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +abstract class AbstractRuleBasedUserConnectionLimitProvider<C extends AbstractRuleBasedUserConnectionLimitProvider<C>> + extends AbstractUserConnectionLimitProvider<C> +{ + private static final String 
FREQUENCY_PERIOD = "frequencyPeriod"; + private static final String RULES = "rules"; + + static final String RULE_BASED_TYPE = "RuleBased"; + + @ManagedAttributeField + private Long _frequencyPeriod; + + @ManagedAttributeField + private List<UclRule> _rules = new ArrayList<>(); + + public AbstractRuleBasedUserConnectionLimitProvider(ConfiguredObject<?> parent, Map<String, Object> attributes) + { + super(parent, attributes); + } + + public List<UclRule> getRules() + { + return Collections.unmodifiableList(_rules); + } + + public Long getFrequencyPeriod() + { + return Optional.ofNullable(_frequencyPeriod).orElseGet( + () -> getContextValue(Long.class, Broker.CONNECTION_FREQUENCY_PERIOD)); + } + + public Content extractRules() + { + return new StringContent(getName(), getFrequencyPeriod(), getRules()); + } + + public void loadFromFile(String path) + { + final RuleSetCreator ruleList = UclFileParser.parse(path); + final List<UclRule> uclRules = new ArrayList<>(); + for (final Rule rule : ruleList) + { + uclRules.add(new UclRuleImpl(rule)); + } + final Map<String, Object> attrs = new HashMap<>(); + attrs.put(RULES, uclRules); + if (ruleList.isFrequencyPeriodSet()) + { + attrs.put(FREQUENCY_PERIOD, ruleList.getFrequencyPeriod()); + } + setAttributes(attrs); + } + + public void clearRules() + { + final Map<String, Object> attrs = new HashMap<>(); + attrs.put(RULES, new ArrayList<UclRule>()); + setAttributes(attrs); + } + + @Override + protected void postSetAttributes(final Set<String> actualUpdatedAttributes) + { + super.postSetAttributes(actualUpdatedAttributes); + forceNewUserConnectionLimitProvider(); + } + + @Override + protected RuleSetCreator newUserConnectionLimitProvider() + { + final RuleSetCreator creator = new RuleSetCreator(); + creator.setFrequencyPeriod(getFrequencyPeriod()); + for (final UclRule rule : getRules()) + { + creator.add(Rule.newInstance(rule)); + } + return creator; + } + + private static final class StringContent implements Content, 
CustomRestHeaders Review comment: I would prefer to use JSON format ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java ########## @@ -149,7 +148,8 @@ private long _maxUncommittedInMemorySize; private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = new ConcurrentHashMap<>(); - private volatile ConnectionPrincipalStatistics _connectionPrincipalStatistics; + + private final Queue<ConnectionSlot> _registeredConnectionSlots = new ConcurrentLinkedQueue<>(); Review comment: I am not sure that this is needed ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +final class Limits implements UserConnectionLimits +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + Limits(final UserConnectionLimits first, final UserConnectionLimits second) + { + super(); + this.connectionCount = min(first.getConnectionCountLimit(), second.getConnectionCountLimit()); + this.connectionFrequency = min(first.getConnectionFrequencyLimit(), second.getConnectionFrequencyLimit()); + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; + } + + @Override + public Integer getConnectionCountLimit() + { + return connectionCount; + } + + @Override + public Integer getConnectionFrequencyLimit() Review comment: This could be a string expressed like **number of connections** per period of time, for example 1/s, 10/h, I believe that integer could be confusing ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +final class Limits implements UserConnectionLimits Review comment: This class deserves a better name. It represents combined connection limits ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java ########## @@ -1161,28 +1161,35 @@ public void incrementTransactionBeginCounter() } @Override - public void registered(final ConnectionPrincipalStatistics connectionPrincipalStatistics) + public void freeConnectionSlots() { - _connectionPrincipalStatistics = connectionPrincipalStatistics; + ConnectionSlot slot; + while ((slot = _registeredConnectionSlots.poll()) != null) + { + slot.free(); + } } @Override - public int getAuthenticatedPrincipalConnectionCount() + public void addConnectionSlot(NamedAddressSpace addressSpace, String userId) { - if (_connectionPrincipalStatistics == null) - { - return 0; - } - return _connectionPrincipalStatistics.getConnectionCount(); + _registeredConnectionSlots.add(new ConnectionSlot(addressSpace, userId)); } - @Override - public int getAuthenticatedPrincipalConnectionFrequency() + private final class ConnectionSlot Review comment: As per comments above, a delete task Action<?> can do the clean-up ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java ########## @@ -1161,28 +1161,35 @@ public void incrementTransactionBeginCounter() } @Override - public void registered(final ConnectionPrincipalStatistics connectionPrincipalStatistics) + public void freeConnectionSlots() Review comment: This should be part of DeleteTask. 
Please see org.apache.qpid.server.util.Deletable#addDeleteTask ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java ########## @@ -20,34 +20,9 @@ */ package org.apache.qpid.server.transport; -import java.net.InetSocketAddress; Review comment: Wrong import order, please revert ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +final class Limits implements UserConnectionLimits +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + Limits(final UserConnectionLimits first, final UserConnectionLimits second) + { + super(); + this.connectionCount = min(first.getConnectionCountLimit(), second.getConnectionCountLimit()); + this.connectionFrequency = min(first.getConnectionFrequencyLimit(), second.getConnectionFrequencyLimit()); + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; + } + + @Override + public Integer getConnectionCountLimit() + { + return connectionCount; + } + + @Override + public Integer getConnectionFrequencyLimit() + { + return connectionFrequency; + } + + @Override + public boolean isUserBlocked() Review comment: Redundant method. It needs to be removed ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +final class Limits implements UserConnectionLimits +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + Limits(final UserConnectionLimits first, final UserConnectionLimits second) + { + super(); + this.connectionCount = min(first.getConnectionCountLimit(), second.getConnectionCountLimit()); + this.connectionFrequency = min(first.getConnectionFrequencyLimit(), second.getConnectionFrequencyLimit()); + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; + } + + @Override + public Integer getConnectionCountLimit() + { + return connectionCount; + } + + @Override + public Integer getConnectionFrequencyLimit() + { + return connectionFrequency; + } + + @Override + public boolean isUserBlocked() + { + return false; + } + + static Integer min(final Integer first, final Integer second) + { + if (first == null) Review comment: if any of parameter is null, it is a mistake. An exception has to be thrown ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java ########## @@ -20,26 +20,25 @@ */ package org.apache.qpid.server.transport; -import java.net.SocketAddress; Review comment: wrong import order. please revert the change ########## File path: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java ########## @@ -29,6 +29,7 @@ import java.nio.BufferUnderflowException; Review comment: As per comments for 0-10 protocol connection, the changes here can be reverted. 
We just need to add a try-catch block around the code that registers the connection on the VH ########## File path: broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java ########## @@ -148,10 +147,7 @@ void registerTransactionTickers(ServerTransaction serverTransaction, @Override AmqpPort<?> getPort(); - void registered(ConnectionPrincipalStatistics connectionPrincipalStatistics); - - int getAuthenticatedPrincipalConnectionCount(); - - int getAuthenticatedPrincipalConnectionFrequency(); + void addConnectionSlot(NamedAddressSpace addressSpace, String userId); Review comment: I would like to remove addConnectionSlot and freeConnectionSlots. IMHO, a DeleteTask for every identity should be registered on the connection to free the slot ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/NonBlockingRule.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.user.connection.limits.plugins.UclRule; + +final class NonBlockingRule extends AbstractRule +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + NonBlockingRule(UclRule rule) + { + this(rule.getPort(), rule.getIdentity(), rule.getCountLimit(), rule.getFrequencyLimit()); + } + + NonBlockingRule(String port, String identity, Integer connectionCount, Integer connectionFrequency) + { + super(port, identity); + this.connectionCount = connectionCount; + this.connectionFrequency = connectionFrequency; + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; Review comment: if both are null values, an exception should be reported from constructor ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) Review comment: It is hard to understand why builder is passed into constructor. Why connection factory cannot be passed directly? ########## File path: broker-plugins/management-http/src/main/java/resources/js/qpid/management/userconnectionlimitprovider/rulebased/show.js ########## @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/connect", + "dojo/_base/event", + "dojo/parser", + "dojo/query", + "dojo/dom-construct", + "dijit/registry", + "dojox/html/entities", + "dojox/grid/EnhancedGrid", Review comment: EnhancedGrid should not be used. Dgrid should be used instead ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) Review comment: if userId is null, it is a mistake. 
An exception should be thrown ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/NonBlockingRule.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.user.connection.limits.plugins.UclRule; + +final class NonBlockingRule extends AbstractRule +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + NonBlockingRule(UclRule rule) + { + this(rule.getPort(), rule.getIdentity(), rule.getCountLimit(), rule.getFrequencyLimit()); + } + + NonBlockingRule(String port, String identity, Integer connectionCount, Integer connectionFrequency) + { + super(port, identity); + this.connectionCount = connectionCount; + this.connectionFrequency = connectionFrequency; + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; + } + + @Override + public boolean isUserBlocked() + { + return false; + } + + @Override + public Integer getConnectionCountLimit() + { + return connectionCount; + } + + @Override + public Integer getConnectionFrequencyLimit() Review comment: I am not sure about Integer type. We need a special frequency type ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; Review comment: This is confusion. Factory cannot be Function. Factory class usually has a create method. Using apply for that would be misleading. 
Please introduce a special interface if required ########## File path: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java ########## @@ -950,57 +960,67 @@ public void receiveConnectionOpen(AMQShortString virtualHostName, virtualHostStr = virtualHostStr.substring(1); } - NamedAddressSpace addressSpace = ((AmqpPort)getPort()).getAddressSpace(virtualHostStr); + final NamedAddressSpace addressSpace = ((AmqpPort)getPort()).getAddressSpace(virtualHostStr); if (addressSpace == null) { sendConnectionClose(ErrorCodes.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", 0); + return; + } + // Check virtualhost access + if (!addressSpace.isActive()) + { + final String redirectHost = addressSpace.getRedirectHost(getPort()); + if (redirectHost != null) + { + sendConnectionClose(0, new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), AMQShortString.valueOf(redirectHost), null))); + } + else + { + sendConnectionClose(ErrorCodes.CONNECTION_FORCED, + "Virtual host '" + addressSpace.getName() + "' is not active", 0); + } + return; } - else + + try { - // Check virtualhost access - if (!addressSpace.isActive()) + final Principal authenticatedPrincipal = getAuthorizedPrincipal(); + if (authenticatedPrincipal != null) { - String redirectHost = addressSpace.getRedirectHost(getPort()); - if(redirectHost != null) + freeConnectionSlots(); + final String userId = authenticatedPrincipal.getName(); + final ResourceAvailability resource = addressSpace.requestConnectionSlot(this, userId); + if (resource != null && !resource.isAvailable()) { - sendConnectionClose(0, new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), AMQShortString.valueOf(redirectHost), null))); - } - else - { - sendConnectionClose(ErrorCodes.CONNECTION_FORCED, - "Virtual host '" + addressSpace.getName() + "' is not active", 0); + sendConnectionClose(ErrorCodes.RESOURCE_ERROR, resource.getCause(), 0); + return; } + 
addConnectionSlot(addressSpace, userId); + } + + addressSpace.registerConnection(this, new NoopConnectionEstablishmentPolicy()); Review comment: Here we can add a try-catch block to catch the runtime exception thrown on rejection. That should simplify the implementation ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/Limits.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +final class Limits implements UserConnectionLimits +{ + private final Integer connectionCount; + + private final Integer connectionFrequency; + + Limits(final UserConnectionLimits first, final UserConnectionLimits second) + { + super(); + this.connectionCount = min(first.getConnectionCountLimit(), second.getConnectionCountLimit()); + this.connectionFrequency = min(first.getConnectionFrequencyLimit(), second.getConnectionFrequencyLimit()); + } + + @Override + public boolean isEmpty() + { + return connectionCount == null && connectionFrequency == null; Review comment: If neither is set, it is a user mistake. We should not allow that. 
Validation should detect that and throw a ConfigurationException. The isEmpty method can then be deleted ########## File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ########## @@ -229,6 +233,7 @@ public Result getResult(final Subject subject) } }, Result.DEFER); + private volatile UserConnectionLimiter _userConnectionLimiter; Review comment: This should live on ConnectionLimitProvider. VHs are already overloaded with responsibilities. IMHO, it would be cleaner to do connection limiting on the ConnectionLimitProvider ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) Review comment: if userId is null, it is a mistake. An exception should be thrown ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + 
PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) Review comment: this is very confusing... userId is passed as a parameter and groups are taken from connection. Why authenticated principal cannot be taken from connection? ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) + { + return Collections.emptySet(); + } + final Set<String> principalNames = new HashSet<>(); + for (final Principal principal : subject.getPrincipals(GroupPrincipal.class)) + { + principalNames.add(principal.getName()); + } + return principalNames; + } + + static Builder newBuilder(Duration frequencyPeriod) + { + if (frequencyPeriod != null) + { + return new BuilderImpl(frequencyPeriod); + } + return new BuilderWithoutFrequencyImpl(); + } + + public interface Builder + { + Builder add(Rule rule); + + Builder addAll(Collection<? extends Rule> rule); + + PortConnectionCounter build(); + } + + private interface ConnectionCounter + { + ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port); Review comment: ResourceAvailabilityLogger does not sound right. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> +{ + Integer getConnectionCountLimit(); + + Integer getConnectionFrequencyLimit(); + + boolean isUserBlocked(); Review comment: this is redundant. connection limit of 0 or frequency of 0 should mean the same ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) + { + return Collections.emptySet(); + } + final Set<String> principalNames = new HashSet<>(); + for (final Principal principal : subject.getPrincipals(GroupPrincipal.class)) + { + principalNames.add(principal.getName()); + } + return principalNames; + } + + static Builder newBuilder(Duration frequencyPeriod) + { + if (frequencyPeriod != null) + { + return new BuilderImpl(frequencyPeriod); + } + return new BuilderWithoutFrequencyImpl(); + } + + public interface Builder + { + Builder add(Rule rule); + + Builder addAll(Collection<? extends Rule> rule); + + PortConnectionCounter build(); + } + + private interface ConnectionCounter + { + ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port); + + void deregister(); + + void maintain(); + } + + abstract static class AbstractBuilder<T extends CombinableLimit<T>> implements Builder Review comment: I got lost in builders. I will review this later ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return 
DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( Review comment: This might crash the broker, which is possibly the right thing to do in this case. However, if multiple providers were allowed, only some of them would have the user registered. We need to look into this from a multiple-provider perspective. That would depend on how delete tasks are registered on a connection. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> +{ + Integer getConnectionCountLimit(); + + Integer getConnectionFrequencyLimit(); Review comment: I think this should be either a String or a special Frequency type. Qpid users should be allowed to set the frequency using user-friendly terminology, something like "1/s", "2/h", "10/d". I think that units like seconds, hours and days should be sufficient. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) + { + return Collections.emptySet(); + } + final Set<String> principalNames = new HashSet<>(); Review comment: I could be wrong, but there should be some utility methods to filter the right principals. Please investigate. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); Review comment: I think it would read better if registerConnection is called for every group and authenticated principal ########## File path: 
broker-core/src/main/java/org/apache/qpid/server/security/UserConnectionLimiter.java ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security; + +import org.apache.qpid.server.model.Connection; Review comment: wrong import order ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionCountLimit.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +@FunctionalInterface +public interface UserConnectionCountLimit extends CombinableLimit<UserConnectionCountLimit> Review comment: I am not sure that this class is really needed. I think having a single implementation would work better. At least it would make the code simpler to follow. Also, at the moment there is a lot of code duplication between the different implementations. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) Review comment: this would be an error. Exception has to be thrown ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSet.java ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.security.UserConnectionLimiter; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; + +import java.security.AccessControlContext; +import java.time.Duration; +import java.util.Collection; +import java.util.Objects; + +public interface RuleSet extends UserConnectionLimiter +{ + void maintain(); + + default MaintenanceTask newMaintenanceTask(VirtualHost<?> vhost, AccessControlContext context) + { + return new MaintenanceTask(this, vhost, context); + } + + static Builder newBuilder(EventLoggerProvider logger, Duration frequencyPeriod) + { + return new RuleSetImpl.RuleSetBuilderImpl(logger, frequencyPeriod); + } + + interface Builder + { + Builder logAllMessages(boolean all); + + Builder addRule(Rule rule); + + Builder addRules(Collection<? extends Rule> rules); + + RuleSet build(); + } + + final class MaintenanceTask extends HouseKeepingTask + { + private final RuleSet _rulesSet; + + MaintenanceTask(RuleSet ruleSet, VirtualHost<?> vhost, AccessControlContext context) Review comment: I think we do not need VH here. I suppose a new constructor for HouseKeepingTask can be added to avoid dragging VH around. A broker house keeping executor could be utilized for Broker ConnectionLimitProviders ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/BlockingRule.java ########## @@ -16,29 +16,43 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.qpid.server.user.connection.limits.config; -package org.apache.qpid.server.security.access.config.connection; +import org.apache.qpid.server.user.connection.limits.plugins.UclRule; -import org.apache.qpid.server.security.access.config.ObjectProperties; -import org.apache.qpid.server.transport.AMQPConnection; - -public class ConnectionPrincipalLimitRule extends ConnectionPrincipalStatisticsRule +final class BlockingRule extends AbstractRule Review comment: I do not think that BlockingRule is needed. It is redundant. NonBlockingRule with a connection limit of 0 or a connection frequency of 0 can be used instead of BlockingRule. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> Review comment: ConnectionLimits can be used as a name ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/RuleSetCreator.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.security.UserConnectionLimitProvider; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; + +public final class RuleSetCreator extends ArrayList<Rule> implements UserConnectionLimitProvider Review comment: Please use composition instead of extending ArrayList ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclRulePredicates.java ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +public final class UclRulePredicates Review comment: I would rename it into ConnectionLimitRulePredicates. 
Though, I would prefer to replace bespoke rules format with json or yaml ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclFileParser.java ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StreamTokenizer; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Locale; +import java.util.Queue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class UclFileParser Review comment: Why do you need bespoke rule format? Would not it be simpler to use json or yaml? That should save some time and energy on implementation and maintenance of bespoke format parser. 
IMHO, I would prefer to have rules defined like below [ { "identity": "foo", "port": "bar", "connection_limit": 100, "connection_frequency_limit": "10/h" }, .... ] ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> +{ + Integer getConnectionCountLimit(); + + Integer getConnectionFrequencyLimit(); + + boolean isUserBlocked(); + + @Override + default UserConnectionLimits then(UserConnectionLimits other) + { + if (other != null && isEmpty()) Review comment: I am not sure that this check is needed ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UserConnectionLimits.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import java.util.Optional; + +public interface UserConnectionLimits extends CombinableLimit<UserConnectionLimits> +{ + Integer getConnectionCountLimit(); + + Integer getConnectionFrequencyLimit(); + + boolean isUserBlocked(); + + @Override + default UserConnectionLimits then(UserConnectionLimits other) + { + if (other != null && isEmpty()) + { + return other; + } + return this; + } + + @Override + default UserConnectionLimits mergeWith(UserConnectionLimits second) + { + if (second == null || isUserBlocked()) + { + return this; + } + if (second.isUserBlocked()) + { + return second; + } + return new Limits(this, second); Review comment: This belongs in the Limits class. It does not look clean when an interface depends on its implementation. ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/UclRulePredicates.java ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.slf4j.Logger; Review comment: wrong import order ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/DeferRegistration.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.qpid.server.user.connection.limits.outcome; + +public final class DeferRegistration implements ResourceAvailabilityLogger Review comment: This can be deleted ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/AllLogger.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.outcome; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.logging.messages.UserResourceLimitMessages; + +public final class AllLogger extends RejectLogger Review comment: This can be safely deleted ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/plugins/UclFileBrokerUserConnectionLimitProvider.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.plugins; + +import org.apache.qpid.server.model.BrokerUserConnectionLimitProvider; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedOperation; + +@ManagedObject(category = false, + type = AbstractUclFileUserConnectionLimitProvider.UCL_FILE_PROVIDER_TYPE) +public interface UclFileBrokerUserConnectionLimitProvider<C extends UclFileBrokerUserConnectionLimitProvider<C>> Review comment: Why do you need to keep rules in a separate file? What about keeping rules in a broker and VH configuration only? ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/outcome/EventLogger.java ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.user.connection.limits.outcome; + +@FunctionalInterface +public interface EventLogger Review comment: We have EventLogger in a different package. I would prefer to use a different name. Though, I do not see much value in keeping this interface. Let's delete it ########## File path: broker-plugins/user-connection-limits/src/main/java/org/apache/qpid/server/user/connection/limits/config/PortConnectionCounter.java ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.qpid.server.user.connection.limits.config; + +import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.user.connection.limits.outcome.AcceptRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.DeferRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.RejectRegistration; +import org.apache.qpid.server.user.connection.limits.outcome.ResourceAvailabilityLogger; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +final class PortConnectionCounter +{ + private final Function<String, ConnectionCounter> _connectionCounterFactory; + + private final Map<String, ConnectionCounter> _connectionCounters = new ConcurrentHashMap<>(); + + PortConnectionCounter(AbstractBuilder<?> builder) + { + super(); + _connectionCounterFactory = builder.getConnectionCounterFactory(); + } + + public ResourceAvailabilityLogger register(AMQPConnection<?> connection, String userId) + { + if (userId == null) + { + return DeferRegistration.defer(); + } + final String port = connection.getPort().getName(); + return _connectionCounters.computeIfAbsent(userId, _connectionCounterFactory) + .registerConnection(userId, collectGroupPrincipals(connection.getSubject()), port); + } + + public void deregister(String userId) + { + if (userId == null) + { + return; + } + _connectionCounters.computeIfAbsent(userId, user -> + { + throw new IllegalArgumentException( + 
String.format("The user '%s' has not been registered yet", user)); + } + ).deregister(); + } + + public void maintain() + { + _connectionCounters.values().forEach(ConnectionCounter::maintain); + } + + private Set<String> collectGroupPrincipals(Subject subject) + { + if (subject == null) + { + return Collections.emptySet(); + } + final Set<String> principalNames = new HashSet<>(); + for (final Principal principal : subject.getPrincipals(GroupPrincipal.class)) + { + principalNames.add(principal.getName()); + } + return principalNames; + } + + static Builder newBuilder(Duration frequencyPeriod) + { + if (frequencyPeriod != null) + { + return new BuilderImpl(frequencyPeriod); + } + return new BuilderWithoutFrequencyImpl(); + } + + public interface Builder + { + Builder add(Rule rule); + + Builder addAll(Collection<? extends Rule> rule); + + PortConnectionCounter build(); + } + + private interface ConnectionCounter + { + ResourceAvailabilityLogger registerConnection(String userId, Set<String> groups, String port); + + void deregister(); + + void maintain(); + } + + abstract static class AbstractBuilder<T extends CombinableLimit<T>> implements Builder + { + final Map<String, T> _userLimits = new HashMap<>(); + T _defaultUserLimits; + + abstract T newLimits(Rule rule); + + abstract Function<String, ConnectionCounter> getConnectionCounterFactory(); Review comment: It is confusing to use Function as a factory
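On the comment about using Function as a factory: one possible alternative, sketched here under assumptions, is a small named factory interface in place of Function<String, ConnectionCounter>. The names FactorySketch, ConnectionCounterFactory, newConnectionCounter and simpleCounter are hypothetical, and ConnectionCounter and PortConnectionCounter below are simplified stand-ins rather than the classes from the pull request. The sketch only shows that a named single-method interface still works with computeIfAbsent through a method reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins; not the classes from the pull request.
public class FactorySketch
{
    /** Simplified counter: returns the number of registrations so far. */
    interface ConnectionCounter
    {
        int register();
    }

    /** Named factory type (assumed name) replacing Function<String, ConnectionCounter>. */
    @FunctionalInterface
    interface ConnectionCounterFactory
    {
        ConnectionCounter newConnectionCounter(String userId);
    }

    static final class PortConnectionCounter
    {
        private final ConnectionCounterFactory _factory;
        private final Map<String, ConnectionCounter> _counters = new ConcurrentHashMap<>();

        PortConnectionCounter(ConnectionCounterFactory factory)
        {
            _factory = factory;
        }

        int register(String userId)
        {
            // The method reference adapts the named factory to the Function
            // parameter that computeIfAbsent expects.
            return _counters.computeIfAbsent(userId, _factory::newConnectionCounter).register();
        }
    }

    /** Creates a counter backed by an AtomicInteger. */
    static ConnectionCounter simpleCounter()
    {
        final AtomicInteger count = new AtomicInteger();
        return count::incrementAndGet;
    }

    public static void main(String[] args)
    {
        PortConnectionCounter counter = new PortConnectionCounter(userId -> simpleCounter());
        System.out.println(counter.register("alice"));
        System.out.println(counter.register("alice"));
        System.out.println(counter.register("bob"));
    }
}
```

With a named type, the intent is visible in the signature of getConnectionCounterFactory(), and callers can still pass a lambda or a constructor reference.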