This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch 2.52.x
in repository https://gitbox.apache.org/repos/asf/artemis.git

commit 521e672e4108675806d748158444ce23f9ef76ca
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Feb 19 09:44:57 2026 -0600

    Refactor federation downstream packet handling
    
    This commit includes the following changes:
     - Separate the handling of federation downstream connect packets into
       its own handler
     - Add a new config parameter
     - Disambiguate existing Core federation logging
     - Add new logging for each possible outcome when handling these packets
     - Add tests
     - Add docs
---
 .../artemis/core/config/Configuration.java         |   6 +
 .../core/config/impl/ConfigurationImpl.java        |  18 +++
 .../deployers/impl/FileConfigurationParser.java    |  18 +++
 .../protocol/core/impl/CoreProtocolManager.java    |  65 ++++++--
 .../artemis/core/server/ActiveMQServerLogger.java  |  28 +++-
 .../core/server/federation/FederationManager.java  |  20 ++-
 .../resources/schema/artemis-configuration.xsd     |   1 +
 .../core/config/impl/ConfigurationImplTest.java    |  37 +++++
 .../config/impl/FileConfigurationParserTest.java   |  14 ++
 .../_downstream-authorization-sample.adoc          |   7 +
 docs/user-manual/_downstream-authorization.adoc    |   6 +
 docs/user-manual/federation-address.adoc           |   8 +-
 docs/user-manual/federation-queue.adoc             |   8 +-
 .../federation/FederationDownstreamDirectTest.java | 168 +++++++++++++++++++++
 14 files changed, 377 insertions(+), 27 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 440abab376..790213343c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1545,4 +1545,10 @@ public interface Configuration {
    }
 
    Map<String, JaasAppConfiguration> getJaasConfigs();
+
+   List<String> getFederationDownstreamAuthorization();
+
+   void setFederationDownstreamAuthorization(List<String> roles);
+
+   Configuration addFederationDownstreamAuthorization(String role);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 85e21296e9..2997ef85ff 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -250,6 +250,8 @@ public class ConfigurationImpl extends 
javax.security.auth.login.Configuration i
 
    protected List<FederationConfiguration> federationConfigurations = new 
ArrayList<>();
 
+   protected List<String> downstreamAuthorization = new ArrayList<>();
+
    @Deprecated
    // this can eventually be replaced with List<QueueConfiguration>, but to 
keep existing semantics it must stay as is for now
    private List<CoreQueueConfiguration> coreQueueConfigurations = new 
ArrayList<>();
@@ -3522,6 +3524,22 @@ public class ConfigurationImpl extends 
javax.security.auth.login.Configuration i
       lockCoordinatorConfigurations.add(configuration);
    }
 
+   @Override
+   public List<String> getFederationDownstreamAuthorization() {
+      return downstreamAuthorization;
+   }
+
+   @Override
+   public void setFederationDownstreamAuthorization(List<String> roles) {
+      this.downstreamAuthorization = roles;
+   }
+
+   @Override
+   public Configuration addFederationDownstreamAuthorization(String role) {
+      downstreamAuthorization.add(role);
+      return this;
+   }
+
    // extend property utils with ability to auto-fill and locate from 
collections
    // collection entries are identified by the name() property
    private static class CollectionAutoFillPropertiesUtil extends 
PropertyUtilsBean {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 82e01806d0..30e64d6ae4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.ArtemisConstants;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -670,6 +671,13 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          }
       }
 
+      NodeList federations = e.getElementsByTagName("federations");
+
+      for (int i = 0; i < federations.getLength(); i++) {
+         Element fedNode = (Element) federations.item(i);
+         parseFederationsConfiguration(fedNode, config);
+      }
+
       NodeList fedNodes = e.getElementsByTagName("federation");
 
       for (int i = 0; i < fedNodes.getLength(); i++) {
@@ -2760,6 +2768,16 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
       mainConfig.getBridgeConfigurations().add(config);
    }
 
