This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 84c16f1a0d ARTEMIS-4378 ignore address federation config if connection 
is configured as pull, consumerWindowSize=0
84c16f1a0d is described below

commit 84c16f1a0d9f930311105e8e430d110ffd8eb058
Author: Gary Tully <[email protected]>
AuthorDate: Thu Aug 24 15:17:40 2023 +0100

    ARTEMIS-4378 ignore address federation config if connection is configured 
as pull, consumerWindowSize=0
---
 .../server/federation/FederationConnection.java    |  7 +++-
 .../federation/address/FederatedAddress.java       | 15 +++++++-
 .../federation/FederatedQueuePullConsumerTest.java | 42 ++++++++++++++++++++++
 3 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationConnection.java
index 0072c0761a..132e0bf460 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationConnection.java
@@ -36,6 +36,7 @@ public class FederationConnection {
    private volatile ClientSessionFactory clientSessionFactory;
    private volatile boolean started;
    private volatile boolean sharedConnection;
+   private boolean isPull;
 
    public FederationConnection(Configuration configuration, String name, 
FederationConnectionConfiguration config) {
       this.config = config;
@@ -95,8 +96,8 @@ public class FederationConnection {
             BeanSupport.setData(serverLocator, possibleLocatorParameters);
          } catch (Exception ignoredAsErrorsVisibleViaBeanUtilsLogging) {
          }
+         isPull = 
("0".equals(possibleLocatorParameters.get("consumerWindowSize")));
       }
-
    }
 
    public synchronized void start() {
@@ -117,6 +118,10 @@ public class FederationConnection {
       return started;
    }
 
+   public boolean isPull() {
+      return isPull;
+   }
+
    public boolean isSharedConnection() {
       return sharedConnection;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
index 2e1a484738..a2d42e8a0d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.federation.address;
 
 import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,6 +53,8 @@ import 
org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.Match;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ByteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Federated Address, replicate messages from the remote brokers address to 
itself.
@@ -65,6 +68,8 @@ import org.apache.activemq.artemis.utils.ByteUtil;
  */
 public class FederatedAddress extends FederatedAbstract implements 
ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    public static final String FEDERATED_QUEUE_PREFIX = "federated";
 
    public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
@@ -74,6 +79,7 @@ public class FederatedAddress extends FederatedAbstract 
implements ActiveMQServe
    private final Set<Matcher> excludes;
    private final FederationAddressPolicyConfiguration config;
    private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new 
HashMap<>();
+   private final boolean hasPullConnectionConfig;
 
    public FederatedAddress(Federation federation, 
FederationAddressPolicyConfiguration config, ActiveMQServer server, 
FederationUpstream upstream) {
       super(federation, server, upstream);
@@ -102,6 +108,7 @@ public class FederatedAddress extends FederatedAbstract 
implements ActiveMQServe
             excludes.add(new Matcher(exclude, wildcardConfiguration));
          }
       }
+      hasPullConnectionConfig = upstream.getConnection().isPull();
    }
 
    @Override
@@ -310,8 +317,14 @@ public class FederatedAddress extends FederatedAbstract 
implements ActiveMQServe
    }
 
    private boolean match(SimpleString address, RoutingType routingType) {
-      //Currently only supporting Multicast currently.
       if (RoutingType.ANYCAST.equals(routingType)) {
+         logger.debug("ignoring unsupported ANYCAST address {}", address);
+         return false;
+      }
+      if (hasPullConnectionConfig) {
+         // multicast address federation has no local queue to trigger batch 
pull requests, a regular fast consumer with credit window is necessary
+         // otherwise the upstream would fill up and block.
+         logger.debug("ignoring MULTICAST address {} on unsupported pull 
connection, consumerWindowSize=0 ", address);
          return false;
       }
       for (Matcher exclude : excludes) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
index dc954f90cb..000235555f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
@@ -22,14 +22,18 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.Topic;
 import java.util.Collections;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import 
org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -65,6 +69,44 @@ public class FederatedQueuePullConsumerTest extends 
FederatedTestBase {
       return factory;
    }
 
+   @Test
+   public void 
testAddressFederatedConfiguredWithPullQueueConsumerEnabledNotAnOption() throws 
Exception {
+      String connector = "server-pull-1";
+
+      
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+
+      getServer(0).addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));
+      getServer(1).addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));
+
+      getServer(0).getConfiguration().getFederationConfigurations().add(new 
FederationConfiguration().setName("default").addFederationPolicy(new 
FederationAddressPolicyConfiguration().setName("myAddressPolicy").addInclude(new
 
FederationAddressPolicyConfiguration.Matcher().setAddressMatch("#"))).addUpstreamConfiguration(new
 
FederationUpstreamConfiguration().setName("server1-upstream").addPolicyRef("myAddressPolicy").setStaticConnectors(Collections.singletonList(connector))));
+
+      getServer(0).getFederationManager().deploy();
+
+      final ConnectionFactory cf1 = getCF(0);
+      final ConnectionFactory cf2 = getCF(1);
+
+      try (Connection consumer1Connection = cf1.createConnection(); Connection 
producerConnection = cf2.createConnection()) {
+         consumer1Connection.start();
+         final Session session1 = consumer1Connection.createSession();
+         final Topic topic1 = session1.createTopic("source");
+         final MessageConsumer consumer1 = session1.createConsumer(topic1);
+
+         // Remote
+         final Session session2 = producerConnection.createSession();
+         final Topic topic2 = session2.createTopic("source");
+         final MessageProducer producer = session2.createProducer(topic2);
+
+         producer.send(session2.createTextMessage("hello"));
+
+         // no federation of this address
+         // consumer visible on local
+         assertTrue(waitForBindings(getServer(0), "source", true, 1, 1, 1000));
+         // federation consumer not visible on remote
+         assertFalse(waitForBindings(getServer(1), "source", true, 1, 1, 100));
+      }
+   }
+
    @Test
    public void testFederatedQueuePullFromUpstream() throws Exception {
       String queueName = getName();

Reply via email to