This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a8b4ee1992 ARTEMIS-4314 support queue federation batchOnCapacity via 
consumerWindowSize=0
a8b4ee1992 is described below

commit a8b4ee19928fbf0a0b2bff5baf649ace97edc967
Author: Gary Tully <gary.tu...@gmail.com>
AuthorDate: Thu Jun 15 17:15:00 2023 +0100

    ARTEMIS-4314 support queue federation batchOnCapacity via 
consumerWindowSize=0
---
 .../core/client/impl/ClientConsumerImpl.java       |  15 +-
 .../core/client/impl/ClientConsumerInternal.java   |   5 +
 .../apache/activemq/artemis/core/server/Queue.java |   2 +
 .../federation/FederatedQueueConsumerImpl.java     |  99 ++++++++++-
 .../artemis/core/server/impl/QueueImpl.java        |   5 +-
 .../core/server/impl/RoutingContextTest.java       |   5 +
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 +
 docs/user-manual/en/federation-queue.md            |   4 +
 .../federation/FederatedQueuePullConsumerTest.java | 196 +++++++++++++++++++++
 .../tests/integration/server/RingQueueTest.java    |   6 +-
 .../core/client/impl/LargeMessageBufferTest.java   |  10 ++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 12 files changed, 346 insertions(+), 11 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 87b26f24c1..6a730a0972 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -133,6 +134,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
    private volatile boolean ackIndividually;
 
    private final ClassLoader contextClassLoader;
+   private volatile boolean manualFlowManagement;
 
    public ClientConsumerImpl(final ClientSessionInternal session,
                              final ConsumerContext consumerContext,
@@ -406,6 +408,13 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       return receiverThread;
    }
 
+   @Override
+   public ClientConsumer setManualFlowMessageHandler(final MessageHandler 
theHandler) throws ActiveMQException {
+      checkClosed();
+      this.handler = theHandler;
+      this.manualFlowManagement = true;
+      return this;
+   }
 
    // Must be synchronized since messages may be arriving while handler is 
being set and might otherwise end
    // up not queueing enough executors - so messages get stranded
@@ -849,7 +858,8 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       }
    }
 
