This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 2d1a8661fd58ba26e5353b53799344a3f9b58d58
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Jun 30 12:55:31 2023 -0500

    ARTEMIS-4338 STOMP inoperable w/resource audit logging
    
    When resource audit logging is enabled STOMP is completely inoperable
    due to an NPE during the protocol handshake. Unfortunately the failure
    is completely silent. There are no logs to indicate a problem.
    
    This commit fixes this problem via the following changes:
     - Mitigate the original NPE via a check for null
     - Move the logic necessary to set the "protocol connection" on the
       "transport connection" to a class shared by all implementations.
     - Add exception handling to log failures like this in the future.
     - Add tests to ensure the audit logging is correct.
---
 .../protocol/core/impl/RemotingConnectionImpl.java |  2 -
 .../core/protocol/AbstractRemotingConnection.java  |  1 +
 .../broker/ActiveMQProtonRemotingConnection.java   |  1 -
 .../artemis/core/protocol/mqtt/MQTTConnection.java |  1 -
 .../core/protocol/openwire/OpenWireConnection.java |  1 -
 .../artemis/core/protocol/ProtocolHandler.java     |  9 ++++
 .../remoting/server/impl/RemotingServiceImpl.java  |  2 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |  2 +
 .../smoke/logging/AuditLoggerResourceTest.java     | 60 ++++++++++++++++++++++
 9 files changed, 73 insertions(+), 6 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index ea007b6691..bf6788f79c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -130,8 +130,6 @@ public class RemotingConnectionImpl extends 
AbstractRemotingConnection implement
 
       this.nodeID = nodeID;
 
-      transportConnection.setProtocolConnection(this);
-
       logger.trace("RemotingConnectionImpl created: {}", this);
    }
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 4351f77b05..48199caac4 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -55,6 +55,7 @@ public abstract class AbstractRemotingConnection implements 
RemotingConnection {
 
    public AbstractRemotingConnection(final Connection transportConnection, 
final Executor executor) {
       this.transportConnection = transportConnection;
+      this.transportConnection.setProtocolConnection(this);
       this.executor = executor;
       this.creationTime = System.currentTimeMillis();
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 69740abff7..9ceea15655 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -46,7 +46,6 @@ public class ActiveMQProtonRemotingConnection extends 
AbstractRemotingConnection
       super(transportConnection, connectionExecutor);
       this.manager = manager;
       this.amqpConnection = amqpConnection;
-      transportConnection.setProtocolConnection(this);
    }
 
    public AMQPConnectionContext getAmqpConnection() {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index e136255664..06f31122ef 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -42,7 +42,6 @@ public class MQTTConnection extends 
AbstractRemotingConnection {
    public MQTTConnection(Connection transportConnection) throws Exception {
       super(transportConnection, null);
       this.destroyed = false;
-      transportConnection.setProtocolConnection(this);
    }
 
    @Override
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3b3b336476..dc35b6b1ea 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -232,7 +232,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       this.outWireFormat = wf.copy();
       this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
       this.maxInactivityDuration = 
openWireProtocolManager.getMaxInactivityDuration();
-      this.transportConnection.setProtocolConnection(this);
       this.actorThresholdBytes = actorThresholdBytes;
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 8d489c0423..87cd1d2869 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -243,6 +243,15 @@ public class ProtocolHandler {
          ctx.flush();
       }
 
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+         try {
+            
ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(),
 ctx.channel().remoteAddress(), cause);
+         } finally {
+            ctx.close();
+         }
+      }
+
       private boolean isHttp(int magic1, int magic2) {
          return magic1 == 'G' && magic2 == 'E' || // GET
             magic1 == 'P' && magic2 == 'O' || // POST
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 5ace4fe18d..20d777bd43 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -582,7 +582,7 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
    public void addConnectionEntry(Connection connection, ConnectionEntry 
entry) {
       connections.put(connection.getID(), entry);
       if (AuditLogger.isResourceLoggingEnabled()) {
-         
AuditLogger.createdConnection(connection.getProtocolConnection().getProtocolName(),
 connection.getID(), connection.getRemoteAddress());
+         AuditLogger.createdConnection(connection.getProtocolConnection() == 
null ? null : connection.getProtocolConnection().getProtocolName(), 
connection.getID(), connection.getRemoteAddress());
       }
       if (logger.isDebugEnabled()) {
          logger.debug("Adding connection {}, we now have {}", 
connection.getID(), connections.size());
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index bd3d45bfde..aad5fd93f0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1582,4 +1582,6 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={} and 
page-limit-messages={} but no page-full-policy set. Page full configuration 
being ignored on this address", level = LogMessage.Level.WARN)
    void noPagefullPolicySet(Object address, Object limitBytes, Object 
limitMessages);
 
+   @LogMessage(id = 224126, value = "Failure during protocol handshake on 
connection to {} from {}", level = LogMessage.Level.ERROR)
+   void failureDuringProtocolHandshake(SocketAddress localAddress, 
SocketAddress remoteAddress, Throwable e);
 }
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java
index d67c6b91ea..653fbcc837 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.activemq.artemis.tests.smoke.logging;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
 import javax.management.MBeanServerConnection;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.openmbean.CompositeData;
 import javax.management.remote.JMXConnector;
+import java.net.URI;
 import java.util.HashMap;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -32,6 +36,12 @@ import 
org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
 import org.junit.Test;
 
 public class AuditLoggerResourceTest extends AuditLoggerTestBase {
@@ -85,4 +95,54 @@ public class AuditLoggerResourceTest extends 
AuditLoggerTestBase {
          jmxConnector.close();
       }
    }
+
+   @Test
+   public void testCoreConnectionAuditLog() throws Exception {
+      testConnectionAuditLog("CORE");
+   }
+
+   @Test
+   public void testAMQPConnectionAuditLog() throws Exception {
+      testConnectionAuditLog("AMQP");
+   }
+
+   @Test
+   public void testOpenWireConnectionAuditLog() throws Exception {
+      testConnectionAuditLog("OPENWIRE");
+   }
+
+   private void testConnectionAuditLog(String protocol) throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      Connection connection = factory.createConnection();
+      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      checkAuditLogRecord(true, "AMQ601767: " + protocol + " connection");
+      s.close();
+      connection.close();
+      checkAuditLogRecord(true, "AMQ601768: " + protocol + " connection");
+   }
+
+   @Test
+   public void testMQTTConnectionAuditLog() throws Exception {
+      MQTT mqtt = new MQTT();
+      mqtt.setConnectAttemptsMax(1);
+      mqtt.setReconnectAttemptsMax(0);
+      mqtt.setVersion("3.1.1");
+      mqtt.setClientId(RandomUtil.randomString());
+      mqtt.setCleanSession(true);
+      mqtt.setHost("localhost", 1883);
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      connection.disconnect();
+      checkAuditLogRecord(true, "AMQ601767: MQTT connection");
+      checkAuditLogRecord(true, "AMQ601768: MQTT connection");
+   }
+
+   @Test
+   public void testStompConnectionAuditLog() throws Exception {
+      StompClientConnection connection = 
StompClientConnectionFactory.createClientConnection(new 
URI("tcp://localhost:61613"));
+      connection.connect();
+      connection.disconnect();
+      checkAuditLogRecord(true, "AMQ601767: STOMP connection");
+      checkAuditLogRecord(true, "AMQ601768: STOMP connection");
+   }
 }

Reply via email to