+   private void parseFederationsConfiguration(final Element fedNode, final 
Configuration mainConfig) throws Exception {
+      String roles = fedNode.getAttribute("downstream-authorization");
+      if (!roles.isEmpty()) {
+         Stream.of(roles.split(","))
+            .map(String::trim)
+            .filter(s -> !s.isEmpty())
+            .forEach(mainConfig::addFederationDownstreamAuthorization);
+      }
+   }
+
    private void parseFederationConfiguration(final Element fedNode, final 
Configuration mainConfig) throws Exception {
       FederationConfiguration config = new FederationConfiguration();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index e8a285557c..c8a17b09a5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -65,12 +65,15 @@ import 
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.utils.SecurityManagerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
@@ -163,8 +166,7 @@ public class CoreProtocolManager implements 
ProtocolManager<Interceptor, ActiveM
          .addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), 
acceptorUsed, rc,
                                    server.getActivation());
 
-      final Channel federationChannel =  
rc.getChannel(CHANNEL_ID.FEDERATION.id, -1);
-      federationChannel.setHandler(new LocalChannelHandler(config, entry, 
channel0, acceptorUsed, rc));
+      rc.getChannel(CHANNEL_ID.FEDERATION.id, -1).setHandler(new 
FederationChannelHandler(acceptorUsed, rc));
 
       return entry;
    }
@@ -397,7 +399,44 @@ public class CoreProtocolManager implements 
ProtocolManager<Interceptor, ActiveM
                   }
                });
             }
-         } else if (packet.getType() == 
PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) {
+         }
+      }
+
+      private Pair<TransportConfiguration, TransportConfiguration> getPair(
+         TransportConfiguration conn,
+         boolean isBackup) {
+         if (isBackup) {
+            return new Pair<>(null, conn);
+         }
+         return new Pair<>(conn, null);
+      }
+   }
+
+   private class FederationChannelHandler implements ChannelHandler {
+
+      private final Acceptor acceptorUsed;
+      private final CoreRemotingConnection rc;
+
+      private FederationChannelHandler(final Acceptor acceptorUsed, final 
CoreRemotingConnection rc) {
+         this.acceptorUsed = acceptorUsed;
+         this.rc = rc;
+      }
+
+      @Override
+      public void handlePacket(final Packet packet) {
+         if (packet.getType() == PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) {
+            if (server.getSecurityStore().isSecurityEnabled()) {
+               if (rc.getSubject() == null) {
+                  
ActiveMQServerLogger.LOGGER.federationDownstreamUnauthenticated(rc.getRemoteAddress());
+                  rc.close();
+                  return;
+               }
+               if 
(!server.getFederationManager().authorizeDownstreamDeployment(rc.getSubject())) 
{
+                  
ActiveMQServerLogger.LOGGER.federationDownstreamUnauthorized(rc.getRemoteAddress(),
 SecurityManagerUtil.getUserFromSubject(rc.getSubject(), UserPrincipal.class));
+                  rc.close();
+                  return;
+               }
+            }
             //If we receive this packet then a remote broker is requesting us 
to create federated upstream connection
             //back to it which simulates a downstream connection
             final FederationDownstreamConnectMessage message = 
(FederationDownstreamConnectMessage) packet;
@@ -477,37 +516,35 @@ public class CoreProtocolManager implements 
ProtocolManager<Interceptor, ActiveM
             //Register close and failure listeners, if the initial downstream 
connection goes down then we
             //want to terminate the upstream connection
             rc.addCloseListener(() -> {
-               server.getFederationManager().undeploy(config.getName());
+               handleClose(config.getName(), rc.getRemoteAddress());
             });
 
             rc.addFailureListener(new FailureListener() {
                @Override
                public void connectionFailed(ActiveMQException exception, 
boolean failedOver) {
-                  server.getFederationManager().undeploy(config.getName());
+                  handleClose(config.getName(), rc.getRemoteAddress());
                }
 
                @Override
-               public void connectionFailed(ActiveMQException exception, 
boolean failedOver,
-                                            String scaleDownTargetNodeID) {
-                  server.getFederationManager().undeploy(config.getName());
+               public void connectionFailed(ActiveMQException exception, 
boolean failedOver, String scaleDownTargetNodeID) {
+                  handleClose(config.getName(), rc.getRemoteAddress());
                }
             });
 
             try {
                server.getFederationManager().deploy(config);
+               String user = rc.getSubject() != null ? 
SecurityManagerUtil.getUserFromSubject(rc.getSubject(), UserPrincipal.class) : 
"anonymous";
+               
ActiveMQServerLogger.LOGGER.federationDownstreamDeployedFromRemoteUser(config.getName(),
 user, rc.getRemoteAddress());
             } catch (Exception e) {
                logger.error("Error deploying federation", e);
             }
          }
       }
 
-      private Pair<TransportConfiguration, TransportConfiguration> getPair(
-         TransportConfiguration conn,
-         boolean isBackup) {
-         if (isBackup) {
-            return new Pair<>(null, conn);
+      private void handleClose(String configName, String remoteAddress) {
+         if (server.getFederationManager().undeploy(configName)) {
+            
ActiveMQServerLogger.LOGGER.federationDownstreamConnectionClosed(remoteAddress, 
configName);
          }
-         return new Pair<>(conn, null);
       }
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1443522541..32ff67035f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1003,28 +1003,28 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222278, value = "Unable to extract GroupSequence from 
message", level = LogMessage.Level.WARN)
    void unableToExtractGroupSequence(Throwable e);
 
-   @LogMessage(id = 222279, value = "Federation upstream {} policy ref {} 
could not be resolved in federation configuration", level = 
LogMessage.Level.WARN)
+   @LogMessage(id = 222279, value = "Core federation upstream {} policy ref {} 
could not be resolved in configuration", level = LogMessage.Level.WARN)
    void federationCantFindPolicyRef(String upstreamName, String policyRef);
 
-   @LogMessage(id = 222280, value = "Federation upstream {} policy ref {} is 
of unknown type in federation configuration", level = LogMessage.Level.WARN)
+   @LogMessage(id = 222280, value = "Core federation upstream {} policy ref {} 
is of unknown type in configuration", level = LogMessage.Level.WARN)
    void federationUnknownPolicyType(String upstreamName, String policyRef);
 
-   @LogMessage(id = 222281, value = "Federation upstream {} policy ref {} are 
too self referential, avoiding stack overflow , ", level = 
LogMessage.Level.WARN)
+   @LogMessage(id = 222281, value = "Core federation upstream {} policy ref {} 
are too self referential, avoiding stack overflow", level = 
LogMessage.Level.WARN)
    void federationAvoidStackOverflowPolicyRef(String upstreamName, String 
policyRef);
 
-   @LogMessage(id = 222282, value = "Federation downstream {} upstream 
transport configuration ref {} could not be resolved in federation 
configuration", level = LogMessage.Level.WARN)
+   @LogMessage(id = 222282, value = "Core federation downstream {} upstream 
transport configuration ref {} could not be resolved in configuration", level = 
LogMessage.Level.WARN)
    void federationCantFindUpstreamConnector(String downstreamName, String 
upstreamRef);
 
-   @LogMessage(id = 222283, value = "Federation downstream {} has been 
deployed", level = LogMessage.Level.INFO)
+   @LogMessage(id = 222283, value = "Core federation downstream {} has been 
deployed", level = LogMessage.Level.INFO)
    void federationDownstreamDeployed(String downstreamName);
 
-   @LogMessage(id = 222284, value = "Federation downstream {} has been 
undeployed", level = LogMessage.Level.INFO)
+   @LogMessage(id = 222284, value = "Core federation downstream {} has been 
undeployed", level = LogMessage.Level.INFO)
    void federationDownstreamUnDeployed(String downstreamName);
 
    @LogMessage(id = 222285, value = "File {} at {} is empty. Delete the empty 
file to stop this message.", level = LogMessage.Level.WARN)
    void emptyAddressFile(String addressFile, String directory);
 
-   @LogMessage(id = 222286, value = "Error executing {} federation plugin 
method.", level = LogMessage.Level.WARN)
+   @LogMessage(id = 222286, value = "Error executing {} Core federation plugin 
method.", level = LogMessage.Level.WARN)
    void federationPluginExecutionError(String pluginMethod, Throwable e);
 
    @LogMessage(id = 222287, value = "Error looking up bindings for address 
{}.", level = LogMessage.Level.WARN)
@@ -1077,7 +1077,7 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222304, value = "Unable to load message from journal", 
level = LogMessage.Level.WARN)
    void unableToLoadMessageFromJournal(Throwable t);
 
-   @LogMessage(id = 222305, value = "Error federating message {}.", level = 
LogMessage.Level.WARN)
+   @LogMessage(id = 222305, value = "Error during Core federation of message: 
{}.", level = LogMessage.Level.WARN)
    void federationDispatchError(String message, Throwable e);
 
    @LogMessage(id = 222306, value = "Failed to load prepared TX and it will be 
rolled back: {}", level = LogMessage.Level.WARN)
@@ -1530,4 +1530,16 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224157, value = "At least one of the components failed to 
start under the lockCoordinator {}. A retry will be executed", level = 
LogMessage.Level.INFO)
    void retryLockCoordinator(String name);
+
+   @LogMessage(id = 224158, value = "Unable to process Core federation 
downstream request from {}. User is not authenticated. Closing connection.", 
level = LogMessage.Level.WARN)
+   void federationDownstreamUnauthenticated(String remoteAddress);
+
+   @LogMessage(id = 224159, value = "Unable to process Core federation 
downstream request from {}. User {} is not authorized. Closing connection.", 
level = LogMessage.Level.WARN)
+   void federationDownstreamUnauthorized(String remoteAddress, String user);
+
+   @LogMessage(id = 224160, value = "Core federation downstream {} has been 
deployed. Request sent by user {} from {}.", level = LogMessage.Level.INFO)
+   void federationDownstreamDeployedFromRemoteUser(String name, String 
userFromSubject, String remoteAddress);
+
+   @LogMessage(id = 224161, value = "Connection from {} closed. Undeployed 
Core federation {}.", level = LogMessage.Level.INFO)
+   void federationDownstreamConnectionClosed(String remoteAddress, String 
name);
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java
index ee3534d400..ec731eea77 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java
@@ -16,13 +16,16 @@
  */
 package org.apache.activemq.artemis.core.server.federation;
 
+import javax.security.auth.Subject;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
 
 public class FederationManager implements ActiveMQComponent {
 
@@ -30,6 +33,7 @@ public class FederationManager implements ActiveMQComponent {
 
    private Map<String, Federation> federations = new HashMap<>();
    private State state;
+   private List<String> downstreamAuthorization;
 
    enum State {
       STOPPED,
@@ -47,6 +51,7 @@ public class FederationManager implements ActiveMQComponent {
 
    public FederationManager(final ActiveMQServer server) {
       this.server = server;
+      this.downstreamAuthorization = 
server.getConfiguration().getFederationDownstreamAuthorization();
    }
 
    @Override
@@ -90,8 +95,9 @@ public class FederationManager implements ActiveMQComponent {
       Federation federation = federations.remove(name);
       if (federation != null) {
          federation.stop();
+         return true;
       }
-      return true;
+      return false;
    }
 
 
@@ -129,4 +135,16 @@ public class FederationManager implements 
ActiveMQComponent {
       server.unRegisterBrokerPlugin(federatedAbstract);
    }
 
+   public boolean authorizeDownstreamDeployment(Subject subject) {
+      if (!server.getSecurityStore().isSecurityEnabled()) {
+         return true;
+      }
+      for (RolePrincipal role : subject.getPrincipals(RolePrincipal.class)) {
+         if (downstreamAuthorization.contains(role.getName())) {
+            return true;
+         }
+      }
+      return false;
+   }
+
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5c12855f29..7354d8dc09 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1646,6 +1646,7 @@
          <xsd:sequence>
             <xsd:element name="federation" type="federationType" 
maxOccurs="unbounded" minOccurs="0"/>
          </xsd:sequence>
+         <xsd:attribute name="downstream-authorization" type="xsd:string" 
use="optional" />
          <xsd:attributeGroup ref="xml:specialAttrs"/>
       </xsd:complexType>
    </xsd:element>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 94b0d234ad..7c6332da7a 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -1299,6 +1299,43 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertEquals("secureexample", 
configuration.getFederationConfigurations().get(0).getCredentials().getPassword());
    }
 
+   @Test
+   public void testParseDownstreamAuthorization() throws Exception {
+      Properties properties = new Properties();
+      properties.put("federationDownstreamAuthorization", "a,b,c,d,e");
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.parsePrefixedProperties(properties, null);
+      assertEquals(5, 
configuration.getFederationDownstreamAuthorization().size());
+      assertEquals(List.of("a", "b", "c", "d", "e"), 
configuration.getFederationDownstreamAuthorization());
+   }
+
+   @Test
+   public void testDownstreamExportImport() throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.getFederationDownstreamAuthorization().add("a");
+      configuration.getFederationDownstreamAuthorization().add("b");
+      configuration.getFederationDownstreamAuthorization().add("c");
+      configuration.getFederationDownstreamAuthorization().add("d");
+      configuration.getFederationDownstreamAuthorization().add("e");
+      assertEquals(List.of("a", "b", "c", "d", "e"), 
configuration.getFederationDownstreamAuthorization());
+
+      File outputProperty = new File(getTestDirfile(), "broker.properties");
+      configuration.exportAsProperties(outputProperty);
+
+      ConfigurationImpl outputConfig = new ConfigurationImpl();
+
+      Properties brokerProperties = new Properties();
+      try (FileInputStream is = new FileInputStream(outputProperty)) {
+         BufferedInputStream bis = new BufferedInputStream(is);
+         brokerProperties.load(bis);
+      }
+
+      assertEquals("a,b,c,d,e", 
brokerProperties.get("federationDownstreamAuthorization"));
+
+      outputConfig.parsePrefixedProperties(brokerProperties, null);
+      assertEquals(List.of("a", "b", "c", "d", "e"), 
outputConfig.getFederationDownstreamAuthorization());
+   }
+
    @Test
    public void testExportWithNonPasswordEnc() throws Exception {
       ConfigurationImpl configuration = new ConfigurationImpl();
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index fdca23ef83..b820fd83b7 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -1114,6 +1114,20 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
       assertEquals("myQueue", match.getQueueMatch());
    }
 
+   @Test
+   public void testParseFederationDownstreamAuthorization() throws Exception {
+      String middlePart = "<federations downstream-authorization=\"a, b,c\"/>";
+      String configStr = FIRST_PART + middlePart + LAST_PART;
+
+      final FileConfigurationParser parser = new FileConfigurationParser();
+      final ByteArrayInputStream input = new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      final Configuration configuration = parser.parseMainConfig(input);
+      
assertTrue(configuration.getFederationDownstreamAuthorization().contains("a"));
+      
assertTrue(configuration.getFederationDownstreamAuthorization().contains("b"));
+      
assertTrue(configuration.getFederationDownstreamAuthorization().contains("c"));
+   }
+
    @Test
    public void testParsingDiskFullPolicy() throws Exception {
       String configStr = """
diff --git a/docs/user-manual/_downstream-authorization-sample.adoc 
b/docs/user-manual/_downstream-authorization-sample.adoc
new file mode 100644
index 0000000000..4a94c2e33e
--- /dev/null
+++ b/docs/user-manual/_downstream-authorization-sample.adoc
@@ -0,0 +1,7 @@
+Sample Downstream Federation setup on the downstream broker:
+
+[,xml]
+----
+<!-- the "federation_username" configured on the upstream broker must be in 
this role -->
+<federations downstream-authorization="federation_role" />
+----
\ No newline at end of file
diff --git a/docs/user-manual/_downstream-authorization.adoc 
b/docs/user-manual/_downstream-authorization.adoc
new file mode 100644
index 0000000000..3d345efdc1
--- /dev/null
+++ b/docs/user-manual/_downstream-authorization.adoc
@@ -0,0 +1,6 @@
+downstream-authorization::
+This is an attribute on the `federations` element that is set on the 
_downstream_ broker so that it can properly authorize incoming federation 
requests.
+It is a comma-delimited list of roles which are authorized to deploy 
federation on this broker.
+The broker sending the federation command must use credentials for a user that 
is in one of the roles listed here.
++
+If this is not defined then the broker will not process incoming federation 
commands.
\ No newline at end of file
diff --git a/docs/user-manual/federation-address.adoc 
b/docs/user-manual/federation-address.adoc
index ce87b91ff2..43def75854 100644
--- a/docs/user-manual/federation-address.adoc
+++ b/docs/user-manual/federation-address.adoc
@@ -265,7 +265,7 @@ Similarly to `upstream` configuration, a downstream 
configuration can be configu
 This works by sending a command to the `downstream` broker to have it create 
an `upstream` connection back to the downstream broker.
 The benefit of this is being able to configure everything for federation on 
one broker in some cases to make it easier, such as a hub and spoke topology
 
-All the same configuration options apply to `downstream` as does `upstream` 
with the exception of one extra configuration flag that needs to be set:
+All the same configuration options apply to `downstream` as does `upstream` 
with two exceptions:
 
 upstream-connector-ref::
 Is an element pointing to a `connector` elements defined elsewhere.
@@ -274,7 +274,9 @@ This reference is used to tell the downstream broker what 
connector to use to cr
 A _connector_ encapsulates knowledge of what transport to use (TCP, SSL, HTTP 
etc.) as well as the server connection parameters (host, port etc).
 For more information about what connectors are and how to configure them, see 
xref:configuring-transports.adoc#configuring-the-transport[Configuring the 
Transport].
 
-Sample Downstream Address Federation setup:
+include::_downstream-authorization.adoc[]
+
+Sample Downstream Address Federation setup on the upstream broker:
 
 [,xml]
 ----
@@ -325,3 +327,5 @@ Sample Downstream Address Federation setup:
    </federation>
 </federations>
 ----
+
+include::_downstream-authorization-sample.adoc[]
\ No newline at end of file
diff --git a/docs/user-manual/federation-queue.adoc 
b/docs/user-manual/federation-queue.adoc
index 3d345795df..4e593b40aa 100644
--- a/docs/user-manual/federation-queue.adoc
+++ b/docs/user-manual/federation-queue.adoc
@@ -230,7 +230,7 @@ Similarly to `upstream` configuration, a downstream 
configuration can be configu
 This works by sending a command to the `downstream` broker to have it create 
an `upstream` connection back to the downstream broker.
 The benefit of this is being able to configure everything for federation on 
one broker in some cases to make it easier, such as a hub and spoke topology.
 
-All the same configuration options apply to `downstream` as does `upstream` 
with the exception of one extra configuration flag that needs to be set:
+All the same configuration options apply to `downstream` as does `upstream` 
with two exceptions:
 
 upstream-connector-ref::
 Is an element pointing to a `connector` elements defined elsewhere.
@@ -239,7 +239,9 @@ This reference is used to tell the downstream broker what 
connector to use to cr
 A _connector_ encapsulates knowledge of what transport to use (TCP, SSL, HTTP 
etc.) as well as the server connection parameters (host, port etc).
 For more information about what connectors are and how to configure them, see 
xref:configuring-transports.adoc#configuring-the-transport[Configuring the 
Transport].
 
-Sample Downstream Address Federation setup:
+include::_downstream-authorization.adoc[]
+
+Sample Downstream Address Federation setup on the upstream broker:
 
 [,xml]
 ----
@@ -291,3 +293,5 @@ Sample Downstream Address Federation setup:
    </federation>
 </federations>
 ----
+
+include::_downstream-authorization-sample.adoc[]
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java
new file mode 100644
index 0000000000..32c1cb47a4
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
+import org.apache.activemq.artemis.core.config.federation.FederationPolicy;
+import org.apache.activemq.artemis.core.config.federation.FederationPolicySet;
+import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage.UPSTREAM_SUFFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FederationDownstreamDirectTest extends ActiveMQTestBase {
+
+   protected ActiveMQServer server;
+
+   private static final String noAuth = "noAuth";
+
+   private static final String authorizedUser = "authorizedUser";
+   private static final String authorizedPass = "authorizedPass";
+   private static final String authorizedRole = "authorizedRole";
+
+   private static final String unauthorizedUser = "unauthorizedUser";
+   private static final String unauthorizedPass = "unauthorizedPass";
+   private static final String unauthorizedRole = "unauthorizedRole";
+
+   @BeforeEach
+   public void setUp(TestInfo testInfo) throws Exception {
+      super.setUp();
+      Configuration config = 
createDefaultNettyConfig().setSecurityEnabled(true);
+      if (!testInfo.getTags().contains(noAuth)) {
+         config.addFederationDownstreamAuthorization(authorizedRole);
+      }
+      server = createServer(false, config);
+      server.start();
+
+      ActiveMQJAASSecurityManager securityManager = 
(ActiveMQJAASSecurityManager) server.getSecurityManager();
+
+      securityManager.getConfiguration().addUser(authorizedUser, 
authorizedPass);
+      securityManager.getConfiguration().addRole(authorizedUser, 
authorizedRole);
+
+      securityManager.getConfiguration().addUser(unauthorizedUser, 
unauthorizedPass);
+      securityManager.getConfiguration().addRole(unauthorizedUser, 
unauthorizedRole);
+   }
+
+   @Test
+   @Tag(noAuth)
+   public void testNoAuthConfigured() throws Exception {
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         sendFederationDownstreamConnectMessage(authorizedUser, 
authorizedPass, false);
+         assertFalse(loggerHandler.findText("AMQ224158"));
+         assertTrue(loggerHandler.findText("AMQ224159"));
+         assertFalse(loggerHandler.findText("AMQ224160"));
+      }
+   }
+
+   @Test
+   public void testUnauthenticatedDeployment() throws Exception {
+      // send the federation message without authenticating with a session 
first
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         sendFederationDownstreamConnectMessage(null, null, false);
+         assertTrue(loggerHandler.findText("AMQ224158"));
+         assertFalse(loggerHandler.findText("AMQ224159"));
+         assertFalse(loggerHandler.findText("AMQ224160"));
+      }
+   }
+
+   @Test
+   public void testUnauthorizedDeployment() throws Exception {
+      // send the federation message after authenticating with a user who 
isn't authorized for downstream deployment
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         sendFederationDownstreamConnectMessage(unauthorizedUser, 
unauthorizedPass, false);
+         assertFalse(loggerHandler.findText("AMQ224158"));
+         assertTrue(loggerHandler.findText("AMQ224159"));
+         assertFalse(loggerHandler.findText("AMQ224160"));
+      }
+   }
+
+   @Test
+   public void testSuccessfulDeployment() throws Exception {
+      // send the federation message after authenticating with a user who is 
authorized for downstream deployment
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         sendFederationDownstreamConnectMessage(authorizedUser, 
authorizedPass, true);
+         assertFalse(loggerHandler.findText("AMQ224158"));
+         assertFalse(loggerHandler.findText("AMQ224159"));
+         assertTrue(loggerHandler.findText("AMQ224160"));
+         Wait.assertTrue(() -> loggerHandler.findText("AMQ224161"));
+      }
+   }
+
+   private void sendFederationDownstreamConnectMessage(String user, String 
password, boolean succeed) throws Exception {
+      try (ServerLocator locator = 
ActiveMQClient.createServerLocator("tcp://localhost:61616")) {
+         ClientSessionFactory factory = locator.createSessionFactory();
+         ClientSession session;
+         if (user != null) {
+            session = factory.createSession(user, password, true, true, true, 
true, -1);
+         }
+         CoreRemotingConnection coreConn = (CoreRemotingConnection) 
factory.getConnection();
+         Wait.assertEquals(1, 
server.getActiveMQServerControl()::getConnectionCount);
+         Channel federationChannel = 
coreConn.getChannel(ChannelImpl.CHANNEL_ID.FEDERATION.id, -1);
+         
federationChannel.send(getFederationDownstreamConnectMessage(getName()));
+         if (succeed) {
+            Wait.assertNotNull(() -> 
server.getFederationManager().get(getName() + UPSTREAM_SUFFIX), 1000, 20);
+         } else {
+            assertFalse(Wait.waitFor(() -> 
server.getFederationManager().get(getName() + UPSTREAM_SUFFIX) != null, 1000, 
20));
+            assertEquals(0, 
server.getActiveMQServerControl().getConnectionCount());
+         }
+      }
+   }
+
+   private FederationDownstreamConnectMessage 
getFederationDownstreamConnectMessage(String name) {
+      final String policySetName = "fake-policy-set";
+      final String policyConfigName = "fake-policy-config";
+      FederationDownstreamConnectMessage msg = new 
FederationDownstreamConnectMessage();
+      msg.setName(name);
+
+      Map<String, FederationPolicy> policyMap = new HashMap<>();
+      policyMap.put(policyConfigName, new 
FederationQueuePolicyConfiguration().setName(policyConfigName).addInclude(new 
FederationQueuePolicyConfiguration.Matcher().setQueueMatch("#").setAddressMatch("#")));
+      policyMap.put(policySetName, new 
FederationPolicySet().setName(policySetName).addPolicyRef(policyConfigName));
+      msg.setFederationPolicyMap(policyMap);
+
+      FederationDownstreamConfiguration downstreamConfig = new 
FederationDownstreamConfiguration()
+         .setName("fake")
+         .addPolicyRef(policySetName);
+      downstreamConfig.setUpstreamConfiguration(new 
TransportConfiguration(NettyConnectorFactory.class.getName(), new HashMap<>(), 
"fake"));
+      msg.setStreamConfiguration(downstreamConfig);
+      return msg;
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to