This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 2d1a8661fd58ba26e5353b53799344a3f9b58d58 Author: Justin Bertram <[email protected]> AuthorDate: Fri Jun 30 12:55:31 2023 -0500 ARTEMIS-4338 STOMP inoperable w/resource audit logging When resource audit logging is enabled STOMP is completely inoperable due to an NPE during the protocol handshake. Unfortunately the failure is completely silent. There are no logs to indicate a problem. This commit fixes this problem via the following changes: - Mitigate the original NPE via a check for null - Move the logic necessary to set the "protocol connection" on the "transport connection" to a class shared by all implementations. - Add exception handling to log failures like this in the future. - Add tests to ensure the audit logging is correct. --- .../protocol/core/impl/RemotingConnectionImpl.java | 2 - .../core/protocol/AbstractRemotingConnection.java | 1 + .../broker/ActiveMQProtonRemotingConnection.java | 1 - .../artemis/core/protocol/mqtt/MQTTConnection.java | 1 - .../core/protocol/openwire/OpenWireConnection.java | 1 - .../artemis/core/protocol/ProtocolHandler.java | 9 ++++ .../remoting/server/impl/RemotingServiceImpl.java | 2 +- .../artemis/core/server/ActiveMQServerLogger.java | 2 + .../smoke/logging/AuditLoggerResourceTest.java | 60 ++++++++++++++++++++++ 9 files changed, 73 insertions(+), 6 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index ea007b6691..bf6788f79c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -130,8 +130,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement this.nodeID = nodeID; - transportConnection.setProtocolConnection(this); - logger.trace("RemotingConnectionImpl created: {}", this); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index 4351f77b05..48199caac4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -55,6 +55,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) { this.transportConnection = transportConnection; + this.transportConnection.setProtocolConnection(this); this.executor = executor; this.creationTime = System.currentTimeMillis(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 69740abff7..9ceea15655 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -46,7 +46,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection super(transportConnection, connectionExecutor); this.manager = manager; this.amqpConnection = amqpConnection; - transportConnection.setProtocolConnection(this); } public AMQPConnectionContext getAmqpConnection() { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index e136255664..06f31122ef 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -42,7 +42,6 @@ public class MQTTConnection extends AbstractRemotingConnection { public MQTTConnection(Connection transportConnection) throws Exception { super(transportConnection, null); this.destroyed = false; - transportConnection.setProtocolConnection(this); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3b3b336476..dc35b6b1ea 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -232,7 +232,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.outWireFormat = wf.copy(); this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); - this.transportConnection.setProtocolConnection(this); this.actorThresholdBytes = actorThresholdBytes; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 8d489c0423..87cd1d2869 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -243,6 +243,15 @@ public class ProtocolHandler { ctx.flush(); } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(), ctx.channel().remoteAddress(), cause); + } finally { + ctx.close(); + } + } + private boolean isHttp(int magic1, int magic2) { return magic1 == 'G' && magic2 == 'E' || // GET magic1 == 'P' && magic2 == 'O' || // POST diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 5ace4fe18d..20d777bd43 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -582,7 +582,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif public void addConnectionEntry(Connection connection, ConnectionEntry entry) { connections.put(connection.getID(), entry); if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.createdConnection(connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress()); + AuditLogger.createdConnection(connection.getProtocolConnection() == null ? null : connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress()); } if (logger.isDebugEnabled()) { logger.debug("Adding connection {}, we now have {}", connection.getID(), connections.size()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index bd3d45bfde..aad5fd93f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1582,4 +1582,6 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={} and page-limit-messages={} but no page-full-policy set. Page full configuration being ignored on this address", level = LogMessage.Level.WARN) void noPagefullPolicySet(Object address, Object limitBytes, Object limitMessages); + @LogMessage(id = 224126, value = "Failure during protocol handshake on connection to {} from {}", level = LogMessage.Level.ERROR) + void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java index d67c6b91ea..653fbcc837 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java @@ -16,10 +16,14 @@ */ package org.apache.activemq.artemis.tests.smoke.logging; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Session; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.openmbean.CompositeData; import javax.management.remote.JMXConnector; +import java.net.URI; import java.util.HashMap; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -32,6 +36,12 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; import org.junit.Test; public class AuditLoggerResourceTest extends AuditLoggerTestBase { @@ -85,4 +95,54 @@ public class AuditLoggerResourceTest extends AuditLoggerTestBase { jmxConnector.close(); } } + + @Test + public void testCoreConnectionAuditLog() throws Exception { + testConnectionAuditLog("CORE"); + } + + @Test + public void testAMQPConnectionAuditLog() throws Exception { + testConnectionAuditLog("AMQP"); + } + + @Test + public void testOpenWireConnectionAuditLog() throws Exception { + testConnectionAuditLog("OPENWIRE"); + } + + private void testConnectionAuditLog(String protocol) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + checkAuditLogRecord(true, "AMQ601767: " + protocol + " connection"); + s.close(); + connection.close(); + checkAuditLogRecord(true, "AMQ601768: " + protocol + " connection"); + } + + @Test + public void testMQTTConnectionAuditLog() throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setVersion("3.1.1"); + mqtt.setClientId(RandomUtil.randomString()); + mqtt.setCleanSession(true); + mqtt.setHost("localhost", 1883); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + checkAuditLogRecord(true, "AMQ601767: MQTT connection"); + checkAuditLogRecord(true, "AMQ601768: MQTT connection"); + } + + @Test + public void testStompConnectionAuditLog() throws Exception { + StompClientConnection connection = StompClientConnectionFactory.createClientConnection(new URI("tcp://localhost:61613")); + connection.connect(); + connection.disconnect(); + checkAuditLogRecord(true, "AMQ601767: STOMP connection"); + checkAuditLogRecord(true, "AMQ601768: STOMP connection"); + } }
