This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e1e79f6d7f ARTEMIS-3915 Support PROXY Protocol
e1e79f6d7f is described below

commit e1e79f6d7f68590b38636270d8850de9b87350e7
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Sep 5 20:35:45 2025 -0500

    ARTEMIS-3915 Support PROXY Protocol
    
    This commit implements support the PROXY protocol so that the broker
    will be able to determine a client's original IP address despite the
    client's connection coming through a reverse proxy (e.g. HAProxy,
    ngingx, etc.). Changes include:
    
     - A new Netty handler to extract relevant details out of PROXY Protocol
       messages and make them available to the broker
     - A new Netty handler to enforce the acceptor's PROXY Protocol config
     - A new chapter in the user manual
     - Updated logging to use this new data
     - Expose this new data via management
     - Disambiguate some variables names related to SOCKS proxy support
     - Embedded Netty-based PROXY server implementation for testing
---
 artemis-commons/pom.xml                            |   4 +
 .../activemq/artemis/utils/ProxyProtocolUtil.java  |  60 +++
 .../activemq/artemis/utils/SocketAddressUtil.java  |  37 ++
 .../core/remoting/impl/netty/NettyConnection.java  |  18 +-
 .../core/remoting/impl/netty/NettyConnector.java   |  48 +-
 .../remoting/impl/netty/TransportConstants.java    |  47 +-
 artemis-features/src/main/resources/features.xml   |   1 +
 artemis-pom/pom.xml                                |   6 +
 artemis-server/pom.xml                             |   4 +
 .../core/management/impl/view/ConnectionField.java |   4 +-
 .../core/management/impl/view/ConnectionView.java  |   9 +-
 .../view/predicate/ConnectionFilterPredicate.java  |   3 +
 .../artemis/core/protocol/ProtocolHandler.java     |  23 +-
 .../impl/netty/HAProxyMessageEnforcer.java         |  61 +++
 .../remoting/impl/netty/HAProxyMessageHandler.java |  52 +++
 .../core/remoting/impl/netty/NettyAcceptor.java    |  21 +-
 .../remoting/impl/netty/NettyServerConnection.java |  42 ++
 .../artemis/core/server/ActiveMQServerLogger.java  |   9 +-
 .../core/server/impl/ServerConsumerImpl.java       |   2 +-
 docs/user-manual/_book.adoc                        |   1 +
 docs/user-manual/index.adoc                        |   1 +
 docs/user-manual/proxy-protocol.adoc               |  59 +++
 docs/user-manual/versions.adoc                     |   3 +
 tests/artemis-test-support/pom.xml                 |   5 +
 .../transport/netty/NettyHAProxyServer.java        | 496 +++++++++++++++++++++
 .../MultiprotocolJMSClientTestSupport.java         |  42 +-
 .../integration/proxyprotocol/HAProxyTest.java     | 411 +++++++++++++++++
 .../core/remoting/impl/netty/SocksProxyTest.java   |  16 +-
 28 files changed, 1392 insertions(+), 93 deletions(-)

diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml
index 43b37b05e2..f48cce1c61 100644
--- a/artemis-commons/pom.xml
+++ b/artemis-commons/pom.xml
@@ -79,6 +79,10 @@
          <groupId>io.netty</groupId>
          <artifactId>netty-common</artifactId>
       </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-transport</artifactId>
+      </dependency>
       <dependency>
          <groupId>commons-beanutils</groupId>
          <artifactId>commons-beanutils</artifactId>
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ProxyProtocolUtil.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ProxyProtocolUtil.java
new file mode 100644
index 0000000000..221b2ad4d2
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ProxyProtocolUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
+
+public class ProxyProtocolUtil {
+   public static final AttributeKey<String> PROXY_PROTOCOL_SOURCE_ADDRESS = 
AttributeKey.valueOf("proxyProtocolSourceAddress");
+   public static final AttributeKey<String> PROXY_PROTOCOL_DESTINATION_ADDRESS 
= AttributeKey.valueOf("proxyProtocolDestinationAddress");
+   public static final AttributeKey<String> PROXY_PROTOCOL_VERSION = 
AttributeKey.valueOf("proxyProtocolVersion");
+
+   /**
+    * {@return a string representation of the remote address of this Channel 
taking into account whether the PROXY
+    * protocol was used by the remote client}
+    */
+   public static String getRemoteAddress(Channel channel) {
+      String addressFromAttribute = 
getAddressFromAttribute(PROXY_PROTOCOL_SOURCE_ADDRESS, channel);
+      if (addressFromAttribute != null) {
+         return addressFromAttribute;
+      } else {
+         return SocketAddressUtil.toString(channel.remoteAddress());
+      }
+   }
+
+   /**
+    * {@return a string representation of the proxy address of this Channel if 
and only if the PROXY protocol was used
+    * by the remote client; otherwise null}
+    */
+   public static String getProxyAddress(Channel channel) {
+      return getAddressFromAttribute(PROXY_PROTOCOL_DESTINATION_ADDRESS, 
channel);
+   }
+
+   private static String getAddressFromAttribute(AttributeKey<String> 
addressKey, Channel channel) {
+      String address = channel.attr(addressKey).get();
+      if (address != null && !address.isEmpty()) {
+         return address;
+      } else {
+         return null;
+      }
+   }
+
+   public static String getProxyProtocolVersion(Channel channel) {
+      return channel.attr(PROXY_PROTOCOL_VERSION).get() == null ? null : 
channel.attr(PROXY_PROTOCOL_VERSION).get().toString();
+   }
+}
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SocketAddressUtil.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SocketAddressUtil.java
new file mode 100644
index 0000000000..677bd05448
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SocketAddressUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.net.SocketAddress;
+
+public class SocketAddressUtil {
+
+   /**
+    * {@return a string representation of the {@code SocketAddress} with the 
leading "/" removed (if applicable)}
+    */
+   public static String toString(SocketAddress address) {
+      if (address == null) {
+         return null;
+      }
+      String result = address.toString();
+      if (result.startsWith("/")) {
+         return result.substring(1);
+      } else {
+         return result;
+      }
+   }
+}
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 9572c666f2..b952d87bbb 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -46,6 +46,7 @@ import 
org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.IPV6Util;
+import org.apache.activemq.artemis.utils.SocketAddressUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -387,17 +388,8 @@ public class NettyConnection implements Connection {
    }
 
    @Override
-   public final String getRemoteAddress() {
-      SocketAddress address = channel.remoteAddress();
-      if (address == null) {
-         return null;
-      }
-      String result = address.toString();
-      if (result.startsWith("/")) {
-         return result.substring(1);
-      } else {
-         return result;
-      }
+   public String getRemoteAddress() {
+      return SocketAddressUtil.toString(channel.remoteAddress());
    }
 
    @Override
@@ -477,7 +469,7 @@ public class NettyConnection implements Connection {
 
    @Override
    public final String toString() {
-      return super.toString() + "[ID=" + getID() + ", local= " + 
channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
+      return super.toString() + "[ID=" + getID() + ", local= " + 
channel.localAddress() + ", remote=" + getRemoteAddress() + "]";
    }
 
    private void closeChannel(final Channel channel, boolean inEventLoop) {
@@ -489,4 +481,4 @@ public class NettyConnection implements Connection {
       }
    }
 
-}
+}
\ No newline at end of file
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index cb7ef817a4..898dbb9ae7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -211,19 +211,19 @@ public class NettyConnector extends AbstractConnector {
    // will be handled by the server's http server.
    private boolean httpUpgradeEnabled;
 
-   private boolean proxyEnabled;
+   private boolean socksEnabled;
 
-   private String proxyHost;
+   private String socksHost;
 
-   private int proxyPort;
+   private int socksPort;
 
-   private SocksVersion proxyVersion;
+   private SocksVersion socksVersion;
 
-   private String proxyUsername;
+   private String socksUsername;
 
-   private String proxyPassword;
+   private String socksPassword;
 
-   private boolean proxyRemoteDNS;
+   private boolean socksRemoteDNS;
 
    private boolean useServlet;
 
@@ -382,18 +382,18 @@ public class NettyConnector extends AbstractConnector {
 
       httpUpgradeEnabled = 
ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
 TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
 
-      proxyEnabled = 
ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_ENABLED_PROP_NAME,
 TransportConstants.DEFAULT_PROXY_ENABLED, configuration);
-      if (proxyEnabled) {
-         proxyHost = 
ConfigurationHelper.getStringProperty(TransportConstants.PROXY_HOST_PROP_NAME, 
TransportConstants.DEFAULT_PROXY_HOST, configuration);
-         proxyPort = 
ConfigurationHelper.getIntProperty(TransportConstants.PROXY_PORT_PROP_NAME, 
TransportConstants.DEFAULT_PROXY_PORT, configuration);
+      socksEnabled = 
ConfigurationHelper.getBooleanProperty(TransportConstants.SOCKS_ENABLED_PROP_NAME,
 TransportConstants.DEFAULT_SOCKS_ENABLED, configuration);
+      if (socksEnabled) {
+         socksHost = 
ConfigurationHelper.getStringProperty(TransportConstants.SOCKS_HOST_PROP_NAME, 
TransportConstants.DEFAULT_SOCKS_HOST, configuration);
+         socksPort = 
ConfigurationHelper.getIntProperty(TransportConstants.SOCKS_PORT_PROP_NAME, 
TransportConstants.DEFAULT_SOCKS_PORT, configuration);
 
-         int socksVersionNumber = 
ConfigurationHelper.getIntProperty(TransportConstants.PROXY_VERSION_PROP_NAME, 
TransportConstants.DEFAULT_PROXY_VERSION, configuration);
-         proxyVersion = SocksVersion.valueOf((byte) socksVersionNumber);
+         int socksVersionNumber = 
ConfigurationHelper.getIntProperty(TransportConstants.SOCKS_VERSION_PROP_NAME, 
TransportConstants.DEFAULT_SOCKS_VERSION, configuration);
+         socksVersion = SocksVersion.valueOf((byte) socksVersionNumber);
 
-         proxyUsername = 
ConfigurationHelper.getStringProperty(TransportConstants.PROXY_USERNAME_PROP_NAME,
 TransportConstants.DEFAULT_PROXY_USERNAME, configuration);
-         proxyPassword = 
ConfigurationHelper.getStringProperty(TransportConstants.PROXY_PASSWORD_PROP_NAME,
 TransportConstants.DEFAULT_PROXY_PASSWORD, configuration);
+         socksUsername = 
ConfigurationHelper.getStringProperty(TransportConstants.SOCKS_USERNAME_PROP_NAME,
 TransportConstants.DEFAULT_SOCKS_USERNAME, configuration);
+         socksPassword = 
ConfigurationHelper.getStringProperty(TransportConstants.SOCKS_PASSWORD_PROP_NAME,
 TransportConstants.DEFAULT_SOCKS_PASSWORD, configuration);
 
-         proxyRemoteDNS = 
ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME,
 TransportConstants.DEFAULT_PROXY_REMOTE_DNS, configuration);
+         socksRemoteDNS = 
ConfigurationHelper.getBooleanProperty(TransportConstants.SOCKS_REMOTE_DNS_PROP_NAME,
 TransportConstants.SOCKS_PROXY_REMOTE_DNS, configuration);
       }
 
       remotingThreads = 
ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,
 -1, configuration);
