This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 12d45767cb ARTEMIS-5163 Artemis fails to send mqtt will message using
mutual TLS
12d45767cb is described below
commit 12d45767cb762f4105e041fe8c3e70595dfcb42e
Author: Jean-Luc Graphalo <[email protected]>
AuthorDate: Mon Oct 6 14:53:37 2025 +0200
ARTEMIS-5163 Artemis fails to send mqtt will message using mutual TLS
The client's certificate chain is cached on the connection so that
authentication still succeeds when the broker sends the Last Will and
Testament (LWT) message for a client connected with mutual TLS.
- move CertificateUtil to artemis-server
- cache X509Certificate in NettyServerConnection
- try to get certificate from channel if not already cached
---
.../amqp/connect/AMQPBrokerConnection.java | 2 +-
.../amqp/sasl/ExternalServerSASLFactory.java | 2 +-
.../remoting/impl/netty/NettyServerConnection.java | 11 +++++
.../core/security/impl/SecurityStoreImpl.java | 2 +-
.../core/server/impl/ServerSessionImpl.java | 2 +-
.../impl/NotificationActiveMQServerPlugin.java | 2 +-
.../core/security/ActiveMQJAASSecurityManager.java | 2 +-
.../core/security/jaas/JaasCallbackHandler.java | 4 +-
.../activemq/artemis/utils}/CertificateUtil.java | 9 ++--
.../ssl/CertificateAuthenticationSslTests.java | 54 ++++++++++++++++++++++
10 files changed, 79 insertions(+), 11 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index ae068bace8..17d677bb29 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -49,7 +49,6 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@@ -99,6 +98,7 @@ import
org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleLi
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.scram.SCRAM;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ExternalServerSASLFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ExternalServerSASLFactory.java
index fe506fac28..bedb275392 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ExternalServerSASLFactory.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ExternalServerSASLFactory.java
@@ -18,13 +18,13 @@ package org.apache.activemq.artemis.protocol.amqp.sasl;
import java.security.Principal;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
index 600f8eca83..70e4e37c66 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
+import java.security.cert.X509Certificate;
import java.util.Map;
import io.netty.channel.Channel;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import
org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ProxyProtocolUtil;
public class NettyServerConnection extends NettyConnection {
@@ -29,6 +31,8 @@ public class NettyServerConnection extends NettyConnection {
private final String router;
+ private X509Certificate[] certificates;
+
public NettyServerConnection(Map<String, Object> configuration,
Channel channel,
ServerConnectionLifeCycleListener listener,
@@ -54,6 +58,13 @@ public class NettyServerConnection extends NettyConnection {
return router;
}
+ public X509Certificate[] getPeerCertificates() {
+ if (certificates == null) {
+ certificates = CertificateUtil.getCertsFromChannel(channel);
+ }
+ return certificates;
+ }
+
/**
* {@return a string representation of the remote address of this
connection; if this connection was made via the
* proxy protocol then this will be the original address, not the proxy
address}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
index 75613af39e..a563af7828 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import
org.apache.activemq.artemis.core.management.impl.ManagementRemotingConnection;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth;
@@ -58,6 +57,7 @@ import
org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
import
org.apache.activemq.artemis.spi.core.security.jaas.NoCacheLoginException;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 1808ba85e2..97ba38bd8d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -67,7 +67,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
@@ -104,6 +103,7 @@ import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
index 4d20eb032f..5194c4a259 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
@@ -23,7 +23,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -32,6 +31,7 @@ import
org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
index 0be7a2ba12..a0aaed7641 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
@@ -35,7 +35,7 @@ import org.apache.activemq.artemis.utils.SecurityManagerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.activemq.artemis.core.remoting.CertificateUtil.getCertsFromConnection;
+import static
org.apache.activemq.artemis.utils.CertificateUtil.getCertsFromConnection;
/**
* This implementation delegates to the JAAS security interfaces.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/JaasCallbackHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/JaasCallbackHandler.java
index 54bd272933..0110aa60a2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/JaasCallbackHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/JaasCallbackHandler.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.spi.core.security.jaas;
-import static
org.apache.activemq.artemis.core.remoting.CertificateUtil.getCertsFromConnection;
-import static
org.apache.activemq.artemis.core.remoting.CertificateUtil.getPeerPrincipalFromConnection;
+import static
org.apache.activemq.artemis.utils.CertificateUtil.getCertsFromConnection;
+import static
org.apache.activemq.artemis.utils.CertificateUtil.getPeerPrincipalFromConnection;
import java.io.IOException;
import java.security.Principal;
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/CertificateUtil.java
similarity index 92%
rename from
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java
rename to
artemis-server/src/main/java/org/apache/activemq/artemis/utils/CertificateUtil.java
index a5afc37585..6e6be8aa08 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/CertificateUtil.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.remoting;
+package org.apache.activemq.artemis.utils;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.ByteArrayInputStream;
@@ -27,6 +27,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.ssl.SslHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.slf4j.Logger;
@@ -52,7 +53,9 @@ public class CertificateUtil {
X509Certificate[] certificates = null;
if (remotingConnection != null) {
Connection transportConnection =
remotingConnection.getTransportConnection();
- if (transportConnection instanceof NettyConnection nettyConnection) {
+ if (transportConnection instanceof NettyServerConnection
nettyServerConnection) {
+ certificates = nettyServerConnection.getPeerCertificates();
+ } else if (transportConnection instanceof NettyConnection
nettyConnection) {
certificates = getCertsFromChannel(nettyConnection.getChannel());
}
}
@@ -87,7 +90,7 @@ public class CertificateUtil {
return result;
}
- private static X509Certificate[] getCertsFromChannel(Channel channel) {
+ public static X509Certificate[] getCertsFromChannel(Channel channel) {
Certificate[] plainCerts = null;
ChannelHandler channelHandler = channel.pipeline().get("ssl");
if (channelHandler != null && channelHandler instanceof SslHandler
sslHandler) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
index 81f99b1e73..7c268ee1d5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
@@ -35,7 +35,10 @@ import
org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
+import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
+import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -120,4 +123,55 @@ public class CertificateAuthenticationSslTests extends
MQTT5TestSupport {
producer.publish(topic, body, 1, false);
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
+
+ // Send will message using mutual TLS with certificate-based authentication
+ @TestTemplate
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ void testSendWillMessage() throws Exception {
+ final String willSenderId = RandomUtil.randomUUIDString();
+ final String willTopic = RandomUtil.randomUUIDString();
+ final byte[] willBody = RandomUtil.randomBytes(32);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ MqttClient willSender = createConnectedWillSender(willSenderId,
willTopic, willBody);
+ MqttClient willConsumer = createConnectedWillConsumer(latch, willTopic,
willBody);
+
+ if (protocol.equals(WSS)) {
+ willSender.disconnectForcibly(0, 0, false);
+ } else {
+ // for some reason disconnectForcibly doesn't work in this case so we
trick the broker into sending the LWT
+ getSessionStates().get(willSenderId).setFailed(true);
+ getSessionStates().get(willSenderId).setAttached(false);
+ }
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ willConsumer.disconnect();
+ }
+
+ private MqttClient createConnectedWillSender(String clientId, String topic,
byte[] body) throws MqttException {
+ MqttClient willSender = createPahoClient(protocol, clientId);
+ MqttConnectionOptions options = getSslMqttConnectOptions();
+ options.setSessionExpiryInterval(5L);
+ options.setWill(topic, new MqttMessage(body));
+ MqttProperties willMessageProperties = new MqttProperties();
+ willMessageProperties.setWillDelayInterval(1L);
+ options.setWillMessageProperties(willMessageProperties);
+ willSender.connect(options);
+ return willSender;
+ }
+
+ private MqttClient createConnectedWillConsumer(CountDownLatch latch,
+ String topic,
+ byte[] body) throws
MqttException {
+ MqttClient willConsumer = createPahoClient(protocol,
RandomUtil.randomUUIDString());
+ willConsumer.connect(getSslMqttConnectOptions());
+ willConsumer.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ assertEqualsByteArrays(body, message.getPayload());
+ latch.countDown();
+ }
+ });
+ willConsumer.subscribe(topic, AT_LEAST_ONCE);
+ return willConsumer;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact