This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 28cbf3b QPID-8558: [Broker-J] Enhancement of sole connection
enforcement policy evaluation (#106)
28cbf3b is described below
commit 28cbf3b2ba40e3a895c723b3af84b07810cf43c9
Author: Marek Laca <[email protected]>
AuthorDate: Mon Nov 15 09:18:01 2021 +0100
QPID-8558: [Broker-J] Enhancement of sole connection enforcement policy
evaluation (#106)
---
.../qpid/server/model/NamedAddressSpace.java | 5 +-
.../limit/ConnectionLimiterService.java} | 9 +-
.../AbstractNonConnectionAcceptingVirtualHost.java | 4 +-
.../server/virtualhost/AbstractVirtualHost.java | 110 +-----
.../NoopConnectionEstablishmentPolicy.java | 33 --
.../virtualhost/VirtualHostConnectionLimiter.java | 21 ++
.../apache/qpid/server/model/VirtualHostTest.java | 15 +-
.../server/protocol/v0_10/ServerConnection.java | 3 +-
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 3 +-
.../server/protocol/v1_0/AMQPConnection_1_0.java | 3 +
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 143 ++++----
.../soleconn/SoleConnectionDetectionPolicy.java | 49 +--
.../soleconn/SoleConnectionEnforcementPolicy.java | 48 +--
.../SoleConnectionEnforcementPolicyException.java | 52 +++
.../StrongConnectionEstablishmentLimiter.java | 218 +++++++++++
.../protocol/v1_0/ProtocolEngine_1_0_0Test.java | 15 +-
.../SoleConnectionDetectionPolicyTest.java | 62 ++++
.../SoleConnectionEnforcementPolicyTest.java | 60 ++++
.../StrongConnectionEstablishmentLimiterTest.java | 398 +++++++++++++++++++++
.../management/amqp/ManagementAddressSpace.java | 5 +-
20 files changed, 945 insertions(+), 311 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index 2792a31..09341c5 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.LinkRegistryModel;
public interface NamedAddressSpace extends Named
@@ -46,11 +45,9 @@ public interface NamedAddressSpace extends Named
MessageDestination getAttainedMessageDestination(String name, boolean
mayCreate);
- boolean registerConnection(AMQPConnection<?> connection,
- final ConnectionEstablishmentPolicy
connectionEstablishmentPolicy);
+ void registerConnection(AMQPConnection<?> connection);
void deregisterConnection(AMQPConnection<?> connection);
-
String getRedirectHost(AmqpPort<?> port);
Principal getPrincipal();
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
similarity index 74%
rename from
broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
rename to
broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
index 004e3db..d526589 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
@@ -15,14 +15,11 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
+package org.apache.qpid.server.security.limit;
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.plugin.Pluggable;
-public interface ConnectionEstablishmentPolicy
+public interface ConnectionLimiterService extends ConnectionLimiter, Pluggable
{
- boolean mayEstablishNewConnection(Iterable<AMQPConnection<?>>
existingConnections, AMQPConnection<?> newConnection);
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index 6d75ad1..b32dd49 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -72,11 +72,9 @@ public abstract class
AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
}
@Override
- public boolean registerConnection(final AMQPConnection<?> connection,
- final ConnectionEstablishmentPolicy
connectionEstablishmentPolicy)
+ public void registerConnection(final AMQPConnection<?> connection)
{
throwUnsupported();
- return false;
}
@Override
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 09c4fc1..cd1c26c 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2646,70 +2646,22 @@ public abstract class AbstractVirtualHost<X extends
AbstractVirtualHost<X>> exte
}
@Override
- public boolean registerConnection(final AMQPConnection<?> connection,
- final ConnectionEstablishmentPolicy
connectionEstablishmentPolicy)
+ public void registerConnection(final AMQPConnection<?> connection)
{
+ if (!_acceptsConnections.get())
+ {
+ throw new VirtualHostUnavailableException(String.format(
+ "VirtualHost '%s' not accepting connections",
+ getName()));
+ }
_connectionLimiter.register(connection);
- return doSync(registerConnectionAsync(connection,
connectionEstablishmentPolicy));
- }
-
- public ListenableFuture<Boolean> registerConnectionAsync(final
AMQPConnection<?> connection,
- final
ConnectionEstablishmentPolicy connectionEstablishmentPolicy)
- {
- return doOnConfigThread(new Task<ListenableFuture<Boolean>,
RuntimeException>()
+ _connections.add(connection);
+ _totalConnectionCount.incrementAndGet();
+ if (_blocked.get())
{
- @Override
- public ListenableFuture<Boolean> execute()
- {
- if (_acceptsConnections.get())
- {
- if
(connectionEstablishmentPolicy.mayEstablishNewConnection(_connections,
connection))
- {
- _connections.add(connection);
- _totalConnectionCount.incrementAndGet();
-
- if (_blocked.get())
- {
- connection.block();
- }
-
- connection.pushScheduler(_networkConnectionScheduler);
- return Futures.immediateFuture(true);
- }
- else
- {
- return Futures.immediateFuture(false);
- }
- }
- else
- {
- final VirtualHostUnavailableException exception =
- new VirtualHostUnavailableException(String.format(
- "VirtualHost '%s' not accepting
connections",
- getName()));
- return Futures.immediateFailedFuture(exception);
- }
- }
-
- @Override
- public String getObject()
- {
- return AbstractVirtualHost.this.toString();
- }
-
- @Override
- public String getAction()
- {
- return "register connection";
- }
-
- @Override
- public String getArguments()
- {
- return String.valueOf(connection);
- }
- });
-
+ connection.block();
+ }
+ connection.pushScheduler(_networkConnectionScheduler);
}
@Override
@@ -2721,43 +2673,11 @@ public abstract class AbstractVirtualHost<X extends
AbstractVirtualHost<X>> exte
}
finally
{
- doSync(deregisterConnectionAsync(connection));
-
+ connection.popScheduler();
+ _connections.remove(connection);
}
}
- public ListenableFuture<Void> deregisterConnectionAsync(final
AMQPConnection<?> connection)
- {
- return doOnConfigThread(new Task<ListenableFuture<Void>,
RuntimeException>()
- {
- @Override
- public ListenableFuture<Void> execute()
- {
- connection.popScheduler();
- _connections.remove(connection);
- return Futures.immediateFuture(null);
- }
-
- @Override
- public String getObject()
- {
- return AbstractVirtualHost.this.toString();
- }
-
- @Override
- public String getAction()
- {
- return "deregister connection";
- }
-
- @Override
- public String getArguments()
- {
- return String.valueOf(connection);
- }
- });
- }
-
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED},
desiredState = State.ACTIVE)
private ListenableFuture<Void> onActivate()
{
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
deleted file mode 100644
index c1c7b44..0000000
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.transport.AMQPConnection;
-
-public class NoopConnectionEstablishmentPolicy implements
ConnectionEstablishmentPolicy
-{
- @Override
- public boolean mayEstablishNewConnection(final Iterable<AMQPConnection<?>>
existingConnections,
- final AMQPConnection<?>
newConnection)
- {
- return true;
- }
-}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
index 4188e1a..b24447e 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
@@ -18,10 +18,12 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,10 +36,12 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostConnectionLimitProvider;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.security.limit.CachedConnectionLimiterImpl;
import org.apache.qpid.server.security.limit.ConnectionLimitProvider;
import org.apache.qpid.server.security.limit.ConnectionLimiter;
import org.apache.qpid.server.security.limit.ConnectionLimiter.CachedLimiter;
+import org.apache.qpid.server.security.limit.ConnectionLimiterService;
final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl
implements CachedLimiter
{
@@ -49,6 +53,8 @@ final class VirtualHostConnectionLimiter extends
CachedConnectionLimiterImpl imp
private final Map<ConnectionLimitProvider<?>, ConnectionLimiter>
_connectionLimitProviders = new ConcurrentHashMap<>();
+ private final List<ConnectionLimiterService> _serviceLimiters = new
CopyOnWriteArrayList<>();
+
private final ChangeListener _virtualHostChangeListener;
private final ChangeListener _brokerChangeListener;
@@ -70,6 +76,13 @@ final class VirtualHostConnectionLimiter extends
CachedConnectionLimiterImpl imp
.forEach(child ->
child.addChangeListener(ProviderChangeListener.virtualHostChangeListener(this)));
_broker.getChildren(BrokerConnectionLimitProvider.class)
.forEach(child ->
child.addChangeListener(ProviderChangeListener.brokerChangeListener(this)));
+
+ final QpidServiceLoader serviceLoader = new QpidServiceLoader();
+ for (final ConnectionLimiterService service :
serviceLoader.instancesOf(ConnectionLimiterService.class))
+ {
+ LOGGER.debug("New connection limiter service found: {}",
service.getType());
+ _serviceLimiters.add(service);
+ }
}
public void activate()
@@ -90,6 +103,8 @@ final class VirtualHostConnectionLimiter extends
CachedConnectionLimiterImpl imp
_broker.getChildren(BrokerConnectionLimitProvider.class)
.forEach(child ->
child.removeChangeListener(brokerChangeListener));
+ _serviceLimiters.clear();
+
swapLimiter(ConnectionLimiter.noLimits());
}
@@ -140,6 +155,12 @@ final class VirtualHostConnectionLimiter extends
CachedConnectionLimiterImpl imp
limiter = ConnectionLimiter.blockEveryone();
}
}
+
+ LOGGER.debug("Updating service based connection limiters");
+ for (final ConnectionLimiterService service : _serviceLimiters)
+ {
+ limiter = limiter.append(service);
+ }
return limiter;
}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index a5f2e21..350fda5 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -78,7 +78,6 @@ import
org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -338,7 +337,7 @@ public class VirtualHostTest extends UnitTestBase
virtualHost.getConnectionCount());
AMQPConnection modelConnection = getMockConnection();
- virtualHost.registerConnection(modelConnection, new
NoopConnectionEstablishmentPolicy());
+ virtualHost.registerConnection(modelConnection);
assertEquals("Unexpected number of connections after connection
registered",
(long) 1,
@@ -367,7 +366,7 @@ public class VirtualHostTest extends UnitTestBase
virtualHost.getConnectionCount());
AMQPConnection modelConnection = getMockConnection();
- virtualHost.registerConnection(modelConnection, new
NoopConnectionEstablishmentPolicy());
+ virtualHost.registerConnection(modelConnection);
assertEquals("Unexpected number of connections after connection
registered",
(long) 1,
@@ -499,7 +498,7 @@ public class VirtualHostTest extends UnitTestBase
{
VirtualHost<?> host = createVirtualHost(getTestName());
AMQPConnection connection = getMockConnection();
- host.registerConnection(connection, new
NoopConnectionEstablishmentPolicy());
+ host.registerConnection(connection);
((EventListener) host).event(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
verify(connection).block();
}
@@ -578,7 +577,7 @@ public class VirtualHostTest extends UnitTestBase
AMQPConnection<?> connection = getMockConnection();
assertEquals("unexpected number of connections before test", (long) 0,
vhost.getConnectionCount());
- vhost.registerConnection(connection, new
NoopConnectionEstablishmentPolicy());
+ vhost.registerConnection(connection);
assertEquals("unexpected number of connections after
registerConnection",
(long) 1,
vhost.getConnectionCount());
@@ -591,7 +590,7 @@ public class VirtualHostTest extends UnitTestBase
QueueManagingVirtualHost<?> vhost = createVirtualHost("sdf");
AMQPConnection<?> connection = getMockConnection();
- vhost.registerConnection(connection, new
NoopConnectionEstablishmentPolicy());
+ vhost.registerConnection(connection);
assertEquals("unexpected number of connections after
registerConnection",
(long) 1,
vhost.getConnectionCount());
@@ -610,7 +609,7 @@ public class VirtualHostTest extends UnitTestBase
((AbstractConfiguredObject<?>)vhost).stop();
try
{
- vhost.registerConnection(connection, new
NoopConnectionEstablishmentPolicy());
+ vhost.registerConnection(connection);
fail("exception not thrown");
}
catch (VirtualHostUnavailableException e)
@@ -619,7 +618,7 @@ public class VirtualHostTest extends UnitTestBase
}
assertEquals("unexpected number of connections", (long) 0,
vhost.getConnectionCount());
((AbstractConfiguredObject<?>)vhost).start();
- vhost.registerConnection(connection, new
NoopConnectionEstablishmentPolicy());
+ vhost.registerConnection(connection);
assertEquals("unexpected number of connections", (long) 1,
vhost.getConnectionCount());
}
diff --git
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 6cc8fe8..a2d3bbd 100644
---
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -73,7 +73,6 @@ import
org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
public class ServerConnection extends ConnectionInvoker
{
@@ -194,7 +193,7 @@ public class ServerConnection extends ConnectionInvoker
public void setVirtualHost(NamedAddressSpace addressSpace)
{
- addressSpace.registerConnection(_amqpConnection, new
NoopConnectionEstablishmentPolicy());
+ addressSpace.registerConnection(_amqpConnection);
_amqpConnection.setAddressSpace(addressSpace);
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index e30b09e..bc04386 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -79,7 +79,6 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class AMQPConnection_0_8Impl
@@ -984,7 +983,7 @@ public class AMQPConnection_0_8Impl
try
{
- addressSpace.registerConnection(this, new
NoopConnectionEstablishmentPolicy());
+ addressSpace.registerConnection(this);
setAddressSpace(addressSpace);
if (addressSpace.authoriseCreateConnection(this))
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 10a907d..ef5057e 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -29,6 +29,7 @@ import
org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -88,4 +89,6 @@ public interface AMQPConnection_1_0<C extends
AMQPConnection_1_0<C>> extends AMQ
@DerivedAttribute(description = "If true send a final SASL challenge using
a SaslChallenge performative, rather than SaslOutcome.")
boolean getSendSaslFinalChallengeAsChallenge();
+
+ SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy();
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8f8e351..f2d7262 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -48,8 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -92,6 +90,7 @@ import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry
import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
+import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -119,7 +118,6 @@ import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManag
import
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
@@ -926,86 +924,28 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
try
{
- final boolean registerSucceeded =
addressSpace.registerConnection(this, (existingConnections, newConnection) ->
- {
- boolean proceedWithRegistration = true;
- if (newConnection instanceof AMQPConnection_1_0Impl &&
!newConnection.isClosing())
- {
- final List<ListenableFuture<Void>> rescheduleFutures = new
ArrayList<>();
- for (AMQPConnection<?> existingConnection :
StreamSupport.stream(existingConnections.spliterator(), false)
- .filter(con -> con instanceof AMQPConnection_1_0)
- .filter(con -> !con.isClosing())
- .filter(con ->
con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
- .collect(Collectors.toList()))
- {
- SoleConnectionEnforcementPolicy
soleConnectionEnforcementPolicy = null;
- if (((AMQPConnection_1_0Impl)
existingConnection)._soleConnectionEnforcementPolicy
- != null)
- {
- soleConnectionEnforcementPolicy =
- ((AMQPConnection_1_0Impl)
existingConnection)._soleConnectionEnforcementPolicy;
- }
- else if (((AMQPConnection_1_0Impl)
newConnection)._soleConnectionEnforcementPolicy != null)
- {
- soleConnectionEnforcementPolicy =
- ((AMQPConnection_1_0Impl)
newConnection)._soleConnectionEnforcementPolicy;
- }
- if
(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy))
- {
-
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
- final Error error = new
Error(AmqpError.INVALID_FIELD,
- String.format(
- "Connection closed due to
sole-connection-enforcement-policy '%s'",
-
String.valueOf(soleConnectionEnforcementPolicy)));
-
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"),
Symbol.valueOf("container-id")));
- newConnection.doOnIOThreadAsync(() ->
((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
- proceedWithRegistration = false;
- break;
- }
- else if
(SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
- {
- final Error error = new
Error(AmqpError.RESOURCE_LOCKED,
- String.format(
- "Connection closed due to
sole-connection-enforcement-policy '%s'",
-
String.valueOf(soleConnectionEnforcementPolicy)));
-
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
true));
-
rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
- () -> ((AMQPConnection_1_0Impl)
existingConnection).closeConnection(error)));
- proceedWithRegistration = false;
- }
- }
- if (!rescheduleFutures.isEmpty())
- {
- doAfter(allAsList(rescheduleFutures), () ->
newConnection.doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace)));
- }
- }
- return proceedWithRegistration;
- });
+ addressSpace.registerConnection(this);
+ setAddressSpace(addressSpace);
- if (registerSucceeded)
+ if (!addressSpace.authoriseCreateConnection(this))
{
- setAddressSpace(addressSpace);
-
- if (!addressSpace.authoriseCreateConnection(this))
+ closeConnection(AmqpError.NOT_ALLOWED, "Connection refused");
+ }
+ else
+ {
+ switch (_connectionState)
{
- closeConnection(AmqpError.NOT_ALLOWED, "Connection
refused");
- }
- else
- {
- switch (_connectionState)
- {
- case AWAIT_OPEN:
- sendOpen(_channelMax, _maxFrameSize);
- _connectionState = ConnectionState.OPENED;
- break;
- case CLOSE_SENT:
- case CLOSED:
- // already sent our close - probably due to an
error
- break;
- default:
- throw new
ConnectionScopedRuntimeException(String.format(
- "Unexpected state %s during connection
open.", _connectionState));
- }
+ case AWAIT_OPEN:
+ sendOpen(_channelMax, _maxFrameSize);
+ _connectionState = ConnectionState.OPENED;
+ break;
+ case CLOSE_SENT:
+ case CLOSED:
+ // already sent our close - probably due to an error
+ break;
+ default:
+ throw new
ConnectionScopedRuntimeException(String.format(
+ "Unexpected state %s during connection open.",
_connectionState));
}
}
}
@@ -1013,6 +953,10 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
{
closeConnection(AmqpError.NOT_ALLOWED, e.getMessage());
}
+ catch (SoleConnectionEnforcementPolicyException e)
+ {
+ handleSoleConnectionEnforcement(addressSpace, e);
+ }
catch (ConnectionLimitException e)
{
LOGGER.debug("User connection limit exceeded", e);
@@ -1020,6 +964,39 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
}
+ private void handleSoleConnectionEnforcement(final NamedAddressSpace
addressSpace,
+ final
SoleConnectionEnforcementPolicyException e)
+ {
+ if (isClosing())
+ {
+ return;
+ }
+ if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION)
+ {
+
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
+ final Error error = new Error(AmqpError.INVALID_FIELD,
+ String.format("Connection closed due to
sole-connection-enforcement-policy '%s'", e.getPolicy()));
+
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"),
Symbol.valueOf("container-id")));
+ closeConnection(error);
+ }
+ else if (e.getPolicy() ==
SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
+ {
+ final Error error = new Error(AmqpError.RESOURCE_LOCKED,
+ String.format("Connection closed due to
sole-connection-enforcement-policy '%s'", e.getPolicy()));
+
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
true));
+
+ final List<ListenableFuture<Void>> rescheduleFutures = new
ArrayList<>();
+ for (final AMQPConnection_1_0<?> connection :
e.getExistingConnections())
+ {
+ if (!connection.isClosing())
+ {
+ rescheduleFutures.add(connection.doOnIOThreadAsync(() ->
connection.close(error)));
+ }
+ }
+ doAfter(allAsList(rescheduleFutures), () -> doOnIOThreadAsync(()
-> receiveOpenInternal(addressSpace)));
+ }
+ }
+
private void populateConnectionRedirect(final NamedAddressSpace
addressSpace, final Error err)
{
final String redirectHost = addressSpace.getRedirectHost(getPort());
@@ -1885,6 +1862,12 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
@Override
+ public SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy()
+ {
+ return _soleConnectionEnforcementPolicy;
+ }
+
+ @Override
protected boolean isOpeningInProgress()
{
switch (_connectionState)
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
index e3a3ef5..bd66dd0 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
@@ -15,26 +15,27 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+import java.util.Locale;
+
import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-public class SoleConnectionDetectionPolicy implements
RestrictedType<UnsignedInteger>
+public enum SoleConnectionDetectionPolicy implements
RestrictedType<UnsignedInteger>
{
- public static final SoleConnectionDetectionPolicy
- STRONG = new
SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(0));
- public static final SoleConnectionDetectionPolicy
- WEAK = new
SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(1));
+ STRONG(0),
+ WEAK(1);
private final UnsignedInteger _val;
- private SoleConnectionDetectionPolicy(final UnsignedInteger val)
+ private final String _description;
+
+ SoleConnectionDetectionPolicy(int val)
{
- _val = val;
+ _val = UnsignedInteger.valueOf(val);
+ _description = name().toLowerCase(Locale.ENGLISH);
}
@Override
@@ -45,39 +46,21 @@ public class SoleConnectionDetectionPolicy implements
RestrictedType<UnsignedInt
public static SoleConnectionDetectionPolicy valueOf(Object obj)
{
- if (obj instanceof UnsignedInteger)
+ for (final SoleConnectionDetectionPolicy detectionPolicy : values())
{
- UnsignedInteger val = (UnsignedInteger) obj;
-
- if (STRONG._val.equals(val))
- {
- return STRONG;
- }
-
- if (WEAK._val.equals(val))
+ if (detectionPolicy._val.equals(obj))
{
- return WEAK;
+ return detectionPolicy;
}
}
- final String message = String.format("Cannot convert '%s' into
'sole-connection-detection-policy'", obj);
- throw new IllegalArgumentException(message);
+ throw new IllegalArgumentException(
+ String.format("Cannot convert '%s' into
'sole-connection-detection-policy'", obj));
}
@Override
public String toString()
{
-
- if (this == STRONG)
- {
- return "strong";
- }
-
- if (this == WEAK)
- {
- return "weak";
- }
-
- return String.valueOf(_val);
+ return _description;
}
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
index 86a57b1..2655d36 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
@@ -15,26 +15,26 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+import java.util.Objects;
+
import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-public class SoleConnectionEnforcementPolicy implements
RestrictedType<UnsignedInteger>
+public enum SoleConnectionEnforcementPolicy implements
RestrictedType<UnsignedInteger>
{
- public static final SoleConnectionEnforcementPolicy
- REFUSE_CONNECTION = new
SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(0));
- public static final SoleConnectionEnforcementPolicy
- CLOSE_EXISTING = new
SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(1));
+ REFUSE_CONNECTION(0, "refuse-connection"),
+ CLOSE_EXISTING(1, "close-existing");
private final UnsignedInteger _val;
+ private final String _description;
- private SoleConnectionEnforcementPolicy(final UnsignedInteger val)
+ SoleConnectionEnforcementPolicy(int val, String description)
{
- _val = val;
+ _val = UnsignedInteger.valueOf(val);
+ _description = Objects.requireNonNull(description);
}
@Override
@@ -45,39 +45,21 @@ public class SoleConnectionEnforcementPolicy implements
RestrictedType<UnsignedI
public static SoleConnectionEnforcementPolicy valueOf(Object obj)
{
- if (obj instanceof UnsignedInteger)
+ for (final SoleConnectionEnforcementPolicy policy : values())
{
- UnsignedInteger val = (UnsignedInteger) obj;
-
- if (REFUSE_CONNECTION._val.equals(val))
- {
- return REFUSE_CONNECTION;
- }
-
- if (CLOSE_EXISTING._val.equals(val))
+ if (policy._val.equals(obj))
{
- return CLOSE_EXISTING;
+ return policy;
}
}
- final String message = String.format("Cannot convert '%s' into
'sole-connection-enforcement-policy'", obj);
- throw new IllegalArgumentException(message);
+ throw new IllegalArgumentException(
+ String.format("Cannot convert '%s' into
'sole-connection-enforcement-policy'", obj));
}
@Override
public String toString()
{
-
- if (this == REFUSE_CONNECTION)
- {
- return "refuse-connection";
- }
-
- if (this == CLOSE_EXISTING)
- {
- return "close-existing";
- }
-
- return String.valueOf(_val);
+ return _description;
}
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
new file mode 100644
index 0000000..8f49b7b
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimitException;
+
+public class SoleConnectionEnforcementPolicyException extends
ConnectionLimitException
+{
+ private final Set<AMQPConnection_1_0<?>> _existingConnections;
+
+ private final SoleConnectionEnforcementPolicy _policy;
+
+ public
SoleConnectionEnforcementPolicyException(SoleConnectionEnforcementPolicy policy,
+ Collection<? extends
AMQPConnection_1_0<?>> connections)
+ {
+ super(String.format("Single connection is required due to
sole-connection-enforcement-policy '%s'", policy));
+ _policy = policy;
+ _existingConnections = new HashSet<>(connections);
+ }
+
+ public SoleConnectionEnforcementPolicy getPolicy()
+ {
+ return _policy;
+ }
+
+ public Set<AMQPConnection_1_0<?>> getExistingConnections()
+ {
+ return Collections.unmodifiableSet(_existingConnections);
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
new file mode 100644
index 0000000..c0ff83b
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimiter;
+import org.apache.qpid.server.security.limit.ConnectionLimiterService;
+import org.apache.qpid.server.security.limit.ConnectionSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@PluggableService
+public class StrongConnectionEstablishmentLimiter implements
ConnectionLimiterService
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StrongConnectionEstablishmentLimiter.class);
+
+ private final Map<String, UsageCounter> _slots;
+
+ private final ConnectionLimiter _underlyingLimiter;
+
+ public StrongConnectionEstablishmentLimiter()
+ {
+ super();
+ _slots = new ConcurrentHashMap<>();
+ _underlyingLimiter = ConnectionLimiter.noLimits();
+ }
+
+ private
StrongConnectionEstablishmentLimiter(StrongConnectionEstablishmentLimiter
limiter, ConnectionLimiter underlyingLimiter)
+ {
+ super();
+ _slots = limiter._slots;
+ _underlyingLimiter = Objects.requireNonNull(underlyingLimiter);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "EstablishmentPolicy." + SoleConnectionDetectionPolicy.STRONG;
+ }
+
+ @Override
+ public ConnectionSlot register(AMQPConnection<?> connection)
+ {
+ if (!(connection instanceof AMQPConnection_1_0) ||
connection.isClosing())
+ {
+ return _underlyingLimiter.register(connection);
+ }
+ LOGGER.debug("Registering a new connection '{}'", connection);
+ final AMQPConnection_1_0<?> newConnection = (AMQPConnection_1_0<?>)
connection;
+ final String remoteContainerId = newConnection.getRemoteContainerId();
+ if (remoteContainerId == null)
+ {
+ // 'container-id' is the mandatory field of open frame but could
be null in integration or JUnit tests,
+ // e.g. when AMQPConnection_1_0 is mocked in a test.
+ LOGGER.warn(
+ "The connection '{}' without container ID, 'container-id'
is the mandatory field of open frame",
+ connection);
+ return _underlyingLimiter.register(connection);
+ }
+
+ LOGGER.debug("Checking a container slot for the connection '{}'",
connection);
+ try
+ {
+ return _slots.compute(remoteContainerId,
+ (containerId, counter) -> counter == null ?
newUsageCounter(containerId) : counter.addUser())
+ .registerConnection(newConnection);
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Registering connection failed", e);
+ deregisterUser(remoteContainerId);
+ throw e;
+ }
+ }
+
+ private void deregisterUser(final String containerId)
+ {
+ _slots.computeIfPresent(containerId, (id, slot) -> slot.removeUser());
+ }
+
+ private UsageCounter newUsageCounter(String containerId)
+ {
+ return new UsageCounter(new RemoteContainerSlot(containerId), 1L);
+ }
+
+ @Override
+ public ConnectionLimiter append(ConnectionLimiter limiter)
+ {
+ return new StrongConnectionEstablishmentLimiter(this,
_underlyingLimiter.append(limiter));
+ }
+
+ private static final class UsageCounter
+ {
+ private final long _counter;
+
+ private final RemoteContainerSlot _slot;
+
+ UsageCounter(RemoteContainerSlot slot, long counter)
+ {
+ _counter = counter;
+ _slot = Objects.requireNonNull(slot);
+ }
+
+ public ConnectionSlot registerConnection(final AMQPConnection_1_0<?>
connection)
+ {
+ return _slot.register(connection);
+ }
+
+ public UsageCounter addUser()
+ {
+ return new UsageCounter(_slot, _counter + 1L);
+ }
+
+ public UsageCounter removeUser()
+ {
+ return _counter <= 1 ? null : new UsageCounter(_slot, _counter -
1L);
+ }
+ }
+
+ private final class RemoteContainerSlot
+ {
+ private final String _containerId;
+
+ private final Set<AMQPConnection_1_0<?>> _connections;
+
+ RemoteContainerSlot(String containerId)
+ {
+ super();
+ _connections = new HashSet<>();
+ _containerId = Objects.requireNonNull(containerId);
+ }
+
+ private synchronized ConnectionSlot register(final
AMQPConnection_1_0<?> connection)
+ {
+ final SoleConnectionEnforcementPolicy soleConnectionPolicy =
extractPolicy(connection);
+
+ if (soleConnectionPolicy != null && !_connections.isEmpty())
+ {
+ LOGGER.debug("Single connection is required, sole connection
policy: {}", soleConnectionPolicy);
+ throw new
SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections);
+ }
+
+ final ConnectionSlot underlyingSlot =
_underlyingLimiter.register(connection);
+ _connections.add(connection);
+ final ConnectionSlot slot = () ->
+ {
+ try
+ {
+ remove(connection);
+ }
+ finally
+ {
+ deregisterUser(_containerId);
+ }
+ };
+ return slot.chainTo(underlyingSlot);
+ }
+
+ private SoleConnectionEnforcementPolicy
extractPolicy(AMQPConnection_1_0<?> connection)
+ {
+ if (_connections.isEmpty())
+ {
+ return connection.getSoleConnectionEnforcementPolicy();
+ }
+ SoleConnectionEnforcementPolicy soleConnectionPolicy = null;
+
+ final Iterator<AMQPConnection_1_0<?>> iterator =
_connections.iterator();
+ while (iterator.hasNext())
+ {
+ final AMQPConnection_1_0<?> existingConnection =
iterator.next();
+ if (existingConnection.isClosing())
+ {
+ iterator.remove();
+ }
+ else
+ {
+ soleConnectionPolicy =
existingConnection.getSoleConnectionEnforcementPolicy();
+ }
+ }
+ if (soleConnectionPolicy == null)
+ {
+ return connection.getSoleConnectionEnforcementPolicy();
+ }
+ return soleConnectionPolicy;
+ }
+
+ private synchronized void remove(final AMQPConnection_1_0<?>
connection)
+ {
+ _connections.remove(connection);
+ }
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index 3f1f10f..962fabb 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -64,6 +64,7 @@ import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.security.SubjectCreator;
@@ -75,7 +76,6 @@ import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
import org.apache.qpid.test.utils.UnitTestBase;
@@ -121,16 +121,15 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
when(_virtualHost.isActive()).thenReturn(true);
final ArgumentCaptor<AMQPConnection> connectionCaptor =
ArgumentCaptor.forClass(AMQPConnection.class);
- final ArgumentCaptor<ConnectionEstablishmentPolicy>
establishmentPolicyCaptor =
ArgumentCaptor.forClass(ConnectionEstablishmentPolicy.class);
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws
Throwable
{
_connection = connectionCaptor.getValue();
- return null;
+ throw new SoleConnectionEnforcementPolicyException(null,
Collections.emptySet());
}
- }).when(_virtualHost).registerConnection(connectionCaptor.capture(),
establishmentPolicyCaptor.capture());
+ }).when(_virtualHost).registerConnection(connectionCaptor.capture());
when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
when(_port.getAddressSpace(anyString())).thenReturn(_virtualHost);
when(_port.getSubjectCreator(anyBoolean(),
anyString())).thenReturn(subjectCreator);
@@ -202,7 +201,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
open.setContainerId("testContainerId");
_frameWriter.send(new TransportFrame((int) (short) 0, open));
- verify(_virtualHost).registerConnection(any(AMQPConnection.class),
any(ConnectionEstablishmentPolicy.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal,
@@ -224,7 +223,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
open.setContainerId("testContainerId");
_frameWriter.send(new TransportFrame((int) (short) 0, open));
- verify(_virtualHost,
never()).registerConnection(any(AMQPConnection.class),
any(ConnectionEstablishmentPolicy.class));
+ verify(_virtualHost,
never()).registerConnection(any(AMQPConnection.class));
verify(_networkConnection).close();
}
@@ -244,7 +243,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
open.setContainerId("testContainerId");
_frameWriter.send(new TransportFrame((int) (short) 0, open));
- verify(_virtualHost).registerConnection(any(AMQPConnection.class),
any(ConnectionEstablishmentPolicy.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(authPrincipal);
assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
@@ -277,7 +276,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
open.setContainerId("testContainerId");
_frameWriter.send(new TransportFrame((int) (short) 0, open));
- verify(_virtualHost).registerConnection(any(AMQPConnection.class),
any(ConnectionEstablishmentPolicy.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal,
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
new file mode 100644
index 0000000..aae4e93
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class SoleConnectionDetectionPolicyTest extends UnitTestBase
+{
+ @Test
+ public void testValue()
+ {
+ assertEquals(new UnsignedInteger(0),
SoleConnectionDetectionPolicy.STRONG.getValue());
+ assertEquals(new UnsignedInteger(1),
SoleConnectionDetectionPolicy.WEAK.getValue());
+ }
+
+ @Test
+ public void testValueOf()
+ {
+ assertEquals(SoleConnectionDetectionPolicy.STRONG,
SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(0)));
+ assertEquals(SoleConnectionDetectionPolicy.WEAK,
SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(1)));
+
+ try
+ {
+ SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(2));
+ fail("An exception is expected");
+ }
+ catch (RuntimeException e)
+ {
+ assertNotNull(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testToString()
+ {
+ assertEquals("strong",
SoleConnectionDetectionPolicy.STRONG.toString());
+ assertEquals("weak", SoleConnectionDetectionPolicy.WEAK.toString());
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
new file mode 100644
index 0000000..1e10dcd
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class SoleConnectionEnforcementPolicyTest extends UnitTestBase
+{
+ @Test
+ public void testValue()
+ {
+ assertEquals(new UnsignedInteger(0),
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.getValue());
+ assertEquals(new UnsignedInteger(1),
SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
+ }
+
+ @Test
+ public void testValueOf()
+ {
+ assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION,
SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(0)));
+ assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING,
SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(1)));
+
+ try
+ {
+ SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(2));
+ fail("An exception is expected");
+ }
+ catch (RuntimeException e)
+ {
+ assertNotNull(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testToString()
+ {
+ assertEquals("refuse-connection",
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.toString());
+ assertEquals("close-existing",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING.toString());
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
new file mode 100644
index 0000000..fdbd0b4
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimiter;
+import org.apache.qpid.server.security.limit.ConnectionSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase
+{
+ private StrongConnectionEstablishmentLimiter _limiter;
+
+ private Registry _registry;
+
+ @Before
+ public void setUp()
+ {
+ _registry = new Registry();
+ _limiter = (StrongConnectionEstablishmentLimiter) new
StrongConnectionEstablishmentLimiter().append(_registry);
+ }
+
+ @Test
+ public void testType()
+ {
+ assertEquals("EstablishmentPolicy.strong", _limiter.getType());
+ }
+
+ @Test
+ public void testNoPolicy()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C", null);
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+ final ConnectionSlot slot2 = _limiter.register(connection2);
+ assertTrue(_registry.isRegistered(connection2));
+
+ final AMQPConnection_1_0<?> connection3 = newConnection("C", null);
+ final ConnectionSlot slot3 = _limiter.register(connection3);
+ assertTrue(_registry.isRegistered(connection3));
+
+ slot3.free();
+ assertFalse(_registry.isRegistered(connection3));
+ assertTrue(_registry.hasBeenRegistered(connection3));
+
+ slot2.free();
+ assertFalse(_registry.isRegistered(connection2));
+ assertTrue(_registry.hasBeenRegistered(connection2));
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+ }
+
+ @Test
+ public void testNewConnectionWithPolicy()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C", null);
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+ final ConnectionSlot slot2 = _limiter.register(connection2);
+ assertTrue(_registry.isRegistered(connection2));
+
+ final AMQPConnection_1_0<?> connection3 = newConnection("C",
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+ try
+ {
+ _limiter.register(connection3);
+ fail("A sole connection enforcement policy exception is expected");
+ }
+ catch (SoleConnectionEnforcementPolicyException e)
+ {
+ assertEquals(
+ "Single connection is required due to
sole-connection-enforcement-policy 'refuse-connection'",
+ e.getMessage());
+
+ assertEquals(2, e.getExistingConnections().size());
+ assertTrue(e.getExistingConnections().contains(connection1));
+ assertTrue(e.getExistingConnections().contains(connection2));
+ assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION,
e.getPolicy());
+ }
+
+ slot2.free();
+ assertFalse(_registry.isRegistered(connection2));
+ assertTrue(_registry.hasBeenRegistered(connection2));
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+ }
+
+ @Test
+ public void testExistingConnectionWithPolicy()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ final AMQPConnection_1_0<?> connection2 = newConnection("C",
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+ try
+ {
+ _limiter.register(connection2);
+ fail("A sole connection enforcement policy exception is expected");
+ }
+ catch (SoleConnectionEnforcementPolicyException e)
+ {
+ assertEquals(
+ "Single connection is required due to
sole-connection-enforcement-policy 'close-existing'",
+ e.getMessage());
+
+ assertEquals(1, e.getExistingConnections().size());
+ assertTrue(e.getExistingConnections().contains(connection1));
+ assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING,
e.getPolicy());
+ }
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+ }
+
+ @Test
+ public void testExistingClosedConnectionWithPolicy()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+ Mockito.doReturn(false).when(connection1).isClosing();
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ Mockito.doReturn(true).when(connection1).isClosing();
+ final AMQPConnection_1_0<?> connection2 = newConnection("C",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+ final ConnectionSlot slot2 = _limiter.register(connection2);
+ assertTrue(_registry.isRegistered(connection2));
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+
+ slot2.free();
+ assertFalse(_registry.isRegistered(connection2));
+ assertTrue(_registry.hasBeenRegistered(connection2));
+ }
+
+ @Test
+ public void testClosedConnection()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C",
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ final AMQPConnection_1_0<?> connection2 = newConnection("C",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+ Mockito.doReturn(true).when(connection1).isClosing();
+ final ConnectionSlot slot2 = _limiter.register(connection2);
+ assertTrue(_registry.isRegistered(connection2));
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+
+ slot2.free();
+ assertFalse(_registry.isRegistered(connection2));
+ assertTrue(_registry.hasBeenRegistered(connection2));
+ }
+
+ @Test
+ public void testNewConnectionWithPolicy_ClosedExisting()
+ {
+ final AMQPConnection_1_0<?> connection1 = newConnection("C",
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+ Mockito.doReturn(false).when(connection1).isClosing();
+ final ConnectionSlot slot1 = _limiter.register(connection1);
+ assertTrue(_registry.isRegistered(connection1));
+
+ Mockito.doReturn(true).when(connection1).isClosing();
+ final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+ final ConnectionSlot slot2 = _limiter.register(connection2);
+ assertTrue(_registry.isRegistered(connection2));
+
+ final AMQPConnection_1_0<?> connection3 = newConnection("C",
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+ try
+ {
+ _limiter.register(connection3);
+ fail("A sole connection enforcement policy exception is expected");
+ }
+ catch (SoleConnectionEnforcementPolicyException e)
+ {
+ assertEquals(
+ "Single connection is required due to
sole-connection-enforcement-policy 'close-existing'",
+ e.getMessage());
+
+ assertEquals(1, e.getExistingConnections().size());
+ assertTrue(e.getExistingConnections().contains(connection2));
+ assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING,
e.getPolicy());
+ }
+
+ slot2.free();
+ assertFalse(_registry.isRegistered(connection2));
+ assertTrue(_registry.hasBeenRegistered(connection2));
+
+ slot1.free();
+ assertFalse(_registry.isRegistered(connection1));
+ assertTrue(_registry.hasBeenRegistered(connection1));
+ }
+
+    /**
+     * Variant of the closing-connection scenario in which the second connection itself
+     * requests REFUSE_CONNECTION.
+     * <p>
+     * Expectations pinned here: with the first connection reported as closing, the second
+     * one is registered; the third connection (CLOSE_EXISTING) is rejected with an exception
+     * that lists only the second connection and carries the REFUSE_CONNECTION policy.
+     */
+    @Test
+    public void testNewConnectionWithPolicy2_ClosedExisting()
+    {
+        final AMQPConnection_1_0<?> connection1 =
+                newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        Mockito.doReturn(false).when(connection1).isClosing();
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        // From now on the first connection reports itself as closing.
+        Mockito.doReturn(true).when(connection1).isClosing();
+        final AMQPConnection_1_0<?> connection2 =
+                newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 =
+                newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        try
+        {
+            _limiter.register(connection3);
+            fail("A sole connection enforcement policy exception is expected");
+        }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            // The expected message is built from two short constants so that no
+            // string literal ever spans a physical line break.
+            assertEquals(
+                    "Single connection is required due to "
+                    + "sole-connection-enforcement-policy 'refuse-connection'",
+                    e.getMessage());
+
+            assertEquals(1, e.getExistingConnections().size());
+            assertTrue(e.getExistingConnections().contains(connection2));
+            assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, e.getPolicy());
+        }
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    /**
+     * Registers a plain {@code AMQPConnection} mock (not an {@code AMQPConnection_1_0})
+     * and checks that it is registered, released again on {@code free()}, and that no
+     * method was invoked on the mock along the way.
+     */
+    @Test
+    public void testAnotherConnectionType()
+    {
+        final AMQPConnection<?> genericConnection = Mockito.mock(AMQPConnection.class);
+
+        final ConnectionSlot genericSlot = _limiter.register(genericConnection);
+        assertTrue(_registry.isRegistered(genericConnection));
+
+        genericSlot.free();
+        assertFalse(_registry.isRegistered(genericConnection));
+        assertTrue(_registry.hasBeenRegistered(genericConnection));
+
+        // Nothing may have been called on the connection itself.
+        Mockito.verifyNoInteractions(genericConnection);
+    }
+
+    /**
+     * Registers three connections with distinct container ids ("C1", "C2", "C3") and
+     * different policies (none, REFUSE_CONNECTION, CLOSE_EXISTING); each one must be
+     * registered and later released in reverse order.
+     */
+    @Test
+    public void testMultipleIndependentConnections()
+    {
+        final AMQPConnection_1_0<?> noPolicyConnection = newConnection("C1", null);
+        final ConnectionSlot noPolicySlot = _limiter.register(noPolicyConnection);
+        assertTrue(_registry.isRegistered(noPolicyConnection));
+
+        final AMQPConnection_1_0<?> refusingConnection =
+                newConnection("C2", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        final ConnectionSlot refusingSlot = _limiter.register(refusingConnection);
+        assertTrue(_registry.isRegistered(refusingConnection));
+
+        final AMQPConnection_1_0<?> closingConnection =
+                newConnection("C3", SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        final ConnectionSlot closingSlot = _limiter.register(closingConnection);
+        assertTrue(_registry.isRegistered(closingConnection));
+
+        // Release in reverse order of registration.
+        closingSlot.free();
+        assertFalse(_registry.isRegistered(closingConnection));
+        assertTrue(_registry.hasBeenRegistered(closingConnection));
+
+        refusingSlot.free();
+        assertFalse(_registry.isRegistered(refusingConnection));
+        assertTrue(_registry.hasBeenRegistered(refusingConnection));
+
+        noPolicySlot.free();
+        assertFalse(_registry.isRegistered(noPolicyConnection));
+        assertTrue(_registry.hasBeenRegistered(noPolicyConnection));
+    }
+
+    /**
+     * Registers three connections that report neither a container id nor a policy
+     * (both {@code null}); each one must be registered and later released in reverse order.
+     */
+    @Test
+    public void testMultipleIndependentConnections2()
+    {
+        final AMQPConnection_1_0<?> anonymousA = newConnection(null, null);
+        final ConnectionSlot slotA = _limiter.register(anonymousA);
+        assertTrue(_registry.isRegistered(anonymousA));
+
+        final AMQPConnection_1_0<?> anonymousB = newConnection(null, null);
+        final ConnectionSlot slotB = _limiter.register(anonymousB);
+        assertTrue(_registry.isRegistered(anonymousB));
+
+        final AMQPConnection_1_0<?> anonymousC = newConnection(null, null);
+        final ConnectionSlot slotC = _limiter.register(anonymousC);
+        assertTrue(_registry.isRegistered(anonymousC));
+
+        // Release in reverse order of registration.
+        slotC.free();
+        assertFalse(_registry.isRegistered(anonymousC));
+        assertTrue(_registry.hasBeenRegistered(anonymousC));
+
+        slotB.free();
+        assertFalse(_registry.isRegistered(anonymousB));
+        assertTrue(_registry.hasBeenRegistered(anonymousB));
+
+        slotA.free();
+        assertFalse(_registry.isRegistered(anonymousA));
+        assertTrue(_registry.hasBeenRegistered(anonymousA));
+    }
+
+    /**
+     * Creates an {@code AMQPConnection_1_0} mock stubbed with the given values.
+     *
+     * @param id     value returned by {@code getRemoteContainerId()}; may be {@code null}
+     * @param policy value returned by {@code getSoleConnectionEnforcementPolicy()};
+     *               may be {@code null}
+     * @return the stubbed mock
+     */
+    private AMQPConnection_1_0<?> newConnection(String id, SoleConnectionEnforcementPolicy policy)
+    {
+        final AMQPConnection_1_0<?> mocked = Mockito.mock(AMQPConnection_1_0.class);
+        Mockito.doReturn(policy).when(mocked).getSoleConnectionEnforcementPolicy();
+        Mockito.doReturn(id).when(mocked).getRemoteContainerId();
+        return mocked;
+    }
+
+    /**
+     * Test helper implementing {@code ConnectionLimiter} that records the connections
+     * passed through {@link #register(AMQPConnection)}.
+     * <p>
+     * Two sets are kept: {@code _registered} only ever grows, while a connection is
+     * taken out of {@code _connections} again by the callback chained to the returned slot.
+     * Instances produced by {@link #append(ConnectionLimiter)} share both sets with
+     * the instance they were derived from.
+     */
+    static final class Registry implements ConnectionLimiter
+    {
+        /** Every connection that has ever been registered. */
+        private final Set<AMQPConnection<?>> _registered;
+
+        /** Connections registered and not yet removed by the chained callback. */
+        private final Set<AMQPConnection<?>> _connections;
+
+        /** Limiter consulted before a connection is recorded. */
+        private final ConnectionLimiter _subLimiter;
+
+        /** Creates an empty registry backed by {@code ConnectionLimiter.noLimits()}. */
+        public Registry()
+        {
+            this._registered = new HashSet<>();
+            this._connections = new HashSet<>();
+            this._subLimiter = ConnectionLimiter.noLimits();
+        }
+
+        /**
+         * Creates a registry sharing the sets of {@code origin} but using another
+         * sub-limiter.
+         */
+        private Registry(Registry origin, ConnectionLimiter delegate)
+        {
+            this._registered = origin._registered;
+            this._connections = origin._connections;
+            this._subLimiter = Objects.requireNonNull(delegate);
+        }
+
+        /**
+         * Asks the sub-limiter first, then records the connection in both sets.
+         *
+         * @return the sub-limiter's slot chained to a callback that removes the
+         *         connection from {@code _connections}
+         */
+        @Override
+        public ConnectionSlot register(final AMQPConnection<?> connection)
+        {
+            final ConnectionSlot delegateSlot = this._subLimiter.register(connection);
+            this._registered.add(connection);
+            this._connections.add(connection);
+            return delegateSlot.chainTo(() -> this._connections.remove(connection));
+        }
+
+        /**
+         * Returns a new registry whose sub-limiter is this one's sub-limiter with
+         * {@code limiter} appended; the recorded sets are shared.
+         */
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            final ConnectionLimiter combined = this._subLimiter.append(limiter);
+            return new Registry(this, combined);
+        }
+
+        /** Tells whether the connection is currently held in {@code _connections}. */
+        public boolean isRegistered(AMQPConnection<?> connection)
+        {
+            return this._connections.contains(connection);
+        }
+
+        /** Tells whether the connection was ever passed successfully to {@code register}. */
+        public boolean hasBeenRegistered(AMQPConnection<?> connection)
+        {
+            return this._registered.contains(connection);
+        }
+    }
+}
diff --git
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 0e83704..d3fd813 100644
---
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxNotSupportedException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
@@ -195,11 +194,9 @@ public class ManagementAddressSpace implements
NamedAddressSpace
}
@Override
- public boolean registerConnection(final AMQPConnection<?> connection,
- final ConnectionEstablishmentPolicy
connectionEstablishmentPolicy)
+ public void registerConnection(final AMQPConnection<?> connection)
{
_connections.add(connection);
- return true;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]