@@ -656,19 +656,19 @@ public class NettyConnector extends AbstractConnector {
          public void initChannel(Channel channel) throws Exception {
             final ChannelPipeline pipeline = channel.pipeline();
 
-            if (proxyEnabled && (proxyRemoteDNS || !isTargetLocalHost())) {
-               InetSocketAddress proxyAddress = new 
InetSocketAddress(proxyHost, proxyPort);
-               ProxyHandler proxyHandler = switch (proxyVersion) {
-                  case SOCKS5 -> new Socks5ProxyHandler(proxyAddress, 
proxyUsername, proxyPassword);
-                  case SOCKS4a -> new Socks4ProxyHandler(proxyAddress, 
proxyUsername);
+            if (socksEnabled && (socksRemoteDNS || !isTargetLocalHost())) {
+               InetSocketAddress proxyAddress = new 
InetSocketAddress(socksHost, socksPort);
+               ProxyHandler proxyHandler = switch (socksVersion) {
+                  case SOCKS5 -> new Socks5ProxyHandler(proxyAddress, 
socksUsername, socksPassword);
+                  case SOCKS4a -> new Socks4ProxyHandler(proxyAddress, 
socksUsername);
                   default -> throw new IllegalArgumentException("Unknown SOCKS 
proxy version");
                };
 
                channel.pipeline().addLast(proxyHandler);
 
-               logger.debug("Using a SOCKS proxy at {}:{}", proxyHost, 
proxyPort);
+               logger.debug("Using a SOCKS proxy at {}:{}", socksHost, 
socksPort);
 
-               if (proxyRemoteDNS) {
+               if (socksRemoteDNS) {
                   bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
                }
             }
@@ -876,7 +876,7 @@ public class NettyConnector extends AbstractConnector {
 
    public NettyConnection createConnection(Consumer<ChannelFuture> onConnect, 
String host, int port) {
       InetSocketAddress remoteDestination;
-      if (proxyEnabled && proxyRemoteDNS) {
+      if (socksEnabled && socksRemoteDNS) {
          remoteDestination = 
InetSocketAddress.createUnresolved(IPV6Util.stripBracketsAndZoneID(host), port);
       } else {
          remoteDestination = new 
InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port);
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index e7464975df..641d1448e9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -36,6 +36,8 @@ public class TransportConstants {
 
    public static final String SSL_ENABLED_PROP_NAME = "sslEnabled";
 
+   public static final String PROXY_PROTOCOL_ENABLED_PROP_NAME = 
"proxyProtocolEnabled";
+
    public static final String SSL_AUTO_RELOAD_PROP_NAME = "sslAutoReload";
 
    public static final boolean DEFAULT_SSL_AUTO_RELOAD = false;
@@ -186,19 +188,19 @@ public class TransportConstants {
 
    public static final int STOMP_DEFAULT_CONSUMER_WINDOW_SIZE = 10 * 1024; // 
10K
 
-   public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
+   public static final String SOCKS_ENABLED_PROP_NAME = "socksEnabled";
 
-   public static final String PROXY_HOST_PROP_NAME = "socksHost";
+   public static final String SOCKS_HOST_PROP_NAME = "socksHost";
 
-   public static final String PROXY_PORT_PROP_NAME = "socksPort";
+   public static final String SOCKS_PORT_PROP_NAME = "socksPort";
 
-   public static final String PROXY_VERSION_PROP_NAME = "socksVersion";
+   public static final String SOCKS_VERSION_PROP_NAME = "socksVersion";
 
-   public static final String PROXY_USERNAME_PROP_NAME = "socksUsername";
+   public static final String SOCKS_USERNAME_PROP_NAME = "socksUsername";
 
-   public static final String PROXY_PASSWORD_PROP_NAME = "socksPassword";
+   public static final String SOCKS_PASSWORD_PROP_NAME = "socksPassword";
 
-   public static final String PROXY_REMOTE_DNS_PROP_NAME = "socksRemoteDNS";
+   public static final String SOCKS_REMOTE_DNS_PROP_NAME = "socksRemoteDNS";
 
    public static final String AUTO_START = "autoStart";
 
@@ -206,6 +208,8 @@ public class TransportConstants {
 
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
+   public static final boolean DEFAULT_PROXY_PROTOCOL_ENABLED = false;
+
    public static final String DEFAULT_SNIHOST_CONFIG = null;
 
    public static final boolean DEFAULT_USE_GLOBAL_WORKER_POOL = true;
@@ -381,19 +385,19 @@ public class TransportConstants {
     */
    public static final int DEFAULT_SHUTDOWN_TIMEOUT = 
parseDefaultVariable("DEFAULT_SHUTDOWN_TIMEOUT", 3_000);
 
-   public static final boolean DEFAULT_PROXY_ENABLED = false;
+   public static final boolean DEFAULT_SOCKS_ENABLED = false;
 
-   public static final String DEFAULT_PROXY_HOST = null;
+   public static final String DEFAULT_SOCKS_HOST = null;
 
-   public static final int DEFAULT_PROXY_PORT = 0;
+   public static final int DEFAULT_SOCKS_PORT = 0;
 
-   public static final byte DEFAULT_PROXY_VERSION = 
SocksVersion.SOCKS5.byteValue();
+   public static final byte DEFAULT_SOCKS_VERSION = 
SocksVersion.SOCKS5.byteValue();
 
-   public static final String DEFAULT_PROXY_USERNAME = null;
+   public static final String DEFAULT_SOCKS_USERNAME = null;
 
-   public static final String DEFAULT_PROXY_PASSWORD = null;
+   public static final String DEFAULT_SOCKS_PASSWORD = null;
 
-   public static final boolean DEFAULT_PROXY_REMOTE_DNS = false;
+   public static final boolean SOCKS_PROXY_REMOTE_DNS = false;
 
    public static final String ROUTER = "router";
 
@@ -479,6 +483,7 @@ public class TransportConstants {
       
allowableAcceptorKeys.add(TransportConstants.DISABLE_STOMP_SERVER_HEADER);
       allowableAcceptorKeys.add(TransportConstants.AUTO_START);
       allowableAcceptorKeys.add(TransportConstants.ROUTER);
+      
allowableAcceptorKeys.add(TransportConstants.PROXY_PROTOCOL_ENABLED_PROP_NAME);
 
       ALLOWABLE_ACCEPTOR_KEYS = 
Collections.unmodifiableSet(allowableAcceptorKeys);
 
@@ -525,13 +530,13 @@ public class TransportConstants {
       
allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
-      allowableConnectorKeys.add(TransportConstants.PROXY_ENABLED_PROP_NAME);
-      allowableConnectorKeys.add(TransportConstants.PROXY_HOST_PROP_NAME);
-      allowableConnectorKeys.add(TransportConstants.PROXY_PORT_PROP_NAME);
-      allowableConnectorKeys.add(TransportConstants.PROXY_VERSION_PROP_NAME);
-      allowableConnectorKeys.add(TransportConstants.PROXY_USERNAME_PROP_NAME);
-      allowableConnectorKeys.add(TransportConstants.PROXY_PASSWORD_PROP_NAME);
-      
allowableConnectorKeys.add(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_ENABLED_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_HOST_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_PORT_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_VERSION_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_USERNAME_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.SOCKS_PASSWORD_PROP_NAME);
+      
allowableConnectorKeys.add(TransportConstants.SOCKS_REMOTE_DNS_PROP_NAME);
       
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
       
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
       allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT);
diff --git a/artemis-features/src/main/resources/features.xml 
b/artemis-features/src/main/resources/features.xml
index d26ee3cc16..d4756422d9 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -35,6 +35,7 @@
                <bundle>mvn:io.netty/netty-buffer/${netty.version}</bundle>
                <bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
                <bundle>mvn:io.netty/netty-codec-socks/${netty.version}</bundle>
+               
<bundle>mvn:io.netty/netty-codec-haproxy/${netty.version}</bundle>
                <bundle>mvn:io.netty/netty-codec-http/${netty.version}</bundle>
                <bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
                
<bundle>mvn:io.netty/netty-handler-proxy/${netty.version}</bundle>
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index 13b4f31abf..8ae5d4f1dc 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -431,6 +431,12 @@
             <version>${netty.version}</version>
             <!-- License: Apache 2.0 -->
          </dependency>
+         <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-haproxy</artifactId>
+            <version>${netty.version}</version>
+            <!-- License: Apache 2.0 -->
+         </dependency>
          <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-codec-socks</artifactId>
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index c4542705cc..4c9e4212fd 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -106,6 +106,10 @@
          <groupId>io.netty</groupId>
          <artifactId>netty-codec-http</artifactId>
       </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-codec-haproxy</artifactId>
+      </dependency>
       <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-common</artifactId>
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionField.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionField.java
index 272f70654d..95d51a2c7d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionField.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionField.java
@@ -29,7 +29,9 @@ public enum ConnectionField {
    LOCAL_ADDRESS("localAddress"),
    SESSION_ID("sessionID"),
    CREATION_TIME("creationTime"),
-   IMPLEMENTATION("implementation");
+   IMPLEMENTATION("implementation"),
+   PROXY_ADDRESS("proxyAddress"),
+   PROXY_PROTOCOL_VERSION("proxyProtocolVersion");
 
    private static final Map<String, ConnectionField> lookup = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionView.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionView.java
index f536b82a52..3a1fc01200 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionView.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConnectionView.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import 
org.apache.activemq.artemis.core.management.impl.view.predicate.ConnectionFilterPredicate;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.json.JsonObjectBuilder;
@@ -65,7 +66,9 @@ public class ConnectionView extends 
ActiveMQAbstractView<RemotingConnection> {
          .add(ConnectionField.PROTOCOL.getName(), 
toString(connection.getProtocolName()))
          .add(ConnectionField.CLIENT_ID.getName(), 
toString(connection.getClientID()))
          .add(ConnectionField.LOCAL_ADDRESS.getName(), 
toString(connection.getTransportLocalAddress()))
-         .add(ConnectionField.SESSION_COUNT.getName(), sessions.size());
+         .add(ConnectionField.SESSION_COUNT.getName(), sessions.size())
+         .add(ConnectionField.PROXY_ADDRESS.getName(), 
toString(NettyServerConnection.getProxyAddress(connection.getTransportConnection())))
+         .add(ConnectionField.PROXY_PROTOCOL_VERSION.getName(), 
toString(NettyServerConnection.getProxyProtocolVersion(connection.getTransportConnection())));
    }
 
    @Override
@@ -96,6 +99,10 @@ public class ConnectionView extends 
ActiveMQAbstractView<RemotingConnection> {
             return connection.getTransportLocalAddress();
          case SESSION_COUNT:
             return server.getSessions(connection.getID().toString()).size();
+         case PROXY_ADDRESS:
+            return 
NettyServerConnection.getProxyAddress(connection.getTransportConnection());
+         case PROXY_PROTOCOL_VERSION:
+            return 
NettyServerConnection.getProxyProtocolVersion(connection.getTransportConnection());
          default:
             throw new IllegalArgumentException("Unsupported field, " + 
fieldName);
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConnectionFilterPredicate.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConnectionFilterPredicate.java
index 99f8c69e38..b997353d07 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConnectionFilterPredicate.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConnectionFilterPredicate.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import org.apache.activemq.artemis.core.management.impl.view.ConnectionField;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -53,6 +54,8 @@ public class ConnectionFilterPredicate extends 
ActiveMQFilterPredicate<RemotingC
          case SESSION_ID -> 
matchAny(server.getSessions(connection.getID().toString()));
          case CREATION_TIME -> matches(connection.getCreationTime());
          case IMPLEMENTATION -> matches(connection.getClass().getSimpleName());
+         case PROXY_ADDRESS -> 
matches(NettyServerConnection.getProxyAddress(connection.getTransportConnection()));
+         case PROXY_PROTOCOL_VERSION -> 
matches(NettyServerConnection.getProxyProtocolVersion(connection.getTransportConnection()));
       };
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 02d54bfb97..fc6df1214a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -39,10 +39,10 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
-
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.HAProxyMessageEnforcer;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
@@ -55,6 +55,8 @@ import 
org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketFrame
 import 
org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ProxyProtocolUtil;
+import org.apache.activemq.artemis.utils.SocketAddressUtil;
 
 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@@ -134,7 +136,7 @@ public class ProtocolHandler {
 
          if (handshakeTimeout > 0) {
             timeoutFuture = scheduledThreadPool.schedule(() -> {
-               ActiveMQServerLogger.LOGGER.handshakeTimeout(handshakeTimeout, 
nettyAcceptor.getName(), ctx.channel().remoteAddress().toString());
+               ActiveMQServerLogger.LOGGER.handshakeTimeout(handshakeTimeout, 
nettyAcceptor.getName(), ProxyProtocolUtil.getRemoteAddress(ctx.channel()));
                ctx.channel().close();
             }, handshakeTimeout, TimeUnit.SECONDS);
          }
@@ -204,9 +206,12 @@ public class ProtocolHandler {
             timeoutFuture = null;
          }
 
-         final int magic1 = in.getUnsignedByte(in.readerIndex());
-         final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);
-         if (http && isHttp(magic1, magic2)) {
+         if (HAProxyMessageEnforcer.isProxyProtocol(in)) {
+            
ActiveMQServerLogger.LOGGER.proxyProtocolViolation(SocketAddressUtil.toString(ctx.channel().remoteAddress()),
 nettyAcceptor.getName(), false, "used");
+            ctx.channel().close();
+         }
+
+         if (http && isHttp(in)) {
             switchToHttp(ctx);
             return;
          }
@@ -241,7 +246,7 @@ public class ProtocolHandler {
 
          ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
          if (protocolManagerToUse == null) {
-            
ActiveMQServerLogger.LOGGER.failedToFindProtocolManager(ctx.channel() == null ? 
null : Objects.toString(ctx.channel().remoteAddress()), ctx.channel() == null ? 
null : Objects.toString(ctx.channel().localAddress(), null), protocolToUse, 
protocolMap.keySet().toString());
+            
ActiveMQServerLogger.LOGGER.failedToFindProtocolManager(ctx.channel() == null ? 
null : ProxyProtocolUtil.getRemoteAddress(ctx.channel()), ctx.channel() == null 
? null : Objects.toString(ctx.channel().localAddress(), null), protocolToUse, 
protocolMap.keySet().toString());
             return;
          }
          ConnectionCreator channelHandler = 
nettyAcceptor.createConnectionCreator();
@@ -259,13 +264,15 @@ public class ProtocolHandler {
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          try {
-            
ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(),
 ctx.channel().remoteAddress(), cause);
+            
ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(),
 ProxyProtocolUtil.getRemoteAddress(ctx.channel()), cause);
          } finally {
             ctx.close();
          }
       }
 
-      private boolean isHttp(int magic1, int magic2) {
+      private boolean isHttp(ByteBuf in) {
+         final int magic1 = in.getUnsignedByte(in.readerIndex());
+         final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);
          return magic1 == 'G' && magic2 == 'E' || // GET
             magic1 == 'P' && magic2 == 'O' || // POST
             magic1 == 'P' && magic2 == 'U' || // PUT
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageEnforcer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageEnforcer.java
new file mode 100644
index 0000000000..7e77fe776f
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageEnforcer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.utils.SocketAddressUtil;
+
+/**
+ * This Netty handler enforces the presence or absence of PROXY protocol 
messages. It verifies conformity and then
+ * removes itself from the pipeline.
+ * <p>
+ * If the incoming message protocol does not align with the enforcer's 
requirements the connection is closed.
+ */
+public class HAProxyMessageEnforcer extends ByteToMessageDecoder {
+
+   final String acceptorName;
+
+   HAProxyMessageEnforcer(String acceptorName) {
+      this.acceptorName = acceptorName;
+   }
+
+   @Override
+   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception {
+      if (in.readableBytes() < 4) {
+         return;
+      }
+      ctx.pipeline().remove(this);
+      if (!isProxyProtocol(in)) {
+         
ActiveMQServerLogger.LOGGER.proxyProtocolViolation(SocketAddressUtil.toString(ctx.channel().remoteAddress()),
 acceptorName, true, "did not use");
+         ctx.close();
+      }
+   }
+
+   public static boolean isProxyProtocol(ByteBuf in) {
+      final int magic1 = in.getUnsignedByte(in.readerIndex());
+      final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);
+      final int magic3 = in.getUnsignedByte(in.readerIndex() + 2);
+      final int magic4 = in.getUnsignedByte(in.readerIndex() + 3);
+      return (magic1 == 'P' && magic2 == 'R' && magic3 == 'O' && magic4 == 
'X') || // V1
+         (magic1 == '\r' && magic2 == '\n' && magic3 == '\r' && magic4 == 
'\n'); // V2
+   }
+}
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageHandler.java
new file mode 100644
index 0000000000..36cf1f1d6d
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HAProxyMessageHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+
+import static 
org.apache.activemq.artemis.utils.ProxyProtocolUtil.PROXY_PROTOCOL_DESTINATION_ADDRESS;
+import static 
org.apache.activemq.artemis.utils.ProxyProtocolUtil.PROXY_PROTOCOL_SOURCE_ADDRESS;
+import static 
org.apache.activemq.artemis.utils.ProxyProtocolUtil.PROXY_PROTOCOL_VERSION;
+
+public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
+
+   private boolean skipProxyBytes = false;
+
+   @Override
+   public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+      if (msg instanceof HAProxyMessage haProxyMessage) {
+         
ctx.channel().attr(PROXY_PROTOCOL_SOURCE_ADDRESS).set(haProxyMessage.sourceAddress()
 + ":" + Integer.toString(haProxyMessage.sourcePort()));
+         
ctx.channel().attr(PROXY_PROTOCOL_DESTINATION_ADDRESS).set(haProxyMessage.destinationAddress()
 + ":" + Integer.toString(haProxyMessage.destinationPort()));
+         
ctx.channel().attr(PROXY_PROTOCOL_VERSION).set(haProxyMessage.protocolVersion().toString());
+         skipProxyBytes = true;
+      } else {
+         if (skipProxyBytes) {
+            // once we get the HAProxyMessage and set the proper attributes 
our job is done
+            ctx.pipeline().remove(this);
+            // slice off the proxy-related bytes that have already been read 
so other protocol handlers don't choke on them
+            ctx.fireChannelRead(((ByteBuf) msg).slice());
+         } else {
+            throw new IOException("Did not receive expected HAProxyMessage; 
instead received: " + msg);
+         }
+      }
+   }
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 03af660847..d8ddbe9ac5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -63,6 +63,7 @@ import io.netty.channel.local.LocalAddress;
 import io.netty.channel.local.LocalServerChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.ResourceLeakDetector;
@@ -98,6 +99,7 @@ import 
org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig;
 import 
org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ProxyProtocolUtil;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -145,6 +147,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final boolean sslEnabled;
 
+   private final boolean proxyProtocolEnabled;
+
    private final boolean useInvm;
 
    private final boolean useEpoll;
@@ -288,6 +292,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
       sslEnabled = 
ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
 TransportConstants.DEFAULT_SSL_ENABLED, configuration);
 
+      proxyProtocolEnabled = 
ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_PROTOCOL_ENABLED_PROP_NAME,
 TransportConstants.DEFAULT_PROXY_PROTOCOL_ENABLED, configuration);
+
       remotingThreads = 
ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,
 Runtime.getRuntime().availableProcessors() * 3, configuration);
       remotingThreads = 
ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME,
 remotingThreads, configuration);
 
@@ -472,6 +478,9 @@ public class NettyAcceptor extends AbstractAcceptor {
          @Override
          public void initChannel(Channel channel) throws Exception {
             ChannelPipeline pipeline = channel.pipeline();
+            if (proxyProtocolEnabled) {
+               pipeline.addLast(new HAProxyMessageEnforcer(getName()), new 
HAProxyMessageDecoder(), new HAProxyMessageHandler());
+            }
             if (sslEnabled) {
                final Pair<String, Integer> peerInfo = getPeerInfo(channel);
                try {
@@ -480,7 +489,7 @@ public class NettyAcceptor extends AbstractAcceptor {
                   pipeline.addLast("sslHandshakeExceptionHandler", new 
SslHandshakeExceptionHandler());
                } catch (Exception e) {
                   Throwable rootCause = ExceptionUtils.getRootCause(e);
-                  
ActiveMQServerLogger.LOGGER.gettingSslHandlerFailed(channel.remoteAddress().toString(),
 rootCause.getClass().getName() + ": " + rootCause.getMessage());
+                  
ActiveMQServerLogger.LOGGER.gettingSslHandlerFailed(ProxyProtocolUtil.getRemoteAddress(channel),
 rootCause.getClass().getName() + ": " + rootCause.getMessage());
 
                   logger.debug("Getting SSL handler failed", e);
                   throw e;
@@ -491,7 +500,7 @@ public class NettyAcceptor extends AbstractAcceptor {
 
          private Pair<String, Integer> getPeerInfo(Channel channel) {
             try {
-               String[] peerInfo = 
channel.remoteAddress().toString().replace("/", "").split(":");
+               String[] peerInfo = 
ProxyProtocolUtil.getRemoteAddress(channel).replace("/", "").split(":");
                return new Pair<>(peerInfo[0], Integer.parseInt(peerInfo[1]));
             } catch (Exception e) {
                logger.debug("Failed to parse peer info for SSL engine 
initialization", e);
@@ -787,7 +796,7 @@ public class NettyAcceptor extends AbstractAcceptor {
                ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
                for (Channel channel : future.group()) {
                   if (channel.isActive()) {
-                     
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, 
channel.remoteAddress());
+                     
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, 
ProxyProtocolUtil.getRemoteAddress(channel));
                   }
                }
             }
@@ -852,7 +861,7 @@ public class NettyAcceptor extends AbstractAcceptor {
             ActiveMQServerLogger.LOGGER.nettyChannelGroupBindError();
             for (Channel channel : future.group()) {
                if (channel.isActive()) {
-                  ActiveMQServerLogger.LOGGER.nettyChannelStillBound(channel, 
channel.remoteAddress());
+                  ActiveMQServerLogger.LOGGER.nettyChannelStillBound(channel, 
ProxyProtocolUtil.getRemoteAddress(channel));
                }
             }
          }
@@ -951,7 +960,7 @@ public class NettyAcceptor extends AbstractAcceptor {
             }
             return nc;
          } else {
-            
ActiveMQServerLogger.LOGGER.connectionLimitReached(connectionsAllowed, 
ctx.channel().remoteAddress().toString());
+            
ActiveMQServerLogger.LOGGER.connectionLimitReached(connectionsAllowed, 
ProxyProtocolUtil.getRemoteAddress(ctx.channel()));
             ctx.channel().close();
             return null;
          }
@@ -1036,7 +1045,7 @@ public class NettyAcceptor extends AbstractAcceptor {
             Throwable rootCause = ExceptionUtils.getRootCause(cause);
             String errorMessage = rootCause.getClass().getName() + ": " + 
rootCause.getMessage();
 
-            
ActiveMQServerLogger.LOGGER.sslHandshakeFailed(ctx.channel().remoteAddress().toString(),
 errorMessage);
+            
ActiveMQServerLogger.LOGGER.sslHandshakeFailed(ProxyProtocolUtil.getRemoteAddress(ctx.channel()),
 errorMessage);
 
             logger.debug("SSL handshake failed", cause);
          }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
index 93c732e979..600f8eca83 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
@@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
 import java.util.Map;
 
 import io.netty.channel.Channel;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import 
org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
+import org.apache.activemq.artemis.utils.ProxyProtocolUtil;
 
 public class NettyServerConnection extends NettyConnection {
 
@@ -51,4 +53,44 @@ public class NettyServerConnection extends NettyConnection {
    public String getRouter() {
       return router;
    }
+
+   /**
+    * {@return a string representation of the remote address of this 
connection; if this connection was made via the
+    * proxy protocol then this will be the original address, not the proxy 
address}
+    */
+   @Override
+   public String getRemoteAddress() {
+      return ProxyProtocolUtil.getRemoteAddress(channel);
+   }
+
+   /**
+    * {@return if this connection is made via the proxy protocol then this 
will be the address of the proxy; otherwise
+    * null}
+    */
+   public String getProxyAddress() {
+      return ProxyProtocolUtil.getProxyAddress(channel);
+   }
+
+   /**
+    * {@return the version of the proxy protocol used to make the connection 
or null if not applicable}
+    */
+   public String getProxyProtocolVersion() {
+      return ProxyProtocolUtil.getProxyProtocolVersion(channel);
+   }
+
+   public static String getProxyAddress(Connection connection) {
+      if (connection instanceof NettyServerConnection nettyServerConnection) {
+         return nettyServerConnection.getProxyAddress();
+      } else {
+         return null;
+      }
+   }
+
+   public static String getProxyProtocolVersion(Connection connection) {
+      if (connection instanceof NettyServerConnection nettyServerConnection) {
+         return nettyServerConnection.getProxyProtocolVersion();
+      } else {
+         return null;
+      }
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6a139815b1..b262b05a27 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -497,13 +497,13 @@ public interface ActiveMQServerLogger {
    void nettyChannelGroupError();
 
    @LogMessage(id = 222075, value = "{} is still connected to {}", level = 
LogMessage.Level.WARN)
-   void nettyChannelStillOpen(Channel channel, SocketAddress remoteAddress);
+   void nettyChannelStillOpen(Channel channel, String remoteAddress);
 
    @LogMessage(id = 222076, value = "channel group did not completely unbind", 
level = LogMessage.Level.WARN)
    void nettyChannelGroupBindError();
 
    @LogMessage(id = 222077, value = "{} is still bound to {}", level = 
LogMessage.Level.WARN)
-   void nettyChannelStillBound(Channel channel, SocketAddress remoteAddress);
+   void nettyChannelStillBound(Channel channel, String remoteAddress);
 
    @LogMessage(id = 222080, value = "Error creating acceptor: {}", level = 
LogMessage.Level.WARN)
    void errorCreatingAcceptor(String name, Exception e);
@@ -1435,7 +1435,7 @@ public interface ActiveMQServerLogger {
    void noPagefullPolicySet(Object address, Object limitBytes, Object 
limitMessages);
 
    @LogMessage(id = 224126, value = "Failure during protocol handshake on 
connection to {} from {}", level = LogMessage.Level.ERROR)
-   void failureDuringProtocolHandshake(SocketAddress localAddress, 
SocketAddress remoteAddress, Throwable e);
+   void failureDuringProtocolHandshake(SocketAddress localAddress, String 
remoteAddress, Throwable e);
 
    // Note the custom loggerName rather than the overall LogBundle-wide 
definition used by other methods.
    @LogMessage(id = 224127, value = "Message dispatch from paging is blocked. 
Address {}/Queue {} will not read any more messages from paging until pending 
messages are acknowledged. "
@@ -1516,4 +1516,7 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224151, value = "Bridge {} unable to handle message: {}. 
Root cause: {}", level = LogMessage.Level.INFO)
    void bridgeUnableToHandleMessage(SimpleString bridgeName, String message, 
String exceptionMessage);
+
+   @LogMessage(id = 224152, value = "Closing connection from {} for PROXY 
Protocol violation. Acceptor {} uses proxyProtocolEnabled={}, but connection {} 
PROXY Protocol.", level = LogMessage.Level.WARN)
+   void proxyProtocolViolation(String remoteAddress, String acceptorName, 
boolean proxyProtocolEnabled, String actualUsage);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 167d9986aa..88edc94613 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -79,7 +80,6 @@ import 
org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * Concrete implementation of a ClientConsumer.
diff --git a/docs/user-manual/_book.adoc b/docs/user-manual/_book.adoc
index 1ef8eb5af0..037db170d8 100644
--- a/docs/user-manual/_book.adoc
+++ b/docs/user-manual/_book.adoc
@@ -26,6 +26,7 @@ include::client-failover.adoc[leveloffset=1]
 include::jms-core-mapping.adoc[leveloffset=1]
 include::using-jms.adoc[leveloffset=1]
 include::pre-acknowledge.adoc[leveloffset=1]
+include::proxy-protocol.adoc[leveloffset=1]
 
 //== Upgrading
 
diff --git a/docs/user-manual/index.adoc b/docs/user-manual/index.adoc
index f12c264b32..ba1ef92d7a 100644
--- a/docs/user-manual/index.adoc
+++ b/docs/user-manual/index.adoc
@@ -30,6 +30,7 @@ image::images/activemq-logo.png[align="center"]
 * xref:jms-core-mapping.adoc#mapping-jms-concepts-to-the-core-api[Mapping JMS 
Concepts to the Core API]
 * xref:using-jms.adoc#using-jms-or-jakarta-messaging[Using JMS or Jakarta 
Messaging]
 ** xref:pre-acknowledge.adoc#extra-acknowledge-modes[Extra Acknowledge Modes]
+* xref:proxy-protocol.adoc#proxy-protocol[PROXY Protocol]
 
 == Upgrading
 
diff --git a/docs/user-manual/proxy-protocol.adoc 
b/docs/user-manual/proxy-protocol.adoc
new file mode 100644
index 0000000000..802194f343
--- /dev/null
+++ b/docs/user-manual/proxy-protocol.adoc
@@ -0,0 +1,59 @@
+= PROXY Protocol
+:idprefix:
+:idseparator: -
+:docinfo: shared
+
+As noted in the official 
https://github.com/haproxy/haproxy/blob/e6a9192af68c5e385aa73c3e1cc51eb9f0cc09d6/doc/proxy-protocol.txt[PROXY
 Protocol documentation]:
+
+[quote,]
+____
+The PROXY protocol provides a convenient way to safely transport connection 
information such as a client's address across multiple layers of NAT or TCP 
proxies.
+____
+
+This essentially allows the broker to know a client's IP address even when the 
connection is established through reverse proxy that supports the PROXY 
protocol (e.g. HAProxy, nginx, etc.).
+Without PROXY protocol support the broker would see such client connections as 
coming from the proxy itself which can be misleading for administrators and 
complicate trouble-shooting.
+
+Both versions 1 & 2 of the PROXY Protocol are supported.
+
+Any of our supported messaging protocols can be used in combination with the 
PROXY protocol with or without TLS.
+
+== Configuration
+
+Support for the PROXY Protocol is configured on a per-acceptor basis using the 
`proxyProtocolEnabled` parameter, e.g.:
+
+[,xml]
+----
+<acceptor 
name="proxy-artemis">tcp://0.0.0.0:61616?proxyProtocolEnabled=true</acceptor>
+----
+
+=== Security
+
+Support for the PROXY Protocol must be explicitly configured due to security 
reasons.
+As noted in the official 
https://github.com/haproxy/haproxy/blob/master/doc/proxy-protocol.txt[PROXY 
Protocol documentation]:
+
+[quote,]
+____
+The receiver MUST be configured to only receive the protocol described in this 
specification and MUST not try to guess whether the protocol header is present 
or not.
+This means that the protocol explicitly prevents port sharing between public 
and private access.
+Otherwise it would open a major security breach by allowing untrusted parties 
to spoof their connection addresses.
+*The receiver SHOULD ensure proper access filtering so that only trusted 
proxies are allowed to use this protocol.* [emphasis added]
+____
+
+Because of this, an acceptor using `proxyProtocolEnabled=true` can _only_ 
accept connections using the PROXY protocol and vice versa.
+
+If a client attempts to use (or not use) the PROXY Protocol in violation of 
the configured value for `proxyProtocolEnabled` the broker will log a warning 
with the code `AMQ224151` containing details about the violation.
+
+== Management
+
+Client connections established through a reverse proxy configured with PROXY 
Protocol support will have 2 additional pieces of information compared to 
non-proxied connections:
+
+proxyAddress::
+The IP address and port of the proxy through which the client's connection is 
established.
+
+proxyProtocolVersion::
+The version of the PROXY Protocol used when establishing the connection from 
the proxy to the broker.
++
+Valid values are `V1` and `V2`.
+
+This information is available via the `listConnections` method of the 
`ActiveMQServerControl`.
+On the web console corresponding details will be available in the 
"Connections" tab.
\ No newline at end of file
diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc
index 4fed10de2c..88121309a2 100644
--- a/docs/user-manual/versions.adoc
+++ b/docs/user-manual/versions.adoc
@@ -20,6 +20,9 @@ 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&versio
 === Highlights
 
 * Broker observability has improved with the new ability to export 
xref:metrics.adoc#executor-services[metrics for executor services] (aka thread 
pools).
+* The 
https://github.com/haproxy/haproxy/blob/master/doc/proxy-protocol.txt[PROXY 
Protocol] is now supported!
+This means that clients connecting through an applicable reverse proxy (e.g. 
HAProxy, nginx, etc.) will have the original client IP address reflected in the 
logs and in the web console rather than that of the proxy itself.
+More details available xref:proxy-protocol.adoc[here].
 
 === Upgrading from 2.41.0
 
diff --git a/tests/artemis-test-support/pom.xml 
b/tests/artemis-test-support/pom.xml
index a9f2b61f87..af12eb0a3b 100644
--- a/tests/artemis-test-support/pom.xml
+++ b/tests/artemis-test-support/pom.xml
@@ -101,6 +101,11 @@
          <artifactId>netty-handler</artifactId>
          <scope>provided</scope>
       </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-codec-haproxy</artifactId>
+         <scope>provided</scope>
+      </dependency>
       <dependency>
          <groupId>jakarta.jms</groupId>
          <artifactId>jakarta.jms-api</artifactId>
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyHAProxyServer.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyHAProxyServer.java
new file mode 100644
index 0000000000..b313789a7a
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyHAProxyServer.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.netty;
+
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.activemq.artemis.utils.SocketAddressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty based HA Proxy server that allows for testing some aspect of 
operating a broker behind an HA Proxy. Client
+ * connections are accepted and their traffic is forwarded between this proxy 
and a running broker instance.
+ */
+public class NettyHAProxyServer {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final AtomicBoolean started = new AtomicBoolean();
+
+   // values to be set on the PROXY Protocol header
+   private String headerSourceHost;
+   private int headerSourcePort;
+   private String headerDestinationHost;
+   private int headerDestinationPort;
+
+   private EventLoopGroup bossGroup;
+   private EventLoopGroup workerGroup;
+   private Channel serverChannel;
+
+   private String backendHost = "127.0.0.1";
+   private int backendPort;
+   private boolean sendProxyHeader = true;
+   private HAProxyProtocolVersion proxyProtocolVersion = 
HAProxyProtocolVersion.V1;
+   private boolean logBytes = false;
+   private int fontendPort = 0;
+   private int frontendPortInUse = -1;
+
+   /**
+    * Create a new instance that is not yet configured and requires a back-end 
port be set prior to calling the start
+    * method otherwise an exception will be thrown.
+    */
+   public NettyHAProxyServer() {
+   }
+
+   /**
+    * Create a new instance providing the port of the Artemis back-end server 
to connect to when a new client connects
+    * to this proxy server.
+    *
+    * @param backendPort The port on the host used then a client connects to 
this front end proxy.
+    */
+   public NettyHAProxyServer(int backendPort) {
+      if (backendPort <= 0) {
+         throw new IllegalArgumentException("Port for backend service cannot 
be less than or equal to zero");
+      }
+
+      this.backendPort = backendPort;
+   }
+
+   public NettyHAProxyServer start() {
+      if (started.compareAndSet(false, true)) {
+
+         if (backendPort <= 0) {
+            throw new IllegalArgumentException("The back-end server port has 
not been properly configured");
+         }
+
+         bossGroup = new NioEventLoopGroup();
+         workerGroup = new NioEventLoopGroup();
+
+         final ServerBootstrap server = new ServerBootstrap();
+
+         // Create the server context and ensure the client channels are 
created with auto-read set to off so we can
+         // first attempt a connection to the back-end before we handle any 
incoming data. Once connected to the backed
+         // the handlers will pump data between after sending a proxy header 
if configured and then enabling auto read
+         // for both sides.
+         server.group(bossGroup, workerGroup)
+               .channel(NioServerSocketChannel.class)
+               .option(ChannelOption.SO_BACKLOG, 100)
+               .handler(new LoggingHandler(logBytes ? LogLevel.INFO : 
LogLevel.TRACE))
+               .childOption(ChannelOption.AUTO_READ, false)
+               .childHandler(new ChannelInitializer<Channel>() {
+                  @Override
+                  public void initChannel(Channel clientChannel) throws 
Exception {
+                     if (isLogBytes()) {
+                        clientChannel.pipeline().addLast(new 
LoggingHandler(getClass(), LogLevel.INFO));
+                     }
+
+                     logger.info("New client connected to {}. Attempting to 
proxy to {}:{}", SocketAddressUtil.toString(serverChannel.localAddress()), 
backendHost, backendPort);
+
+                     // The client channel handler will take care of 
connecting and wiring the exchange of bytes back
+                     // and forth between the front end and the back-end.
+                     clientChannel.pipeline().addLast(new 
NettyHAProxyFrontendHandler(backendHost,
+                                                                               
       backendPort,
+                                                                               
       sendProxyHeader,
+                                                                               
       proxyProtocolVersion,
+                                                                               
       headerSourceHost,
+                                                                               
       headerSourcePort,
+                                                                               
       headerDestinationHost,
+                                                                               
       headerDestinationPort));
+                  }
+               });
+
+         // Start the server and then update the server port in case the 
configuration was such that the server chose a
+         // free port.
+         try {
+            serverChannel = server.bind(getFrontEndPort()).sync().channel();
+         } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+         }
+         frontendPortInUse = ((InetSocketAddress) 
serverChannel.localAddress()).getPort();
+         logger.info("PROXY Protocol server listening on: {}", 
SocketAddressUtil.toString(serverChannel.localAddress()));
+      }
+      return this;
+   }
+
+   public void stop() throws Exception {
+      final int timeout = 100;
+
+      if (started.compareAndSet(true, false)) {
+         frontendPortInUse = -1;
+
+         logger.info("Syncing channel close");
+         serverChannel.close().syncUninterruptibly();
+
+         logger.trace("Shutting down boss group");
+         bossGroup.shutdownGracefully(0, timeout, 
TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
+         logger.trace("Boss group shut down");
+
+         logger.trace("Shutting down worker group");
+         workerGroup.shutdownGracefully(0, timeout, 
TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
+         logger.trace("Worker group shut down");
+      }
+   }
+
+   public int getFrontendPortInUse() {
+      mustBeRunning("front-end port in use");
+      return frontendPortInUse;
+   }
+
+   public int getFrontEndPort() {
+      return fontendPort;
+   }
+
+   public NettyHAProxyServer setFrontendPort(int frontendPort) {
+      mustNotBeRunning("front-end port");
+      this.fontendPort = frontendPort;
+      return this;
+   }
+
+   public String getBackEndHost() {
+      return backendHost;
+   }
+
+   public NettyHAProxyServer setBackendHost(String backendHost) {
+      mustNotBeRunning("back-end host");
+      this.backendHost = backendHost;
+      return this;
+   }
+
+   public int getBackEndPort() {
+      return backendPort;
+   }
+
+   public NettyHAProxyServer setBackEndPort(int port) {
+      mustNotBeRunning("back-end port");
+      this.backendPort = port;
+      return this;
+   }
+
+   public boolean isLogBytes() {
+      return logBytes;
+   }
+
+   public NettyHAProxyServer setLogBytes(boolean logBytes) {
+      mustNotBeRunning("log bytes");
+      this.logBytes = logBytes;
+      return this;
+   }
+
+   public boolean isSendProxyHeader() {
+      mustBeRunning("send proxy header");
+      return sendProxyHeader;
+   }
+
+   public NettyHAProxyServer setSendProxyHeader(boolean sendProxyHeader) {
+      mustNotBeRunning("send proxy header");
+      this.sendProxyHeader = sendProxyHeader;
+      return this;
+   }
+
+   public HAProxyProtocolVersion getProxyProtocolVersion() {
+      mustBeRunning("proxy protocol version");
+      return proxyProtocolVersion;
+   }
+
+   public NettyHAProxyServer setProxyProtocolVersion(HAProxyProtocolVersion 
version) {
+      mustNotBeRunning("proxy protocol version");
+      this.proxyProtocolVersion = version;
+      return this;
+   }
+
+   public String getHeaderSourceHost() {
+      return headerSourceHost;
+   }
+
+   public NettyHAProxyServer setHeaderSourceHost(String headerSourceHost) {
+      mustNotBeRunning("header source host");
+      this.headerSourceHost = headerSourceHost;
+      return this;
+   }
+
+   public int getHeaderSourcePort() {
+      return headerSourcePort;
+   }
+
+   public NettyHAProxyServer setHeaderSourcePort(int headerSourcePort) {
+      mustNotBeRunning("header source port");
+      this.headerSourcePort = headerSourcePort;
+      return this;
+   }
+
+   public String getHeaderDestinationHost() {
+      return headerDestinationHost;
+   }
+
+   public NettyHAProxyServer setHeaderDestinationHost(String 
headerDestinationHost) {
+      mustNotBeRunning("header destination host");
+      this.headerDestinationHost = headerDestinationHost;
+      return this;
+   }
+
+   public int getHeaderDestinationPort() {
+      return headerDestinationPort;
+   }
+
+   public NettyHAProxyServer setHeaderDestinationPort(int 
headerDestinationPort) {
+      mustNotBeRunning("header destination port");
+      this.headerDestinationPort = headerDestinationPort;
+      return this;
+   }
+
+   private void mustNotBeRunning(String configuration) {
+      if (started.get()) {
+         throw new IllegalStateException("Cannot configure " + configuration + 
" for a server that is running");
+      }
+   }
+
+   private void mustBeRunning(String configuration) {
+      if (!started.get()) {
+         throw new IllegalStateException("Cannot access " + configuration + " 
for a server that is not running");
+      }
+   }
+
+   private static void closeOnFlush(Channel ch) {
+      // This queues an empty write which means to close won't occur until 
everything ahead of it is written or an error
+      // occurs in which case we close on handling it.
+
+      logger.trace("Close and flush called for channel: {}", ch);
+
+      if (ch.isActive()) {
+         
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+      } else {
+         logger.trace("Did not queue a write as the channel is already 
inactive.");
+      }
+   }
+
+   /**
+    * The front end handler attempts to connect to the back-end on connect of 
a new client connection at the front end.
+    * We will share the event loop for back-end communication with the front 
end so that we can simplify the thread
+    * handling and confine all work to the same event-loop.
+    */
+   public static class NettyHAProxyFrontendHandler extends 
ChannelInboundHandlerAdapter {
+
+      private final String backendHost;
+      private final int backendPort;
+      private final boolean sendProxyHeader;
+      private final HAProxyProtocolVersion proxyProtocolVersion;
+      private final String headerSourceHost;
+      private final int headerSourcePort;
+      private final String headerDestinationHost;
+      private final int headerDestinationPort;
+
+      private boolean proxyHeaderSent;
+      private boolean autoReadEnabled;
+
+      // As we use inboundChannel.eventLoop() when building the Bootstrap this 
does not need to be volatile as the
+      // outboundChannel will use the same EventLoop (and therefore Thread) as 
the inboundChannel.
+      private Channel backEndChannel;
+
+      public NettyHAProxyFrontendHandler(String backendHost,
+                                         int backendPort,
+                                         boolean sendProxyHeader,
+                                         HAProxyProtocolVersion 
proxyProtocolVersion,
+                                         String headerSourceHost,
+                                         int headerSourcePort,
+                                         String headerDestinationHost,
+                                         int headerDestinationPort) {
+         this.backendHost = backendHost;
+         this.backendPort = backendPort;
+         this.sendProxyHeader = sendProxyHeader;
+         this.proxyProtocolVersion = proxyProtocolVersion;
+         this.headerSourceHost = headerSourceHost;
+         this.headerSourcePort = headerSourcePort;
+         this.headerDestinationHost = headerDestinationHost;
+         this.headerDestinationPort = headerDestinationPort;
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext ctx) {
+
+         final Channel clientChannel = ctx.channel();
+         final Bootstrap bootstrap = new Bootstrap();
+
+         // Start the connection attempt but ensure that the channel is 
created with auto-read disabled so that we can
+         // handle pump of data between the front end and the back-end.
+         bootstrap.group(clientChannel.eventLoop())
+                  .channel(ctx.channel().getClass())
+                  .option(ChannelOption.AUTO_READ, false)
+                  .handler(new ChannelInitializer<>() {
+
+                     @Override
+                     public void initChannel(Channel ch) throws Exception {
+                        ch.pipeline().addLast(new 
NettyHAProxyBackendHandler(clientChannel));
+                        ch.pipeline().addLast(HAProxyMessageEncoder.INSTANCE);
+                     }
+                  });
+
+         final ChannelFuture connectFuture = bootstrap.connect(backendHost, 
backendPort);
+
+         backEndChannel = connectFuture.channel();
+
+         connectFuture.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) {
+               // If we succeed in connecting we can trigger a read but if we 
fail we should perform a clean close out
+               // of the client connection via its channel.
+               if (future.isSuccess()) {
+                  logger.info("Connected to back-end {}:{}; triggering client 
read.", backendHost, backendPort);
+                  clientChannel.read();
+               } else {
+                  logger.warn("Failed to connect to back-end {}:{}", 
backendHost, backendPort);
+                  clientChannel.close();
+               }
+            }
+         });
+      }
+
+      @Override
+      public void channelRead(final ChannelHandlerContext ctx, Object message) 
{
+         if (backEndChannel.isActive()) {
+            // We only want to send the header once if configured so ensure we 
gate that
+            if (sendProxyHeader && !proxyHeaderSent) {
+               // Write but don't flush so that the client bytes can be sent 
into the pipeline before flushing the work
+               // to the back-end broker instance.
+               backEndChannel.write(new HAProxyMessage(proxyProtocolVersion,
+                                                       HAProxyCommand.PROXY,
+                                                       
HAProxyProxiedProtocol.TCP4,
+                                                       headerSourceHost,
+                                                       headerDestinationHost,
+                                                       headerSourcePort,
+                                                       headerDestinationPort));
+
+               proxyHeaderSent = true;
+            }
+
+            backEndChannel.writeAndFlush(message).addListener(new 
ChannelFutureListener() {
+
+               @Override
+               public void operationComplete(ChannelFuture future) {
+                  if (future.isSuccess() && !autoReadEnabled) {
+                     logger.trace("Switching front end channel to auto-read 
after initial exchange.");
+                     ctx.channel().config().setAutoRead(true);
+                     ctx.channel().read();
+                     autoReadEnabled = true;
+                  } else if (!future.isSuccess()) {
+                     logger.warn("Error forwarding data from the front end to 
the back-end channel.");
+                     future.channel().close();
+                  }
+               }
+            });
+         }
+      }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext ctx) {
+         if (backEndChannel != null) {
+            closeOnFlush(backEndChannel);
+         }
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+         logger.warn("Error caught from front end handler", cause);
+         closeOnFlush(ctx.channel());
+      }
+   }
+
+   /**
+    * Back-end handler reads from the back-end and forwards all bytes 
unchanged to the front end client. The back-end
+    * adds an encoder for HA Proxy messages in order to encode them if the 
front end sends one into the pipeline for
+    * write to the back-end server.
+    */
+   public static class NettyHAProxyBackendHandler extends 
ChannelInboundHandlerAdapter {
+
+      private final Channel frontEndChannel;
+
+      private boolean autoReadEnabled;
+
+      public NettyHAProxyBackendHandler(Channel backEndChannel) {
+         this.frontEndChannel = backEndChannel;
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext ctx) {
+         // The connection to the front end could have gone down in which case 
ensure we close out the connection to the
+         // back-end so that things get cleaned up.
+         if (frontEndChannel.isActive()) {
+            ctx.read();
+         } else {
+            closeOnFlush(ctx.channel());
+         }
+      }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext ctx) {
+         // Connection to the back-end is down so close the front end to 
propagate the state.
+         logger.trace("Back-end connection has gone inactive");
+         closeOnFlush(frontEndChannel);
+      }
+
+      @Override
+      public void channelRead(final ChannelHandlerContext ctx, Object message) 
{
+         // Read from the back-end triggers passthrough write to the front end
+         frontEndChannel.writeAndFlush(message).addListener(new 
ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) {
+               if (future.isSuccess() && !autoReadEnabled) {
+                  logger.trace("Switching back-end channel to auto-read after 
initial exchange.");
+                  ctx.channel().config().setAutoRead(true);
+                  ctx.channel().read();
+                  autoReadEnabled = true;
+               } else if (!future.isSuccess()) {
+                  logger.warn("Error forwarding data from the back-end to the 
front end channel");
+                  future.channel().close();
+               }
+            }
+         });
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+         logger.warn("Error caught from back-end handler", cause);
+         closeOnFlush(ctx.channel());
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
index 89fe48490a..0c315c469f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
@@ -290,8 +290,12 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
    }
 
    protected String getBrokerQpidJMSConnectionString() {
+      return getBrokerQpidJMSConnectionString(PORT);
+   }
+
+   protected String getBrokerQpidJMSConnectionString(int port) {
       try {
-         String uri = "amqp://127.0.0.1:" + PORT;
+         String uri = "amqp://127.0.0.1:" + port;
 
          if (!getJmsConnectionURIOptions().isEmpty()) {
             uri = uri + "?" + getJmsConnectionURIOptions();
@@ -304,8 +308,12 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
    }
 
    protected URI getBrokerQpidJMSConnectionURI() {
+      return getBrokerQpidJMSConnectionURI(PORT);
+   }
+
+   protected URI getBrokerQpidJMSConnectionURI(int port) {
       try {
-         return new URI(getBrokerQpidJMSConnectionString());
+         return new URI(getBrokerQpidJMSConnectionString(port));
       } catch (Exception e) {
          throw new RuntimeException();
       }
@@ -323,6 +331,10 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return createConnection(getBrokerQpidJMSConnectionURI(), null, null, 
null, true);
    }
 
+   protected Connection createConnection(int port) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(port), null, null, 
null, true);
+   }
+
    protected Connection createFailoverConnection() throws JMSException {
       return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null, 
null, null, true);
    }
@@ -362,8 +374,12 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
    }
 
    protected String getBrokerCoreJMSConnectionString() {
+      return getBrokerCoreJMSConnectionString(PORT);
+   }
+
+   protected String getBrokerCoreJMSConnectionString(int port) {
       try {
-         String uri = "tcp://127.0.0.1:" + PORT;
+         String uri = "tcp://127.0.0.1:" + port;
 
          if (!getJmsConnectionURIOptions().isEmpty()) {
             uri = uri + "?" + getJmsConnectionURIOptions();
@@ -376,7 +392,11 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
    }
 
    protected Connection createCoreConnection() throws JMSException {
-      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, 
null, null, true);
+      return createCoreConnection(PORT);
+   }
+
+   protected Connection createCoreConnection(int port) throws JMSException {
+      return createCoreConnection(getBrokerCoreJMSConnectionString(port), 
null, null, null, true);
    }
 
    protected Connection createCoreConnection(boolean start) throws 
JMSException {
@@ -413,8 +433,12 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
    }
 
    protected String getBrokerOpenWireJMSConnectionString() {
+      return getBrokerOpenWireJMSConnectionString(PORT);
+   }
+
+   protected String getBrokerOpenWireJMSConnectionString(int port) {
       try {
-         String uri = "tcp://127.0.0.1:" + PORT;
+         String uri = "tcp://127.0.0.1:" + port;
 
          if (!getJmsConnectionURIOptions().isEmpty()) {
             uri = uri + "?" + getJmsConnectionURIOptions();
@@ -432,6 +456,10 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, true);
    }
 
+   protected Connection createOpenWireConnection(int port) throws JMSException 
{
+      return 
createOpenWireConnection(getBrokerOpenWireJMSConnectionString(port), null, 
null, null, true);
+   }
+
    protected Connection createOpenWireConnection(boolean start) throws 
JMSException {
       return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, start);
    }
@@ -466,11 +494,11 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return connection;
    }
 
-   interface ConnectionSupplier {
+   public interface ConnectionSupplier {
       Connection createConnection() throws JMSException;
    }
 
-   interface SecureConnectionSupplier {
+   public interface SecureConnectionSupplier {
       Connection createConnection(String username, String Password) throws 
JMSException;
    }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proxyprotocol/HAProxyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proxyprotocol/HAProxyTest.java
new file mode 100644
index 0000000000..8389af829b
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proxyprotocol/HAProxyTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proxyprotocol;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
+import 
org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
+import 
org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.netty.NettyHAProxyServer;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
+import org.eclipse.paho.mqttv5.client.IMqttToken;
+import org.eclipse.paho.mqttv5.client.MqttCallback;
+import org.eclipse.paho.mqttv5.client.MqttClient;
+import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
+import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.AT_LEAST_ONCE;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HAProxyTest extends MultiprotocolJMSClientTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int BROKER_PROXY_PORT = 61617;
+   private static final int BROKER_PROXY_SSL_PORT = 61618;
+   private static final int BROKER_STANDARD_PORT = 61616;
+
+   // the following fake IPs and ports are injected by the NettyHAProxyServer 
to verify functionality on the broker
+   private static final String HEADER_SOURCE_HOST = "9.9.9.9";
+   private static final int HEADER_SOURCE_PORT = 9999;
+   private static final String HEADER_DESTINATION_HOST = "8.8.8.8";
+   private static final int HEADER_DESTINATION_PORT = 8888;
+   private static final String REMOTE_ADDRESS_TO_VERIFY = HEADER_SOURCE_HOST + 
":" + HEADER_SOURCE_PORT;
+   private static final String PROXY_ADDRESS_TO_VERIFY = 
HEADER_DESTINATION_HOST + ":" + HEADER_DESTINATION_PORT;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      server = createServer(false, createDefaultNettyConfig()
+         .clearAcceptorConfigurations()
+         .addAcceptorConfiguration("standard", "tcp://127.0.0.1:" + 
BROKER_STANDARD_PORT + "?protocols=CORE,AMQP")
+         .addAcceptorConfiguration("proxyEnabled", "tcp://127.0.0.1:" + 
BROKER_PROXY_PORT + "?proxyProtocolEnabled=true")
+         .addAcceptorConfiguration("proxyAndSslEnabled", "tcp://127.0.0.1:" + 
BROKER_PROXY_SSL_PORT + 
"?proxyProtocolEnabled=true;sslEnabled=true;protocols=CORE,AMQP,MQTT,OPENWIRE;supportAdvisory=false;suppressInternalManagementObjects=true;keyStorePath=server-keystore.jks;keyStorePassword=securepass"));
+
+      server.start();
+
+      return server;
+   }
+
+   /*
+    * a non-proxied connection shouldn't be able to connect to an acceptor 
using proxyEnabled=true
+    */
+   @Test
+   public void testNonProxiedConnectionToProxyAcceptor() {
+      testFailure(() -> createConnection(BROKER_PROXY_PORT));
+   }
+
+   /*
+    * a proxied connection shouldn't be able to connect to an acceptor using 
proxyEnabled=false
+    */
+   @Test
+   public void testProxiedV1ConnectionToNonProxyAcceptor() throws Exception {
+      testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V1);
+   }
+
+   /*
+    * a proxied connection shouldn't be able to connect to an acceptor using 
proxyEnabled=false
+    */
+   @Test
+   public void testProxiedV2ConnectionToNonProxyAcceptor() throws Exception {
+      testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V2);
+   }
+
+   public void testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion 
version) throws Exception {
+      NettyHAProxyServer proxy = new NettyHAProxyServer()
+         .setBackEndPort(BROKER_STANDARD_PORT)
+         .setSendProxyHeader(true)
+         .setProxyProtocolVersion(version)
+         .setHeaderSourceHost(HEADER_SOURCE_HOST)
+         .setHeaderSourcePort(HEADER_SOURCE_PORT)
+         .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+         .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+         .start();
+      runAfter(proxy::stop);
+      testFailure(() -> createConnection(proxy.getFrontendPortInUse()));
+   }
+
+   private void testFailure(ConnectionSupplier cf) {
+      assertThrows(JMSException.class, () -> {
+         assertTimeout(Duration.ofMillis(2000), () -> testSendReceive(cf, 
null));
+      });
+   }
+
+   @Test
+   public void testSendReceiveCoreV1() throws Exception {
+      testSendReceiveCore(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveCoreV2() throws Exception {
+      testSendReceiveCore(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveCore(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createCoreConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveAMQPV1() throws Exception {
+      testSendReceiveAMQP(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveAMQPV2() throws Exception {
+      testSendReceiveAMQP(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveAMQP(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV1() throws Exception {
+      testSendReceiveOpenWire(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV2() throws Exception {
+      testSendReceiveOpenWire(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveOpenWire(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createOpenWireConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveCoreV1Ssl() throws Exception {
+      testSendReceiveCoreSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveCoreV2Ssl() throws Exception {
+      testSendReceiveCoreSsl(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveCoreSsl(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      testSendReceive(() -> createCoreConnection("tcp://localhost:" + 
proxyPort + 
"?sslEnabled=true;trustStorePath=server-ca-truststore.jks;trustStorePassword=securepass",
 null, null, null, true), version);
+   }
+
+   @Test
+   public void testSendReceiveAmqpV1Ssl() throws Exception {
+      testSendReceiveAmqpSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveAmqpV2Ssl() throws Exception {
+      testSendReceiveAmqpSsl(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveAmqpSsl(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      URL truststorePath = 
Thread.currentThread().getContextClassLoader().getResource("server-ca-truststore.jks");
+      assertNotNull(truststorePath, "Truststore file not found on classpath");
+      String truststore = truststorePath.getPath();
+      URI uri = new URI("amqps://localhost:" + proxyPort + 
"?transport.trustStoreLocation=" + truststore + 
"&transport.trustStorePassword=securepass");
+      testSendReceive(() -> createConnection(uri, null, null, null, true), 
version);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV1Ssl() throws Exception {
+      testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV2Ssl() throws Exception {
+      testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+   }
+
+   private void testSendReceiveOpenWireSsl(HAProxyProtocolVersion version) 
throws Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      testSendReceive(() -> {
+         ActiveMQSslConnectionFactory cf = new 
ActiveMQSslConnectionFactory("ssl://localhost:" + proxyPort);
+         try {
+            cf.setTrustStore("server-ca-truststore.jks");
+         } catch (Exception e) {
+            throw new RuntimeException(e);
+         }
+         cf.setKeyStorePassword("securepass");
+         return cf.createConnection();
+      }, version);
+   }
+
+   private void testSendReceive(ConnectionSupplier cf, HAProxyProtocolVersion 
version) throws Exception {
+      int numberOfMessages = 100;
+
+      for (int dest = 0; dest < 5; dest++) {
+         Connection producerConnection = cf.createConnection();
+         Wait.assertTrue(() -> verifyProxyConnectionCount(1, version));
+         Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue("queue.test" + dest);
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+
+         Connection consumerConnection = cf.createConnection();
+         Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+         Session sessionConsumer = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queueConsumer = sessionConsumer.createQueue("queue.test" + 
dest);
+         MessageConsumer consumer = 
sessionConsumer.createConsumer(queueConsumer);
+         consumerConnection.start();
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+         }
+
+         producerConnection.close();
+         consumerConnection.close();
+      }
+   }
+
+   @Test
+   public void testSendReceiveMqttV1() throws Exception {
+      testSendReceiveMqtt(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveMqttV2() throws Exception {
+      testSendReceiveMqtt(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveMqtt(HAProxyProtocolVersion version) throws 
Exception {
+      final int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      final String url = "tcp://localhost:" + proxyPort;
+      String topic = RandomUtil.randomUUIDString();
+      MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      MqttClient subscriber = new MqttClient(url, "subscriber", new 
MemoryPersistence());
+      subscriber.connect(connectionOptions);
+      subscriber.setCallback(new MqttCallback() {
+         @Override
+         public void disconnected(MqttDisconnectResponse 
mqttDisconnectResponse) {
+         }
+
+         @Override
+         public void mqttErrorOccurred(MqttException e) {
+         }
+
+         @Override
+         public void deliveryComplete(IMqttToken iMqttToken) {
+         }
+
+         @Override
+         public void connectComplete(boolean b, String s) {
+         }
+
+         @Override
+         public void authPacketArrived(int i, MqttProperties mqttProperties) {
+         }
+
+         @Override
+         public void messageArrived(String topic, MqttMessage message) {
+            logger.info("Message received from topic {}, message={}", topic, 
message);
+            latch.countDown();
+         }
+      });
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+
+      MqttClient producer = new MqttClient(url, "producer", new 
MemoryPersistence());
+      producer.connect(connectionOptions);
+      producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, 
false);
+      assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+      Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+      subscriber.disconnect();
+      producer.disconnect();
+   }
+
+   private int startProxy(HAProxyProtocolVersion version, int backEndPort) {
+      NettyHAProxyServer proxy = new NettyHAProxyServer()
+         .setBackEndPort(backEndPort)
+         .setProxyProtocolVersion(version)
+         .setHeaderSourceHost(HEADER_SOURCE_HOST)
+         .setHeaderSourcePort(HEADER_SOURCE_PORT)
+         .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+         .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+         .start();
+      runAfter(proxy::stop);
+      return proxy.getFrontendPortInUse();
+   }
+
+   private boolean verifyProxyConnectionCount(int expectedConnections, 
HAProxyProtocolVersion version) throws Exception {
+      // this connection goes directly to the broker so it won't be counted as 
a proxy connection
+      try (ActiveMQQueueConnectionFactory cf = new 
ActiveMQQueueConnectionFactory("tcp://localhost:" + BROKER_STANDARD_PORT);
+           QueueConnection c = cf.createQueueConnection()) {
+         QueueSession s = c.createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue managementQueue = 
ActiveMQJMSClient.createQueue("activemq.management");
+         QueueRequestor requestor = new QueueRequestor(s, managementQueue);
+         c.start();
+         Message m = s.createMessage();
+         String filter = createJsonFilter("proxyProtocolVersion", 
ActiveMQFilterPredicate.Operation.EQUALS.toString(), version.toString());
+         JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, 
"listConnections", filter, 1, 50);
+         JsonObject result = JsonUtil.readJsonObject((String) 
JMSManagementHelper.getResult(requestor.request(m), String.class));
+         if (expectedConnections != result.getJsonNumber("count").intValue()) {
+            return false;
+         }
+         JsonArray connections = result.getJsonArray("data");
+         for (int i = 0; i < expectedConnections; i++) {
+            if 
(!connections.getJsonObject(i).getString("proxyAddress").equals(PROXY_ADDRESS_TO_VERIFY))
 {
+               return false;
+            }
+            if 
(!connections.getJsonObject(i).getString("remoteAddress").equals(REMOTE_ADDRESS_TO_VERIFY))
 {
+               return false;
+            }
+         }
+         return true;
+      }
+   }
+
+   @Test
+   @Timeout(30)
+   public void 
testBrokerHandlesSplitAMQPHeaderBytesDuringConnectWithNoProxyHeader() throws 
Exception {
+      try (ProtonTestClient receivingPeer = new ProtonTestClient()) {
+         receivingPeer.connect("localhost", BROKER_STANDARD_PORT);
+         receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         receivingPeer.expectSASLHeader();
+         
receivingPeer.expectSaslMechanisms().withSaslServerMechanism("ANONYMOUS");
+         receivingPeer.remoteSaslInit().withMechanism("ANONYMOUS").queue();
+         receivingPeer.expectSaslOutcome().withCode(SaslCode.OK);
+         receivingPeer.remoteAMQPHeader().queue();
+         receivingPeer.expectAMQPHeader();
+
+         receivingPeer.remoteBytes().withBytes(new byte[] {'A', 'M'}).now();
+         receivingPeer.remoteBytes().withBytes(new byte[] {'Q', 'P', 3, 1, 0, 
0}).later(10);
+
+         receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Broker response after SASL anonymous connect
+         receivingPeer.expectOpen();
+         receivingPeer.expectBegin();
+
+         // Create basic connection with session
+         receivingPeer.remoteOpen().withContainerId("test-sender").now();
+         receivingPeer.remoteBegin().withNextOutgoingId(100).now();
+
+         receivingPeer.waitForScriptToComplete();
+      }
+   }
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
index f084f38351..e84f92d47e 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
@@ -103,8 +103,8 @@ public class SocksProxyTest extends ActiveMQTestBase {
 
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, address.getHostAddress());
-      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
-      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.SOCKS_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.SOCKS_HOST_PROP_NAME, "localhost");
 
       ClientConnectionLifeCycleListener listener = new 
ClientConnectionLifeCycleListener() {
          @Override
@@ -166,8 +166,8 @@ public class SocksProxyTest extends ActiveMQTestBase {
 
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, "localhost");
-      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
-      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.SOCKS_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.SOCKS_HOST_PROP_NAME, "localhost");
 
       ClientConnectionLifeCycleListener listener = new 
ClientConnectionLifeCycleListener() {
          @Override
@@ -208,10 +208,10 @@ public class SocksProxyTest extends ActiveMQTestBase {
       Map<String, Object> params = new HashMap<>();
 
       params.put(TransportConstants.HOST_PROP_NAME, 
"only-resolvable-on-proxy");
-      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
-      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
-      params.put(TransportConstants.PROXY_PORT_PROP_NAME, SOCKS_PORT);
-      params.put(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME, true);
+      params.put(TransportConstants.SOCKS_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.SOCKS_HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.SOCKS_PORT_PROP_NAME, SOCKS_PORT);
+      params.put(TransportConstants.SOCKS_REMOTE_DNS_PROP_NAME, true);
 
       ClientConnectionLifeCycleListener listener = new 
ClientConnectionLifeCycleListener() {
          @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to