This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new d7fc06b QPID-8573: [Broker-J] Logging enhancement of Sole Connection Enforcement Policy events
d7fc06b is described below
commit d7fc06b67ff8e417e228e81932fe9960a304cf3b
Author: Marek Laca <[email protected]>
AuthorDate: Tue Feb 22 13:58:16 2022 +0100
QPID-8573: [Broker-J] Logging enhancement of Sole Connection Enforcement Policy events
This closes #114
---
.../logging/messages/ResourceLimitMessages.java | 62 ++++++++++++++++++++++
.../messages/ResourceLimit_logmessages.properties | 1 +
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 11 ++++
.../SoleConnectionEnforcementPolicyException.java | 19 +++++--
.../StrongConnectionEstablishmentLimiter.java | 2 +-
.../protocol/v1_0/ProtocolEngine_1_0_0Test.java | 2 +-
.../StrongConnectionEstablishmentLimiterTest.java | 8 +--
7 files changed, 96 insertions(+), 9 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java
index 743db3e..5d50add 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java
@@ -64,12 +64,14 @@ public class ResourceLimitMessages
public static final String RESOURCELIMIT_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit";
public static final String ACCEPTED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.accepted";
+ public static final String INFO_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.info";
public static final String REJECTED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.rejected";
static
{
LoggerFactory.getLogger(RESOURCELIMIT_LOG_HIERARCHY);
LoggerFactory.getLogger(ACCEPTED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(INFO_LOG_HIERARCHY);
LoggerFactory.getLogger(REJECTED_LOG_HIERARCHY);
_messages =
ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.ResourceLimit_logmessages",
_currentLocale);
@@ -137,6 +139,66 @@ public class ResourceLimitMessages
/**
* Log a ResourceLimit message of the Format:
+ * <pre>RL-1003 : Info : {0} : {1}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage INFO(String param1, String param2)
+ {
+ String rawMessage = _messages.getString("INFO");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage,
_currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ @Override
+ public String toString()
+ {
+ return message;
+ }
+
+ @Override
+ public String getLogHierarchy()
+ {
+ return INFO_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) &&
toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a ResourceLimit message of the Format:
* <pre>RL-1002 : Rejected : {0} {1} by {2} : {3}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties
index a78feed..87b5f74 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties
+++
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties
@@ -20,3 +20,4 @@
# User Resource Limit logging message i18n strings.
ACCEPTED = RL-1001 : Accepted : {0} {1} by {2} : {3}
REJECTED = RL-1002 : Rejected : {0} {1} by {2} : {3}
+INFO = RL-1003 : Info : {0} : {1}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index f2d7262..ea5e38d 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -53,6 +53,9 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
import org.apache.qpid.server.security.limit.ConnectionLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -973,11 +976,14 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION)
{
+ LOGGER.debug("Closing newly open connection: {}", e.getMessage());
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
final Error error = new Error(AmqpError.INVALID_FIELD,
String.format("Connection closed due to
sole-connection-enforcement-policy '%s'", e.getPolicy()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"),
Symbol.valueOf("container-id")));
closeConnection(error);
+ getEventLogger().message(ResourceLimitMessages.REJECTED(
+ "Opening", "connection", String.format("container '%s'",
e.getContainerID()), e.getMessage()));
}
else if (e.getPolicy() ==
SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
{
@@ -985,12 +991,17 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
String.format("Connection closed due to
sole-connection-enforcement-policy '%s'", e.getPolicy()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
true));
+ final EventLogger logger = getEventLogger();
final List<ListenableFuture<Void>> rescheduleFutures = new
ArrayList<>();
for (final AMQPConnection_1_0<?> connection :
e.getExistingConnections())
{
if (!connection.isClosing())
{
+ LOGGER.debug("Closing existing connection '{}': {}",
+ connection.getName(), e.getMessage());
rescheduleFutures.add(connection.doOnIOThreadAsync(() ->
connection.close(error)));
+ logger.message(ResourceLimitMessages.INFO(
+ String.format("Closing existing connection '%s'",
connection.getName()), e.getMessage()));
}
}
doAfter(allAsList(rescheduleFutures), () -> doOnIOThreadAsync(()
-> receiveOpenInternal(addressSpace)));
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
index 8f49b7b..35da2f2 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
@@ -21,6 +21,7 @@ package
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
@@ -28,16 +29,23 @@ import
org.apache.qpid.server.security.limit.ConnectionLimitException;
public class SoleConnectionEnforcementPolicyException extends
ConnectionLimitException
{
+ private static final String MESSAGE =
+ "Single connection with container ID '%s' is required due to sole
connection enforcement policy '%s'";
+
private final Set<AMQPConnection_1_0<?>> _existingConnections;
private final SoleConnectionEnforcementPolicy _policy;
+ private final String _containerID;
+
public
SoleConnectionEnforcementPolicyException(SoleConnectionEnforcementPolicy policy,
- Collection<? extends
AMQPConnection_1_0<?>> connections)
+ Collection<? extends
AMQPConnection_1_0<?>> connections,
+ String containerID)
{
- super(String.format("Single connection is required due to
sole-connection-enforcement-policy '%s'", policy));
- _policy = policy;
+ super(String.format(MESSAGE, containerID, policy));
+ _policy = Objects.requireNonNull(policy);
_existingConnections = new HashSet<>(connections);
+ _containerID = Objects.requireNonNull(containerID);
}
public SoleConnectionEnforcementPolicy getPolicy()
@@ -49,4 +57,9 @@ public class SoleConnectionEnforcementPolicyException extends
ConnectionLimitExc
{
return Collections.unmodifiableSet(_existingConnections);
}
+
+ public String getContainerID()
+ {
+ return _containerID;
+ }
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
index c0ff83b..cf56520 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
@@ -163,7 +163,7 @@ public class StrongConnectionEstablishmentLimiter
implements ConnectionLimiterSe
if (soleConnectionPolicy != null && !_connections.isEmpty())
{
LOGGER.debug("Single connection is required, sole connection
policy: {}", soleConnectionPolicy);
- throw new
SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections);
+ throw new
SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections,
_containerId);
}
final ConnectionSlot underlyingSlot =
_underlyingLimiter.register(connection);
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index 962fabb..8936168 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -127,7 +127,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
public Object answer(final InvocationOnMock invocation) throws
Throwable
{
_connection = connectionCaptor.getValue();
- throw new SoleConnectionEnforcementPolicyException(null,
Collections.emptySet());
+ throw new SoleConnectionEnforcementPolicyException(null,
Collections.emptySet(), "abc1");
}
}).when(_virtualHost).registerConnection(connectionCaptor.capture());
when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
index fdbd0b4..53831bc 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
@@ -104,7 +104,7 @@ public class StrongConnectionEstablishmentLimiterTest
extends UnitTestBase
catch (SoleConnectionEnforcementPolicyException e)
{
assertEquals(
- "Single connection is required due to
sole-connection-enforcement-policy 'refuse-connection'",
+ "Single connection with container ID 'C' is required due
to sole connection enforcement policy 'refuse-connection'",
e.getMessage());
assertEquals(2, e.getExistingConnections().size());
@@ -138,7 +138,7 @@ public class StrongConnectionEstablishmentLimiterTest
extends UnitTestBase
catch (SoleConnectionEnforcementPolicyException e)
{
assertEquals(
- "Single connection is required due to
sole-connection-enforcement-policy 'close-existing'",
+ "Single connection with container ID 'C' is required due
to sole connection enforcement policy 'close-existing'",
e.getMessage());
assertEquals(1, e.getExistingConnections().size());
@@ -216,7 +216,7 @@ public class StrongConnectionEstablishmentLimiterTest
extends UnitTestBase
catch (SoleConnectionEnforcementPolicyException e)
{
assertEquals(
- "Single connection is required due to
sole-connection-enforcement-policy 'close-existing'",
+ "Single connection with container ID 'C' is required due
to sole connection enforcement policy 'close-existing'",
e.getMessage());
assertEquals(1, e.getExistingConnections().size());
@@ -255,7 +255,7 @@ public class StrongConnectionEstablishmentLimiterTest
extends UnitTestBase
catch (SoleConnectionEnforcementPolicyException e)
{
assertEquals(
- "Single connection is required due to
sole-connection-enforcement-policy 'refuse-connection'",
+ "Single connection with container ID 'C' is required due
to sole connection enforcement policy 'refuse-connection'",
e.getMessage());
assertEquals(1, e.getExistingConnections().size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]