This is an automated email from the ASF dual-hosted git repository. jbertram pushed a commit to branch 2.52.x in repository https://gitbox.apache.org/repos/asf/artemis.git
commit 521e672e4108675806d748158444ce23f9ef76ca Author: Justin Bertram <[email protected]> AuthorDate: Thu Feb 19 09:44:57 2026 -0600 Refactor federation downstream packet handling This commit includes the following changes: - Separate the handling of federation downstream connect packets into its own handler - Add a new config parameter - Disambiguate existing Core federation logging - Add new logging for each possible outcome when handling these packets - Add tests - Add docs --- .../artemis/core/config/Configuration.java | 6 + .../core/config/impl/ConfigurationImpl.java | 18 +++ .../deployers/impl/FileConfigurationParser.java | 18 +++ .../protocol/core/impl/CoreProtocolManager.java | 65 ++++++-- .../artemis/core/server/ActiveMQServerLogger.java | 28 +++- .../core/server/federation/FederationManager.java | 20 ++- .../resources/schema/artemis-configuration.xsd | 1 + .../core/config/impl/ConfigurationImplTest.java | 37 +++++ .../config/impl/FileConfigurationParserTest.java | 14 ++ .../_downstream-authorization-sample.adoc | 7 + docs/user-manual/_downstream-authorization.adoc | 6 + docs/user-manual/federation-address.adoc | 8 +- docs/user-manual/federation-queue.adoc | 8 +- .../federation/FederationDownstreamDirectTest.java | 168 +++++++++++++++++++++ 14 files changed, 377 insertions(+), 27 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 440abab376..790213343c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1545,4 +1545,10 @@ public interface Configuration { } Map<String, JaasAppConfiguration> getJaasConfigs(); + + List<String> getFederationDownstreamAuthorization(); + + void setFederationDownstreamAuthorization(List<String> roles); + + Configuration addFederationDownstreamAuthorization(String role); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 85e21296e9..2997ef85ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -250,6 +250,8 @@ public class ConfigurationImpl extends javax.security.auth.login.Configuration i protected List<FederationConfiguration> federationConfigurations = new ArrayList<>(); + protected List<String> downstreamAuthorization = new ArrayList<>(); + @Deprecated // this can eventually be replaced with List<QueueConfiguration>, but to keep existing semantics it must stay as is for now private List<CoreQueueConfiguration> coreQueueConfigurations = new ArrayList<>(); @@ -3522,6 +3524,22 @@ public class ConfigurationImpl extends javax.security.auth.login.Configuration i lockCoordinatorConfigurations.add(configuration); } + @Override + public List<String> getFederationDownstreamAuthorization() { + return downstreamAuthorization; + } + + @Override + public void setFederationDownstreamAuthorization(List<String> roles) { + this.downstreamAuthorization = roles; + } + + @Override + public Configuration addFederationDownstreamAuthorization(String role) { + downstreamAuthorization.add(role); + return this; + } + // extend property utils with ability to auto-fill and locate from collections // collection entries are identified by the name() property private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 82e01806d0..30e64d6ae4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -670,6 +671,13 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } + NodeList federations = e.getElementsByTagName("federations"); + + for (int i = 0; i < federations.getLength(); i++) { + Element fedNode = (Element) federations.item(i); + parseFederationsConfiguration(fedNode, config); + } + NodeList fedNodes = e.getElementsByTagName("federation"); for (int i = 0; i < fedNodes.getLength(); i++) { @@ -2760,6 +2768,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { mainConfig.getBridgeConfigurations().add(config); } + private void parseFederationsConfiguration(final Element fedNode, final Configuration mainConfig) throws Exception { + String roles = fedNode.getAttribute("downstream-authorization"); + if (!roles.isEmpty()) { + Stream.of(roles.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .forEach(mainConfig::addFederationDownstreamAuthorization); + } + } + private void parseFederationConfiguration(final Element fedNode, final Configuration mainConfig) throws Exception { FederationConfiguration config = new FederationConfiguration(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index e8a285557c..c8a17b09a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -65,12 +65,15 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal; +import org.apache.activemq.artemis.utils.SecurityManagerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -163,8 +166,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM .addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), acceptorUsed, rc, server.getActivation()); - final Channel federationChannel = rc.getChannel(CHANNEL_ID.FEDERATION.id, -1); - federationChannel.setHandler(new LocalChannelHandler(config, entry, channel0, acceptorUsed, rc)); + rc.getChannel(CHANNEL_ID.FEDERATION.id, -1).setHandler(new FederationChannelHandler(acceptorUsed, rc)); return entry; } @@ -397,7 +399,44 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM } }); } - } else if (packet.getType() == PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) { + } + } + + private Pair<TransportConfiguration, TransportConfiguration> getPair( + TransportConfiguration conn, + boolean isBackup) { + if (isBackup) { + return new Pair<>(null, conn); + } + return new Pair<>(conn, null); + } + } + + private class FederationChannelHandler implements ChannelHandler { + + private final Acceptor acceptorUsed; + private final CoreRemotingConnection rc; + + private FederationChannelHandler(final Acceptor acceptorUsed, final CoreRemotingConnection rc) { + this.acceptorUsed = acceptorUsed; + this.rc = rc; + } + + @Override + public void handlePacket(final Packet packet) { + if (packet.getType() == PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) { + if (server.getSecurityStore().isSecurityEnabled()) { + if (rc.getSubject() == null) { + ActiveMQServerLogger.LOGGER.federationDownstreamUnauthenticated(rc.getRemoteAddress()); + rc.close(); + return; + } + if (!server.getFederationManager().authorizeDownstreamDeployment(rc.getSubject())) { + ActiveMQServerLogger.LOGGER.federationDownstreamUnauthorized(rc.getRemoteAddress(), SecurityManagerUtil.getUserFromSubject(rc.getSubject(), UserPrincipal.class)); + rc.close(); + return; + } + } //If we receive this packet then a remote broker is requesting us to create federated upstream connection //back to it which simulates a downstream connection final FederationDownstreamConnectMessage message = (FederationDownstreamConnectMessage) packet; @@ -477,37 +516,35 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM //Register close and failure listeners, if the initial downstream connection goes down then we //want to terminate the upstream connection rc.addCloseListener(() -> { - server.getFederationManager().undeploy(config.getName()); + handleClose(config.getName(), rc.getRemoteAddress()); }); rc.addFailureListener(new FailureListener() { @Override public void connectionFailed(ActiveMQException exception, boolean failedOver) { - server.getFederationManager().undeploy(config.getName()); + handleClose(config.getName(), rc.getRemoteAddress()); } @Override - public void connectionFailed(ActiveMQException exception, boolean failedOver, - String scaleDownTargetNodeID) { - server.getFederationManager().undeploy(config.getName()); + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + handleClose(config.getName(), rc.getRemoteAddress()); } }); try { server.getFederationManager().deploy(config); + String user = rc.getSubject() != null ? SecurityManagerUtil.getUserFromSubject(rc.getSubject(), UserPrincipal.class) : "anonymous"; + ActiveMQServerLogger.LOGGER.federationDownstreamDeployedFromRemoteUser(config.getName(), user, rc.getRemoteAddress()); } catch (Exception e) { logger.error("Error deploying federation", e); } } } - private Pair<TransportConfiguration, TransportConfiguration> getPair( - TransportConfiguration conn, - boolean isBackup) { - if (isBackup) { - return new Pair<>(null, conn); + private void handleClose(String configName, String remoteAddress) { + if (server.getFederationManager().undeploy(configName)) { + ActiveMQServerLogger.LOGGER.federationDownstreamConnectionClosed(remoteAddress, configName); } - return new Pair<>(conn, null); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 1443522541..32ff67035f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1003,28 +1003,28 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222278, value = "Unable to extract GroupSequence from message", level = LogMessage.Level.WARN) void unableToExtractGroupSequence(Throwable e); - @LogMessage(id = 222279, value = "Federation upstream {} policy ref {} could not be resolved in federation configuration", level = LogMessage.Level.WARN) + @LogMessage(id = 222279, value = "Core federation upstream {} policy ref {} could not be resolved in configuration", level = LogMessage.Level.WARN) void federationCantFindPolicyRef(String upstreamName, String policyRef); - @LogMessage(id = 222280, value = "Federation upstream {} policy ref {} is of unknown type in federation configuration", level = LogMessage.Level.WARN) + @LogMessage(id = 222280, value = "Core federation upstream {} policy ref {} is of unknown type in configuration", level = LogMessage.Level.WARN) void federationUnknownPolicyType(String upstreamName, String policyRef); - @LogMessage(id = 222281, value = "Federation upstream {} policy ref {} are too self referential, avoiding stack overflow , ", level = LogMessage.Level.WARN) + @LogMessage(id = 222281, value = "Core federation upstream {} policy ref {} are too self referential, avoiding stack overflow", level = LogMessage.Level.WARN) void federationAvoidStackOverflowPolicyRef(String upstreamName, String policyRef); - @LogMessage(id = 222282, value = "Federation downstream {} upstream transport configuration ref {} could not be resolved in federation configuration", level = LogMessage.Level.WARN) + @LogMessage(id = 222282, value = "Core federation downstream {} upstream transport configuration ref {} could not be resolved in configuration", level = LogMessage.Level.WARN) void federationCantFindUpstreamConnector(String downstreamName, String upstreamRef); - @LogMessage(id = 222283, value = "Federation downstream {} has been deployed", level = LogMessage.Level.INFO) + @LogMessage(id = 222283, value = "Core federation downstream {} has been deployed", level = LogMessage.Level.INFO) void federationDownstreamDeployed(String downstreamName); - @LogMessage(id = 222284, value = "Federation downstream {} has been undeployed", level = LogMessage.Level.INFO) + @LogMessage(id = 222284, value = "Core federation downstream {} has been undeployed", level = LogMessage.Level.INFO) void federationDownstreamUnDeployed(String downstreamName); @LogMessage(id = 222285, value = "File {} at {} is empty. Delete the empty file to stop this message.", level = LogMessage.Level.WARN) void emptyAddressFile(String addressFile, String directory); - @LogMessage(id = 222286, value = "Error executing {} federation plugin method.", level = LogMessage.Level.WARN) + @LogMessage(id = 222286, value = "Error executing {} Core federation plugin method.", level = LogMessage.Level.WARN) void federationPluginExecutionError(String pluginMethod, Throwable e); @LogMessage(id = 222287, value = "Error looking up bindings for address {}.", level = LogMessage.Level.WARN) @@ -1077,7 +1077,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222304, value = "Unable to load message from journal", level = LogMessage.Level.WARN) void unableToLoadMessageFromJournal(Throwable t); - @LogMessage(id = 222305, value = "Error federating message {}.", level = LogMessage.Level.WARN) + @LogMessage(id = 222305, value = "Error during Core federation of message: {}.", level = LogMessage.Level.WARN) void federationDispatchError(String message, Throwable e); @LogMessage(id = 222306, value = "Failed to load prepared TX and it will be rolled back: {}", level = LogMessage.Level.WARN) @@ -1530,4 +1530,16 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO) void retryLockCoordinator(String name); + + @LogMessage(id = 224158, value = "Unable to process Core federation downstream request from {}. User is not authenticated. Closing connection.", level = LogMessage.Level.WARN) + void federationDownstreamUnauthenticated(String remoteAddress); + + @LogMessage(id = 224159, value = "Unable to process Core federation downstream request from {}. User {} is not authorized. Closing connection.", level = LogMessage.Level.WARN) + void federationDownstreamUnauthorized(String remoteAddress, String user); + + @LogMessage(id = 224160, value = "Core federation downstream {} has been deployed. Request sent by user {} from {}.", level = LogMessage.Level.INFO) + void federationDownstreamDeployedFromRemoteUser(String name, String userFromSubject, String remoteAddress); + + @LogMessage(id = 224161, value = "Connection from {} closed. Undeployed Core federation {}.", level = LogMessage.Level.INFO) + void federationDownstreamConnectionClosed(String remoteAddress, String name); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java index ee3534d400..ec731eea77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java @@ -16,13 +16,16 @@ */ package org.apache.activemq.artemis.core.server.federation; +import javax.security.auth.Subject; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; public class FederationManager implements ActiveMQComponent { @@ -30,6 +33,7 @@ public class FederationManager implements ActiveMQComponent { private Map<String, Federation> federations = new HashMap<>(); private State state; + private List<String> downstreamAuthorization; enum State { STOPPED, @@ -47,6 +51,7 @@ public class FederationManager implements ActiveMQComponent { public FederationManager(final ActiveMQServer server) { this.server = server; + this.downstreamAuthorization = server.getConfiguration().getFederationDownstreamAuthorization(); } @Override @@ -90,8 +95,9 @@ public class FederationManager implements ActiveMQComponent { Federation federation = federations.remove(name); if (federation != null) { federation.stop(); + return true; } - return true; + return false; } @@ -129,4 +135,16 @@ public class FederationManager implements ActiveMQComponent { server.unRegisterBrokerPlugin(federatedAbstract); } + public boolean authorizeDownstreamDeployment(Subject subject) { + if (!server.getSecurityStore().isSecurityEnabled()) { + return true; + } + for (RolePrincipal role : subject.getPrincipals(RolePrincipal.class)) { + if (downstreamAuthorization.contains(role.getName())) { + return true; + } + } + return false; + } + } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5c12855f29..7354d8dc09 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1646,6 +1646,7 @@ <xsd:sequence> <xsd:element name="federation" type="federationType" maxOccurs="unbounded" minOccurs="0"/> </xsd:sequence> + <xsd:attribute name="downstream-authorization" type="xsd:string" use="optional" /> <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> </xsd:element> diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 94b0d234ad..7c6332da7a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -1299,6 +1299,43 @@ public class ConfigurationImplTest extends AbstractConfigurationTestBase { assertEquals("secureexample", configuration.getFederationConfigurations().get(0).getCredentials().getPassword()); } + @Test + public void testParseDownstreamAuthorization() throws Exception { + Properties properties = new Properties(); + properties.put("federationDownstreamAuthorization", "a,b,c,d,e"); + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.parsePrefixedProperties(properties, null); + assertEquals(5, configuration.getFederationDownstreamAuthorization().size()); + assertEquals(List.of("a", "b", "c", "d", "e"), configuration.getFederationDownstreamAuthorization()); + } + + @Test + public void testDownstreamExportImport() throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.getFederationDownstreamAuthorization().add("a"); + configuration.getFederationDownstreamAuthorization().add("b"); + configuration.getFederationDownstreamAuthorization().add("c"); + configuration.getFederationDownstreamAuthorization().add("d"); + configuration.getFederationDownstreamAuthorization().add("e"); + assertEquals(List.of("a", "b", "c", "d", "e"), configuration.getFederationDownstreamAuthorization()); + + File outputProperty = new File(getTestDirfile(), "broker.properties"); + configuration.exportAsProperties(outputProperty); + + ConfigurationImpl outputConfig = new ConfigurationImpl(); + + Properties brokerProperties = new Properties(); + try (FileInputStream is = new FileInputStream(outputProperty)) { + BufferedInputStream bis = new BufferedInputStream(is); + brokerProperties.load(bis); + } + + assertEquals("a,b,c,d,e", brokerProperties.get("federationDownstreamAuthorization")); + + outputConfig.parsePrefixedProperties(brokerProperties, null); + assertEquals(List.of("a", "b", "c", "d", "e"), outputConfig.getFederationDownstreamAuthorization()); + } + @Test public void testExportWithNonPasswordEnc() throws Exception { ConfigurationImpl configuration = new ConfigurationImpl(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index fdca23ef83..b820fd83b7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -1114,6 +1114,20 @@ public class FileConfigurationParserTest extends ServerTestBase { assertEquals("myQueue", match.getQueueMatch()); } + @Test + public void testParseFederationDownstreamAuthorization() throws Exception { + String middlePart = "<federations downstream-authorization=\"a, b,c\"/>"; + String configStr = FIRST_PART + middlePart + LAST_PART; + + final FileConfigurationParser parser = new FileConfigurationParser(); + final ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + final Configuration configuration = parser.parseMainConfig(input); + assertTrue(configuration.getFederationDownstreamAuthorization().contains("a")); + assertTrue(configuration.getFederationDownstreamAuthorization().contains("b")); + assertTrue(configuration.getFederationDownstreamAuthorization().contains("c")); + } + @Test public void testParsingDiskFullPolicy() throws Exception { String configStr = """ diff --git a/docs/user-manual/_downstream-authorization-sample.adoc b/docs/user-manual/_downstream-authorization-sample.adoc new file mode 100644 index 0000000000..4a94c2e33e --- /dev/null +++ b/docs/user-manual/_downstream-authorization-sample.adoc @@ -0,0 +1,7 @@ +Sample Downstream Federation setup on the downstream broker: + +[,xml] +---- +<!-- the "federation_username" configured on the upstream broker must be in this role --> +<federations downstream-authorization="federation_role" /> +---- \ No newline at end of file diff --git a/docs/user-manual/_downstream-authorization.adoc b/docs/user-manual/_downstream-authorization.adoc new file mode 100644 index 0000000000..3d345efdc1 --- /dev/null +++ b/docs/user-manual/_downstream-authorization.adoc @@ -0,0 +1,6 @@ +downstream-authorization:: +This is an attribute on the `federations` element that is set on the _downstream_ broker so that it can properly authorize incoming federation requests. +It is a comma-delimited list of roles which are authorized to deploy federation on this broker. +The broker sending the federation command must use credentials for a user that is in one of the roles listed here. ++ +If this is not defined then the broker will not process incoming federation commands. \ No newline at end of file diff --git a/docs/user-manual/federation-address.adoc b/docs/user-manual/federation-address.adoc index ce87b91ff2..43def75854 100644 --- a/docs/user-manual/federation-address.adoc +++ b/docs/user-manual/federation-address.adoc @@ -265,7 +265,7 @@ Similarly to `upstream` configuration, a downstream configuration can be configu This works by sending a command to the `downstream` broker to have it create an `upstream` connection back to the downstream broker. The benefit of this is being able to configure everything for federation on one broker in some cases to make it easier, such as a hub and spoke topology -All the same configuration options apply to `downstream` as does `upstream` with the exception of one extra configuration flag that needs to be set: +All the same configuration options apply to `downstream` as does `upstream` with two exceptions: upstream-connector-ref:: Is an element pointing to a `connector` elements defined elsewhere. @@ -274,7 +274,9 @@ This reference is used to tell the downstream broker what connector to use to cr A _connector_ encapsulates knowledge of what transport to use (TCP, SSL, HTTP etc.) as well as the server connection parameters (host, port etc). For more information about what connectors are and how to configure them, see xref:configuring-transports.adoc#configuring-the-transport[Configuring the Transport]. -Sample Downstream Address Federation setup: +include::_downstream-authorization.adoc[] + +Sample Downstream Address Federation setup on the upstream broker: [,xml] ---- @@ -325,3 +327,5 @@ Sample Downstream Address Federation setup: </federation> </federations> ---- + +include::_downstream-authorization-sample.adoc[] \ No newline at end of file diff --git a/docs/user-manual/federation-queue.adoc b/docs/user-manual/federation-queue.adoc index 3d345795df..4e593b40aa 100644 --- a/docs/user-manual/federation-queue.adoc +++ b/docs/user-manual/federation-queue.adoc @@ -230,7 +230,7 @@ Similarly to `upstream` configuration, a downstream configuration can be configu This works by sending a command to the `downstream` broker to have it create an `upstream` connection back to the downstream broker. The benefit of this is being able to configure everything for federation on one broker in some cases to make it easier, such as a hub and spoke topology. -All the same configuration options apply to `downstream` as does `upstream` with the exception of one extra configuration flag that needs to be set: +All the same configuration options apply to `downstream` as does `upstream` with two exceptions: upstream-connector-ref:: Is an element pointing to a `connector` elements defined elsewhere. @@ -239,7 +239,9 @@ This reference is used to tell the downstream broker what connector to use to cr A _connector_ encapsulates knowledge of what transport to use (TCP, SSL, HTTP etc.) as well as the server connection parameters (host, port etc). For more information about what connectors are and how to configure them, see xref:configuring-transports.adoc#configuring-the-transport[Configuring the Transport]. -Sample Downstream Address Federation setup: +include::_downstream-authorization.adoc[] + +Sample Downstream Address Federation setup on the upstream broker: [,xml] ---- @@ -291,3 +293,5 @@ Sample Downstream Address Federation setup: </federation> </federations> ---- + +include::_downstream-authorization-sample.adoc[] \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java new file mode 100644 index 0000000000..32c1cb47a4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationDownstreamDirectTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.federation; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationPolicy; +import org.apache.activemq.artemis.core.config.federation.FederationPolicySet; +import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import static org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage.UPSTREAM_SUFFIX; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FederationDownstreamDirectTest extends ActiveMQTestBase { + + protected ActiveMQServer server; + + private static final String noAuth = "noAuth"; + + private static final String authorizedUser = "authorizedUser"; + private static final String authorizedPass = "authorizedPass"; + private static final String authorizedRole = "authorizedRole"; + + private static final String unauthorizedUser = "unauthorizedUser"; + private static final String unauthorizedPass = "unauthorizedPass"; + private static final String unauthorizedRole = "unauthorizedRole"; + + @BeforeEach + public void setUp(TestInfo testInfo) throws Exception { + super.setUp(); + Configuration config = createDefaultNettyConfig().setSecurityEnabled(true); + if (!testInfo.getTags().contains(noAuth)) { + config.addFederationDownstreamAuthorization(authorizedRole); + } + server = createServer(false, config); + server.start(); + + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + + securityManager.getConfiguration().addUser(authorizedUser, authorizedPass); + securityManager.getConfiguration().addRole(authorizedUser, authorizedRole); + + securityManager.getConfiguration().addUser(unauthorizedUser, unauthorizedPass); + securityManager.getConfiguration().addRole(unauthorizedUser, unauthorizedRole); + } + + @Test + @Tag(noAuth) + public void testNoAuthConfigured() throws Exception { + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + sendFederationDownstreamConnectMessage(authorizedUser, authorizedPass, false); + assertFalse(loggerHandler.findText("AMQ224158")); + assertTrue(loggerHandler.findText("AMQ224159")); + assertFalse(loggerHandler.findText("AMQ224160")); + } + } + + @Test + public void testUnauthenticatedDeployment() throws Exception { + // send the federation message without authenticating with a session first + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + sendFederationDownstreamConnectMessage(null, null, false); + assertTrue(loggerHandler.findText("AMQ224158")); + assertFalse(loggerHandler.findText("AMQ224159")); + assertFalse(loggerHandler.findText("AMQ224160")); + } + } + + @Test + public void testUnauthorizedDeployment() throws Exception { + // send the federation message after authenticating with a user who isn't authorized for downstream deployment + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + sendFederationDownstreamConnectMessage(unauthorizedUser, unauthorizedPass, false); + assertFalse(loggerHandler.findText("AMQ224158")); + assertTrue(loggerHandler.findText("AMQ224159")); + assertFalse(loggerHandler.findText("AMQ224160")); + } + } + + @Test + public void testSuccessfulDeployment() throws Exception { + // send the federation message after authenticating with a user who is authorized for downstream deployment + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + sendFederationDownstreamConnectMessage(authorizedUser, authorizedPass, true); + assertFalse(loggerHandler.findText("AMQ224158")); + assertFalse(loggerHandler.findText("AMQ224159")); + assertTrue(loggerHandler.findText("AMQ224160")); + Wait.assertTrue(() -> loggerHandler.findText("AMQ224161")); + } + } + + private void sendFederationDownstreamConnectMessage(String user, String password, boolean succeed) throws Exception { + try (ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616")) { + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session; + if (user != null) { + session = factory.createSession(user, password, true, true, true, true, -1); + } + CoreRemotingConnection coreConn = (CoreRemotingConnection) factory.getConnection(); + Wait.assertEquals(1, server.getActiveMQServerControl()::getConnectionCount); + Channel federationChannel = coreConn.getChannel(ChannelImpl.CHANNEL_ID.FEDERATION.id, -1); + federationChannel.send(getFederationDownstreamConnectMessage(getName())); + if (succeed) { + Wait.assertNotNull(() -> server.getFederationManager().get(getName() + UPSTREAM_SUFFIX), 1000, 20); + } else { + assertFalse(Wait.waitFor(() -> server.getFederationManager().get(getName() + UPSTREAM_SUFFIX) != null, 1000, 20)); + assertEquals(0, server.getActiveMQServerControl().getConnectionCount()); + } + } + } + + private FederationDownstreamConnectMessage getFederationDownstreamConnectMessage(String name) { + final String policySetName = "fake-policy-set"; + final String policyConfigName = "fake-policy-config"; + FederationDownstreamConnectMessage msg = new FederationDownstreamConnectMessage(); + msg.setName(name); + + Map<String, FederationPolicy> policyMap = new HashMap<>(); + policyMap.put(policyConfigName, new FederationQueuePolicyConfiguration().setName(policyConfigName).addInclude(new FederationQueuePolicyConfiguration.Matcher().setQueueMatch("#").setAddressMatch("#"))); + policyMap.put(policySetName, new FederationPolicySet().setName(policySetName).addPolicyRef(policyConfigName)); + msg.setFederationPolicyMap(policyMap); + + FederationDownstreamConfiguration downstreamConfig = new FederationDownstreamConfiguration() + .setName("fake") + .addPolicyRef(policySetName); + downstreamConfig.setUpstreamConfiguration(new TransportConfiguration(NettyConnectorFactory.class.getName(), new HashMap<>(), "fake")); + msg.setStreamConfiguration(downstreamConfig); + return msg; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