-   private void resetIfSlowConsumer() {
+   @Override
+   public void resetIfSlowConsumer() {
       if (clientWindowSize == 0) {
          sendCredits(0);
 
@@ -1041,6 +1051,9 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
     * @throws ActiveMQException
     */
    private void flowControlBeforeConsumption(final ClientMessageInternal 
message) throws ActiveMQException {
+      if (manualFlowManagement) {
+         return;
+      }
       // Chunk messages will execute the flow control while receiving the 
chunks
       if (message.getFlowControlSize() != 0) {
          // on large messages we should discount 1 on the first packets as we 
need continuity until the last packet
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
index 55d30e7e6b..24f55a2210 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.utils.FutureLatch;
 
 public interface ClientConsumerInternal extends ClientConsumer {
@@ -76,4 +77,8 @@ public interface ClientConsumerInternal extends 
ClientConsumer {
    ClientSession.QueueQuery getQueueInfo();
 
    long getForceDeliveryCount();
+
+   ClientConsumer setManualFlowMessageHandler(MessageHandler theHandler) 
throws ActiveMQException;
+
+   void resetIfSlowConsumer();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 97b9193f3e..2c1319bcd4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -389,6 +389,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    boolean hasMatchingConsumer(Message message);
 
+   long getPendingMessageCount();
+
    Collection<Consumer> getConsumers();
 
    Map<SimpleString, Consumer> getGroups();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index b37d75009d..d2470e5a6b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.federation;
 
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -25,20 +26,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
 import static 
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
 
 public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, 
SessionFailureListener {
@@ -60,7 +64,9 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
 
    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSession clientSession;
-   private ClientConsumer clientConsumer;
+   private ClientConsumerInternal clientConsumer;
+   private final AtomicInteger pendingPullCredit = new AtomicInteger();
+   private QueueHandle queueHandle;
 
    public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer 
server, Transformer transformer, FederatedConsumerKey key, FederationUpstream 
upstream, ClientSessionCallback clientSessionCallback) {
       this.federation = federation;
@@ -135,9 +141,16 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
                if (clientSessionCallback != null) {
                   clientSessionCallback.callback(clientSession);
                }
-               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
-                  this.clientConsumer = 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
-                  this.clientConsumer.setMessageHandler(this);
+               ClientSession.QueueQuery queryResult = 
clientSession.queueQuery(key.getQueueName());
+               if (queryResult.isExists()) {
+                  this.clientConsumer = (ClientConsumerInternal) 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
+                  if (this.clientConsumer.getClientWindowSize() == 0) {
+                     this.clientConsumer.setManualFlowMessageHandler(this);
+                     queueHandle = createQueueHandle(server, queryResult);
+                     scheduleCreditOnEmpty(0, queueHandle);
+                  } else {
+                     this.clientConsumer.setMessageHandler(this);
+                  }
                } else {
                   throw new ActiveMQNonExistentQueueException("Queue " + 
key.getQueueName() + " does not exist on remote");
                }
@@ -155,6 +168,75 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
       }
    }
 
+   interface QueueHandle {
+      long getMessageCount();
+      int getCreditWindow();
+
+      Executor getExecutor();
+   }
+
+   private QueueHandle createQueueHandle(ActiveMQServer server, 
ClientSession.QueueQuery queryResult) {
+      final Queue queue = server.locateQueue(queryResult.getName());
+      int creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      final Integer defaultConsumerWindowSize = 
queryResult.getDefaultConsumerWindowSize();
+      if (defaultConsumerWindowSize != null) {
+         creditWindow = defaultConsumerWindowSize.intValue();
+         if (creditWindow <= 0) {
+            creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+            logger.trace("{} override non positive queue consumerWindowSize 
with {}.", this, creditWindow);
+         }
+      }
+
+      final int finalCreditWindow = creditWindow;
+      return new QueueHandle() {
+         @Override
+         public long getMessageCount() {
+            return queue.getPendingMessageCount();
+         }
+
+         @Override
+         public int getCreditWindow() {
+            return finalCreditWindow;
+         }
+
+         @Override
+         public Executor getExecutor() {
+            return queue.getExecutor();
+         }
+      };
+   }
+
+   private void scheduleCreditOnEmpty(final int delay, final QueueHandle 
handle) {
+      scheduledExecutorService.schedule(() -> {
+         // use queue executor to sync on message count metric
+         handle.getExecutor().execute(() -> {
+            if (clientConsumer != null) {
+               if (0L == handle.getMessageCount()) {
+                  flow(handle.getCreditWindow());
+                  pendingPullCredit.set(handle.getCreditWindow());
+               } else {
+                  if (0 == delay) {
+                     clientConsumer.resetIfSlowConsumer();
+                     pendingPullCredit.set(0);
+                  }
+                  
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, 
intialConnectDelayMultiplier, intialConnectDelayMax), handle);
+               }
+            }
+         });
+      }, delay, TimeUnit.SECONDS);
+   }
+
+   private void flow(int creditWindow) {
+      try {
+         if (this.clientConsumer != null) {
+            this.clientConsumer.flowControl(creditWindow, false);
+         }
+      } catch (ActiveMQException ignored) {
+         logger.trace("{} failed to flowControl with credit {}.", this, 
creditWindow, ignored);
+      }
+   }
+
    @Override
    public synchronized void close() {
       if (started) {
@@ -225,6 +307,13 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
          }
          clientMessage.acknowledge();
 
+         if (pendingPullCredit.get() > 0) {
+            final int delta = ((ClientMessageInternal) 
clientMessage).getFlowControlSize();
+            if (pendingPullCredit.addAndGet(-delta) < 0) {
+               scheduleCreditOnEmpty(0, queueHandle);
+            }
+         }
+
          if (server.hasBrokerFederationPlugins()) {
             try {
                server.callBrokerFederationPlugins(plugin -> 
plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 23183f482f..c3ad85eb38 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1621,7 +1621,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       return createdTimestamp;
    }
 
-   public long getMessageCountForRing() {
+   @Override
+   public long getPendingMessageCount() {
       return (long) pendingMetrics.getMessageCount();
    }
 
@@ -4610,7 +4611,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    private void enforceRing(MessageReference refToAck, boolean scheduling, 
boolean head) {
       int adjustment = head ? 1 : 0;
 
-      if (getMessageCountForRing() + adjustment > ringSize) {
+      if (getPendingMessageCount() + adjustment > ringSize) {
          refToAck = refToAck == null ? messageReferences.poll() : refToAck;
 
          if (refToAck != null) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index af6d2f6654..90cd723c48 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -742,6 +742,11 @@ public class RoutingContextTest {
          return false;
       }
 
+      @Override
+      public long getPendingMessageCount() {
+         return 0;
+      }
+
       @Override
       public Collection<Consumer> getConsumers() {
          return null;
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b82f9e2c2d..5e43510f8c 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1488,6 +1488,11 @@ public class ScheduledDeliveryHandlerTest extends Assert 
{
          return false;
       }
 
+      @Override
+      public long getPendingMessageCount() {
+         return 0;
+      }
+
       @Override
       public Collection<Consumer> getConsumers() {
          return null;
diff --git a/docs/user-manual/en/federation-queue.md 
b/docs/user-manual/en/federation-queue.md
index 76f5ac9766..481510aa32 100644
--- a/docs/user-manual/en/federation-queue.md
+++ b/docs/user-manual/en/federation-queue.md
@@ -49,6 +49,10 @@ e.g. as many messages as possible are consumed from the same 
broker as they were
     Here for such a migration with blue/green or canary moving a number of 
consumers on the same queue, you may want to set the `priority-adjustment` to 
0, or even a positive value, so message would actively flow to the federated 
queue.
 
 
+* Dual Federation - potential for messages to flip-flop between clusters.
+  If the backlog on your queues exceeds the available local credit across 
consumers, any lower priority federation consumer becomes a candidate for 
dispatch and messages will be federated. Eventually all messages may migrate 
and the scenario can repeat on the other cluster. Applying a rate limit to the 
connector url can help mitigate but this could have an adverse effect on 
migration when there are no local consumers.
+  To better support this use case, it is possible to configure the 
consumerWindowSize to zero on the referenced connector URI: 
```tcp://<host>:<port>?consumerWindowSize=0```. This will cause the federation 
consumer to pull messages in batches only when the local queue has excess 
capacity. This means that federation won't ever drain more messaces than it can 
handle, such that messages would flip-flop. The batch size is derived from the 
relevant address settings defaultConsumerWindowSize.
+
 ## Configuring Queue Federation
 
 Federation is configured in `broker.xml`.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
new file mode 100644
index 0000000000..dc954f90cb
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Collections;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FederatedQueuePullConsumerTest extends FederatedTestBase {
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      
getServer(0).getConfiguration().addConnectorConfiguration("server-pull-1", 
"tcp://localhost:" + 61617 + "?consumerWindowSize=0;ackBatchSize=10");
+   }
+
+   @Override
+   protected boolean isNetty() {
+      // such that url params can be used for the server-pull-1 connector, vm 
urls don't propagate url params!
+      return true;
+   }
+
+   @Override
+   protected void configureQueues(ActiveMQServer server) throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)
+         .setDefaultConsumerWindowSize(20 * 300));
+      createSimpleQueue(server, getName());
+   }
+
+   protected ConnectionFactory getCF(int i) throws Exception {
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://" + i);
+      // pull consumers to allow deterministic message consumption
+      factory.setConsumerWindowSize(0);
+      return factory;
+   }
+
+   @Test
+   public void testFederatedQueuePullFromUpstream() throws Exception {
+      String queueName = getName();
+
+      FederationConfiguration federationConfiguration = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server-pull-1", 
queueName);
+      
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
+      getServer(0).getFederationManager().deploy();
+
+      testFederatedQueuePullFromUpstream(queueName);
+   }
+
+   @Test
+   public void testMultipleFederatedQueueRemoteConsumersUpstream() throws 
Exception {
+      String connector = "server-pull-1";
+
+      
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+
+      getServer(1).createQueue(new 
QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
+      getServer(1).createQueue(new 
QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
+
+      getServer(0).getConfiguration().getFederationConfigurations().add(new 
FederationConfiguration()
+                                                                           
.setName("default")
+                                                                           
.addFederationPolicy(new FederationQueuePolicyConfiguration()
+                                                                               
                    .setName("myQueuePolicy")
+                                                                               
                    .addInclude(new FederationQueuePolicyConfiguration.Matcher()
+                                                                               
                                   .setQueueMatch("#")
+                                                                               
                                   .setAddressMatch("Test.#")))
+                                                                           
.addUpstreamConfiguration(new FederationUpstreamConfiguration()
+                                                                               
                         .setName("server1-upstream")
+                                                                               
                         .addPolicyRef("myQueuePolicy")
+                                                                               
                         
.setStaticConnectors(Collections.singletonList(connector))));
+      getServer(0).getFederationManager().deploy();
+
+      ConnectionFactory cf1 = getCF(0);
+      ConnectionFactory cf2 = getCF(0);
+      ConnectionFactory cf3 = getCF(1);
+      try (Connection consumer1Connection = cf1.createConnection();
+           Connection consumer2Connection = cf2.createConnection();
+           Connection producerConnection = cf3.createConnection()) {
+         consumer1Connection.start();
+         Session session1 = consumer1Connection.createSession();
+         Queue queue1 = session1.createQueue("Test.Q.1");
+         MessageConsumer consumer1 = session1.createConsumer(queue1);
+
+         consumer2Connection.start();
+         Session session2 = consumer2Connection.createSession();
+         Queue queue2 = session2.createQueue("Test.Q.2");
+         MessageConsumer consumer2 = session2.createConsumer(queue2);
+
+         Session session3 = producerConnection.createSession();
+         MessageProducer producer = session3.createProducer(queue2);
+         producer.send(session3.createTextMessage("hello"));
+
+         assertNotNull(consumer2.receive(1000));
+
+         consumer1Connection.close();
+
+         producer.send(session3.createTextMessage("hello"));
+
+         assertNotNull(consumer2.receive(1000));
+      }
+   }
+
+   private void testFederatedQueuePullFromUpstream(final String queueName) 
throws Exception {
+
+      ConnectionFactory cf1 = getCF(1);
+      ConnectionFactory cf0 = getCF(0);
+      try (Connection connection1 = cf1.createConnection();
+           Connection connection0 = cf0.createConnection()) {
+         connection1.start();
+         Session session1 = connection1.createSession();
+         Queue queue1 = session1.createQueue(queueName);
+         MessageProducer producer = session1.createProducer(queue1);
+         producer.send(session1.createTextMessage("1"));
+
+         connection0.start();
+         Session session0 = connection0.createSession();
+         Queue queue0 = session0.createQueue(queueName);
+         MessageProducer producer0 = session0.createProducer(queue1);
+         producer0.send(session0.createTextMessage("0"));
+
+         // no consumer upstream, messages locally, no pull
+         MessageConsumer consumer0 = session0.createConsumer(queue0);
+
+         // verify federated
+         waitForBindings(getServer(1), queueName, true, 1, 1, 2000);
+
+         // verify no federation of messages
+         Wait.assertEquals(1, () -> getMessageCount(getServer(0), queueName), 
2000, 100);
+         Wait.assertEquals(1, () -> getMessageCount(getServer(1), queueName), 
2000, 100);
+
+         // drain local queue
+         assertNotNull(consumer0.receive(1000));
+
+         // no messages locally, expect message federation now of a batch
+         // 4s b/c local queue size check is in seconds
+         assertNotNull(consumer0.receive(4000));
+
+         // verify all messages consumed
+         Wait.assertEquals(0L, () -> getMessageCount(getServer(0), queueName), 
2000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(getServer(1), queueName), 
2000, 100);
+
+         assertNull(consumer0.receiveNoWait());
+
+         // verify batch end
+         final int mumMessages = 150;
+         for (int i = 0; i < mumMessages; i++ ) {
+            producer.send(session1.createTextMessage("1-" + i));
+         }
+
+         // upstream has most
+         Wait.assertTrue(() -> getMessageCount(getServer(1), queueName) > 100, 
2000, 200);
+         Wait.assertTrue(() -> getMessageCount(getServer(1), queueName) < 
mumMessages, 2000, 200);
+
+         // only a batch has been federated
+         Wait.assertTrue(() -> getMessageCount(getServer(0), queueName) > 10, 
2000, 100);
+         Wait.assertTrue(() -> getMessageCount(getServer(0), queueName) < 100, 
2000, 100);
+
+         // verify all available
+         for (int i = 0; i < mumMessages; i++ ) {
+            assertNotNull(consumer0.receive(4000));
+         }
+
+         assertNull(consumer0.receiveNoWait());
+         consumer0.close();
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
index 8f7ae9faa4..21fb453e29 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
@@ -187,15 +187,15 @@ public class RingQueueTest extends ActiveMQTestBase {
       m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
       producer.send(m0);
       Wait.assertTrue(() -> queue.getScheduledCount() == 1);
-      Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
+      Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);
       time = System.currentTimeMillis();
       time += 500;
       m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
       producer.send(m0);
       Wait.assertTrue(() -> queue.getScheduledCount() == 2);
-      Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
+      Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);
       Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
-      Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 1);
+      Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 1);
    }
 
    @Test
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
index 79fe5f3524..79666306d5 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
@@ -817,6 +818,15 @@ public class LargeMessageBufferTest extends 
ActiveMQTestBase {
          return 0;
       }
 
+      @Override
+      public ClientConsumer setManualFlowMessageHandler(MessageHandler 
handler) throws ActiveMQException {
+         return null;
+      }
+
+      @Override
+      public void resetIfSlowConsumer() {
+      }
+
       /* (non-Javadoc)
        * @see 
org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
        */
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index dd910810ea..bb779ac2a0 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -881,6 +881,11 @@ public class FakeQueue extends CriticalComponentImpl 
implements Queue {
       return false;
    }
 
+   @Override
+   public long getPendingMessageCount() {
+      return 0;
+   }
+
    @Override
    public Executor getExecutor() {
       // no-op

Reply via email to