NO-JIRA Added test coverage for broker paging with unlimited memory

This test starts 2 servers and sends messages to
a queue until it enters the paging state. Then
it changes the address max-size to -1, restarts
the 2 servers and consumes all the messages.
It verifies that even if the max-size has changed,
all the paged messages are depaged and consumed,
with no messages left stuck after the restart.

The test is there to guard against a case where messages
would not be depaged on server restart after the max-size
is changed to -1. This issue has been fixed in
master along with the fix for ARTEMIS-581, particularly
by the changes to the method PagingStoreImpl.getMaxSize().


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/19c6f2d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/19c6f2d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/19c6f2d5

Branch: refs/heads/master
Commit: 19c6f2d5c4171518e0933a3a6abfc54453c95565
Parents: 7b60df5
Author: Howard Gao <howard....@gmail.com>
Authored: Tue Nov 14 23:29:02 2017 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Nov 15 17:19:30 2017 -0500

----------------------------------------------------------------------
 .../artemis/tests/util/ActiveMQTestBase.java    |  14 +++
 .../distribution/TwoWayTwoNodeClusterTest.java  | 121 +++++++++++++++++++
 2 files changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19c6f2d5/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 85ad922..7cd225f 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2118,6 +2118,20 @@ public abstract class ActiveMQTestBase extends Assert {
       return message;
    }
 
+   protected ClientMessage createTextMessage(final ClientSession session, 
final boolean durable, final int numChars) {
+      ClientMessage message = session.createMessage(Message.TEXT_TYPE,
+                durable,
+                0,
+                System.currentTimeMillis(),
+                (byte)4);
+      StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < numChars; i++) {
+         builder.append('a');
+      }
+      message.getBodyBuffer().writeString(builder.toString());
+      return message;
+   }
+
    protected XidImpl newXID() {
       return new XidImpl("xa1".getBytes(), 1, 
UUIDGenerator.getInstance().generateStringUUID().getBytes());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19c6f2d5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
index 36f22eb..3798b92 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
@@ -16,11 +16,22 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.Queue;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
 
    private static final IntegrationTestLogger log = 
IntegrationTestLogger.LOGGER;
@@ -48,6 +59,116 @@ public class TwoWayTwoNodeClusterTest extends 
ClusterTestBase {
       return false;
    }
 
+   /*
+    * This test starts 2 servers and send messages to
+    * a queue until it enters into paging state. Then
+    * it changes the max-size to -1, restarts the 2 servers
+    * and consumes all the messages. If verifies that
+    * even if the max-size has changed all the paged
+    * messages will be depaged and consumed. No stuck
+    * messages after restarting.
+    */
+   @Test(timeout = 60000)
+   public void testClusterRestartWithConfigChanged() throws Exception {
+      Configuration config0 = servers[0].getConfiguration();
+      Configuration config1 = servers[1].getConfiguration();
+
+      configureBeforeStart(config0, config1);
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues", "queue0", null, true);
+      createQueue(1, "queues", "queue0", null, true);
+
+      waitForBindings(0, "queues", 1, 0, true);
+      waitForBindings(1, "queues", 1, 0, true);
+
+      waitForBindings(0, "queues", 1, 0, false);
+      waitForBindings(1, "queues", 1, 0, false);
+
+      ClientSessionFactory sf0 = sfs[0];
+      ClientSession session0 = sf0.createSession(false, false);
+      ClientProducer producer = session0.createProducer("queues");
+      final int numSent = 200;
+      for (int i = 0; i < numSent; i++) {
+         ClientMessage msg = createTextMessage(session0, true, 5000);
+         producer.send(msg);
+         if (i % 50 == 0) {
+            session0.commit();
+         }
+      }
+      session0.commit();
+      session0.close();
+
+      while (true) {
+         long msgCount0 = getMessageCount(servers[0], "queues");
+         long msgCount1 = getMessageCount(servers[1], "queues");
+
+         if (msgCount0 + msgCount1 >= numSent) {
+            break;
+         }
+         Thread.sleep(100);
+      }
+
+      Queue queue0 = servers[0].locateQueue(new SimpleString("queue0"));
+      assertTrue(queue0.getPageSubscription().isPaging());
+
+      closeAllSessionFactories();
+      stopServers(0, 1);
+
+      AddressSettings addressSettings0 = 
config0.getAddressesSettings().get("#");
+      AddressSettings addressSettings1 = 
config1.getAddressesSettings().get("#");
+
+      addressSettings0.setMaxSizeBytes(-1);
+      addressSettings1.setMaxSizeBytes(-1);
+
+      startServers(0, 1);
+
+      waitForBindings(0, "queues", 1, 0, true);
+      waitForBindings(1, "queues", 1, 0, true);
+
+      waitForBindings(0, "queues", 1, 0, false);
+      waitForBindings(1, "queues", 1, 0, false);
+
+      setupSessionFactory(0, isNetty());
+      addConsumer(0, 0, "queue0", null);
+
+      waitForBindings(0, "queues", 1, 1, true);
+
+      for (int i = 0; i < numSent; i++) {
+         ClientMessage m = consumers[0].consumer.receive(5000);
+         assertNotNull("failed to receive message " + i, m);
+      }
+   }
+
+   private void configureBeforeStart(Configuration... serverConfigs) {
+      for (Configuration config : serverConfigs) {
+         config.setPersistenceEnabled(true);
+         config.setMessageCounterEnabled(true);
+         config.setJournalFileSize(20971520);
+         config.setJournalMinFiles(20);
+         config.setJournalCompactPercentage(50);
+
+         Map<String, AddressSettings> addressSettingsMap0 = 
config.getAddressesSettings();
+         AddressSettings addrSettings = addressSettingsMap0.get("#");
+         if (addrSettings == null) {
+            addrSettings = new AddressSettings();
+            addressSettingsMap0.put("#", addrSettings);
+         }
+         addrSettings.setDeadLetterAddress(new SimpleString("jms.queue.DLQ"));
+         addrSettings.setExpiryAddress(new 
SimpleString("jms.queue.ExpiryQueue"));
+         addrSettings.setRedeliveryDelay(30);
+         addrSettings.setMaxDeliveryAttempts(5);
+         addrSettings.setMaxSizeBytes(1048576);
+         
addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+         addrSettings.setPageSizeBytes(524288);
+         addrSettings.setMessageCounterHistoryDayLimit(10);
+         addrSettings.setRedistributionDelay(1000);
+      }
+   }
+
    @Test
    public void testStartStop() throws Exception {
 

Reply via email to