Repository: activemq-artemis
Updated Branches:
  refs/heads/master 74db627b8 -> 16eca68bf


NO-JIRA: fixing test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/78321668
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/78321668
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/78321668

Branch: refs/heads/master
Commit: 78321668ea85b90d9956d62b844eac428055cc84
Parents: 74db627
Author: Clebert Suconic <[email protected]>
Authored: Wed Aug 23 16:04:42 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Wed Aug 23 16:04:42 2017 -0400

----------------------------------------------------------------------
 .../cluster/distribution/ClusterTestBase.java   | 14 +++++++----
 .../distribution/MessageRedistributionTest.java | 26 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78321668/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index b00eb02..1d6cf4f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1375,11 +1375,7 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
          serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, 
params);
       }
 
-      if (ha) {
-         locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc);
-      } else {
-         locators[node] = 
ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
-      }
+      setSessionFactoryCreateLocator(node, ha, serverTotc);
 
       
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
 
@@ -1392,6 +1388,14 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
       sfs[node] = sf;
    }
 
+   protected void setSessionFactoryCreateLocator(int node, boolean ha, 
TransportConfiguration serverTotc) {
+      if (ha) {
+         locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc);
+      } else {
+         locators[node] = 
ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
+      }
+   }
+
    protected void setupSessionFactory(final int node, final boolean netty, int 
reconnectAttempts) throws Exception {
       if (sfs[node] != null) {
          throw new IllegalArgumentException("Already a server at " + node);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78321668/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 69c03cc..2020489 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -24,12 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 
 import org.apache.activemq.artemis.core.server.Bindable;
+import org.apache.activemq.artemis.core.server.Queue;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import 
org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
@@ -37,6 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,6 +66,17 @@ public class MessageRedistributionTest extends 
ClusterTestBase {
       return false;
    }
 
+
+
+   @Override
+   protected void setSessionFactoryCreateLocator(int node, boolean ha, 
TransportConfiguration serverTotc) {
+      super.setSessionFactoryCreateLocator(node, ha, serverTotc);
+
+      locators[node].setConsumerWindowSize(0);
+
+   }
+
+
    //https://issues.jboss.org/browse/HORNETQ-1061
    @Test
    public void testRedistributionWithMessageGroups() throws Exception {
@@ -989,7 +1003,17 @@ public class MessageRedistributionTest extends 
ClusterTestBase {
       removeConsumer(0);
       addConsumer(1, 1, "queue0", null);
 
-      verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
+      Queue queue = 
servers[1].locateQueue(SimpleString.toSimpleString("queue0"));
+      Assert.assertNotNull(queue);
+      Wait.waitFor(() -> queue.getMessageCount() == 
QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2);
+
+      for (int i = 0; i < QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2; i++) {
+         ClientMessage message = consumers[1].getConsumer().receive(5000);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumers[1].getConsumer().receiveImmediate());
    }
 
    /*

Reply via email to