http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java index 1ee3040..c47d79d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java @@ -113,9 +113,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -205,9 +204,9 @@ public class PagingTest extends ServiceTestBase session.start(); - assertEquals(numberOfMessages * 2, queue.getMessageCount()); + assertEquals(numberOfMessages * 2, getMessageCount(queue)); - // The consumer has to be created after the queue.getMessageCount assertion + // The consumer has to be created after the getMessageCount(queue) assertion // otherwise delivery could alter the messagecount and give us a false failure ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); ClientMessage msg = null; @@ -231,7 +230,7 @@ public class PagingTest extends ServiceTestBase locator.close(); - assertEquals(0, queue.getMessageCount()); + assertEquals(0, getMessageCount(queue)); waitForNotPaging(queue); @@ -253,9 +252,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); 
server = createServer(true, config, @@ -374,9 +372,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -542,7 +539,7 @@ public class PagingTest extends ServiceTestBase assertNull(cons.receiveImmediate()); session.commit(); - System.out.println("count = " + queue.getMessageCount()); + System.out.println("count = " + getMessageCount(queue)); session.commit(); @@ -592,14 +589,11 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalDirectory(getJournalDir()); - - config.setJournalSyncNonTransactional(false); - config.setJournalCompactMinFiles(0); // disable compact - - config.setMessageExpiryScanPeriod(500); + Configuration config = createDefaultConfig() + .setJournalDirectory(getJournalDir()) + .setJournalSyncNonTransactional(false) + .setJournalCompactMinFiles(0) // disable compact + .setMessageExpiryScanPeriod(500); server = createServer(true, config, @@ -678,13 +672,13 @@ public class PagingTest extends ServiceTestBase session.commit(); producer.close(); - for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && qEXP.getMessageCount() < 1000; ) + for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) { - System.out.println("count = " + qEXP.getMessageCount()); + System.out.println("count = " + getMessageCount(qEXP)); Thread.sleep(100); } - assertEquals(1000, qEXP.getMessageCount()); + assertEquals(1000, getMessageCount(qEXP)); session.start(); @@ -702,11 +696,11 @@ public class PagingTest extends ServiceTestBase assertNull(consumer.receiveImmediate()); - for (long timeout = System.currentTimeMillis() + 
5000; timeout > System.currentTimeMillis() && queue1.getMessageCount() != 0; ) + for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(queue1) != 0; ) { Thread.sleep(100); } - assertEquals(0, queue1.getMessageCount()); + assertEquals(0, getMessageCount(queue1)); consumer.close(); @@ -722,10 +716,6 @@ public class PagingTest extends ServiceTestBase assertNull(consumer.receiveImmediate()); - System.out.println("count Exp = " + qEXP.getMessageCount()); - - System.out.println("msgCount = " + queue1.getMessageCount()); - // This is just to hold some messages as being delivered ClientConsumerInternal cons = (ClientConsumerInternal) session.createConsumer(ADDRESS); @@ -741,12 +731,10 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalDirectory(getJournalDir()); - - config.setJournalSyncNonTransactional(false); - config.setJournalCompactMinFiles(0); // disable compact + Configuration config = createDefaultConfig() + .setJournalDirectory(getJournalDir()) + .setJournalSyncNonTransactional(false) + .setJournalCompactMinFiles(0); // disable compact HornetQServer server = createServer(true, config, @@ -909,7 +897,7 @@ public class PagingTest extends ServiceTestBase queue = server.locateQueue(PagingTest.ADDRESS); - assertEquals(0, queue.getMessageCount()); + assertEquals(0, getMessageCount(queue)); timeout = System.currentTimeMillis() + 10000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) @@ -927,9 +915,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -1007,7 +994,7 @@ public class PagingTest extends 
ServiceTestBase Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, queue.getMessageCount()); + assertEquals(numberOfMessages, getMessageCount(queue)); LinkedList<Xid> xids = new LinkedList<Xid>(); @@ -1044,7 +1031,7 @@ public class PagingTest extends ServiceTestBase sessionCheck.close(); - assertEquals(numberOfMessages, queue.getMessageCount()); + assertEquals(numberOfMessages, getMessageCount(queue)); sf.close(); locator.close(); @@ -1071,7 +1058,7 @@ public class PagingTest extends ServiceTestBase session.start(); - assertEquals(numberOfMessages, queue.getMessageCount()); + assertEquals(numberOfMessages, getMessageCount(queue)); ClientMessage msg = consumer.receive(5000); if (msg != null) @@ -1125,7 +1112,7 @@ public class PagingTest extends ServiceTestBase locator.close(); - assertEquals(0, queue.getMessageCount()); + assertEquals(0, getMessageCount(queue)); waitForNotPaging(queue); } @@ -1135,9 +1122,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -1223,9 +1209,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -1300,7 +1285,7 @@ public class PagingTest extends ServiceTestBase Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, queue.getMessageCount()); + assertEquals(numberOfMessages, getMessageCount(queue)); int msgReceived = 0; ClientSession sessionConsumer = sf.createSession(false, false, false); @@ -1334,7 +1319,7 @@ public class PagingTest extends ServiceTestBase locator.close(); - 
assertEquals(0, queue.getMessageCount()); + assertEquals(0, getMessageCount(queue)); long timeout = System.currentTimeMillis() + 5000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) @@ -1353,8 +1338,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - config.setPersistDeliveryCountBeforeDelivery(true); + Configuration config = createDefaultConfig() + .setPersistDeliveryCountBeforeDelivery(true); config.setJournalSyncNonTransactional(false); @@ -1431,7 +1416,7 @@ public class PagingTest extends ServiceTestBase Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, queue.getMessageCount()); + assertEquals(numberOfMessages, getMessageCount(queue)); int msgReceived = 0; ClientSession sessionConsumer = sf.createSession(false, false, false); @@ -1465,7 +1450,7 @@ public class PagingTest extends ServiceTestBase locator.close(); - assertEquals(0, queue.getMessageCount()); + assertEquals(0, getMessageCount(queue)); long timeout = System.currentTimeMillis() + 5000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) @@ -1534,7 +1519,7 @@ public class PagingTest extends ServiceTestBase queue = server.locateQueue(ADDRESS); - // assertEquals(numberOfMessages, queue.getMessageCount()); + // assertEquals(numberOfMessages, getMessageCount(queue)); msgReceived = 0; sessionConsumer = sf.createSession(false, false, false); @@ -1571,9 +1556,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -1757,9 +1741,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = 
createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -1917,9 +1900,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -2036,9 +2018,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -2048,21 +2029,19 @@ public class PagingTest extends ServiceTestBase if (divert) { - DivertConfiguration divert1 = new DivertConfiguration("dv1", - "nm1", - PagingTest.ADDRESS.toString(), - PagingTest.ADDRESS.toString() + "-1", - true, - null, - null); - - DivertConfiguration divert2 = new DivertConfiguration("dv2", - "nm2", - PagingTest.ADDRESS.toString(), - PagingTest.ADDRESS.toString() + "-2", - true, - null, - null); + DivertConfiguration divert1 = new DivertConfiguration() + .setName("dv1") + .setRoutingName("nm1") + .setAddress(PagingTest.ADDRESS.toString()) + .setForwardingAddress(PagingTest.ADDRESS.toString() + "-1") + .setExclusive(true); + + DivertConfiguration divert2 = new DivertConfiguration() + .setName("dv2") + .setRoutingName("nm2") + .setAddress(PagingTest.ADDRESS.toString()) + .setForwardingAddress(PagingTest.ADDRESS.toString() + "-2") + .setExclusive(true); ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>(); divertList.add(divert1); @@ -2102,10 +2081,9 @@ public class PagingTest extends ServiceTestBase { while (running.get()) { - // log.info("Message count = " + 
queue.getMessageCount() + " on queue " + queue.getName()); - queue.getMessagesAdded(); - queue.getMessageCount(); - // log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName()); + // this will be overusing what some users do. flush / getCount + getMessagesAdded(queue); + getMessageCount(queue); Thread.sleep(10); } } @@ -2336,9 +2314,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -2504,9 +2481,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -3241,9 +3217,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -3928,7 +3903,7 @@ public class PagingTest extends ServiceTestBase { Queue queue = (Queue) server.getPostOffice().getBinding(new SimpleString("someQueue" + i)).getBindable(); - Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getMessageCount()); + Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, getMessageCount(queue)); Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount()); } } @@ -4235,10 +4210,9 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - 
config.setJournalSyncNonTransactional(false); - config.setJournalFileSize(10 * 1024 * 1024); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false) + .setJournalFileSize(10 * 1024 * 1024); server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>()); @@ -4358,9 +4332,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -4464,9 +4437,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -4584,9 +4556,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -4684,9 +4655,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -4809,10 +4779,9 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - config.setThreadPoolMaxSize(5); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setThreadPoolMaxSize(5) + .setJournalSyncNonTransactional(false); Map<String, 
AddressSettings> settings = new HashMap<String, AddressSettings>(); AddressSettings dla = new AddressSettings(); @@ -5054,10 +5023,9 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - config.setMessageExpiryScanPeriod(500); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setMessageExpiryScanPeriod(500) + .setJournalSyncNonTransactional(false); Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>(); AddressSettings dla = new AddressSettings(); @@ -5439,7 +5407,7 @@ public class PagingTest extends ServiceTestBase producer.send(message); Queue q = (Queue) server.getPostOffice().getBinding(ADDRESS).getBindable(); - Assert.assertEquals(3, q.getMessageCount()); + Assert.assertEquals(3, getMessageCount(q)); // send a message with a dup ID that should fail b/c the address is full SimpleString dupID1 = new SimpleString("abcdefg"); @@ -5448,7 +5416,7 @@ public class PagingTest extends ServiceTestBase validateExceptionOnSending(producer, message); - Assert.assertEquals(3, q.getMessageCount()); + Assert.assertEquals(3, getMessageCount(q)); ClientConsumer consumer = session.createConsumer(ADDRESS); @@ -5461,11 +5429,11 @@ public class PagingTest extends ServiceTestBase session.commit(); // to make sure it's on the server (roundtrip) consumer.close(); - Assert.assertEquals(2, q.getMessageCount()); + Assert.assertEquals(2, getMessageCount(q)); producer.send(message); - Assert.assertEquals(3, q.getMessageCount()); + Assert.assertEquals(3, getMessageCount(q)); consumer = session.createConsumer(ADDRESS); @@ -5516,9 +5484,8 @@ public class PagingTest extends ServiceTestBase public void testRouteOnTopWithMultipleQueues() throws Exception { - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); 
server = createServer(true, config, @@ -5591,9 +5558,8 @@ public class PagingTest extends ServiceTestBase clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -5794,9 +5760,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -5895,9 +5860,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -5997,9 +5961,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -6082,9 +6045,8 @@ public class PagingTest extends ServiceTestBase @Test public void testNoCursors() throws Exception { - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = createServer(true, config, @@ -6128,9 +6090,8 @@ public class PagingTest extends ServiceTestBase { clearDataRecreateServerDirs(); - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); server = 
createServer(true, config, @@ -6245,9 +6206,8 @@ public class PagingTest extends ServiceTestBase @Override protected Configuration createDefaultConfig() throws Exception { - Configuration config = super.createDefaultConfig(); - config.setJournalSyncNonTransactional(false); - return config; + return super.createDefaultConfig() + .setJournalSyncNonTransactional(false); } private static final class DummyOperationContext implements OperationContext
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java index 94dd634..df3132c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java @@ -72,9 +72,9 @@ public class ProducerCloseTest extends ServiceTestBase public void setUp() throws Exception { super.setUp(); - Configuration config = createDefaultConfig(); - config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())) + .setSecurityEnabled(false); server = createServer(false, config); server.start(); locator = createInVMNonHALocator(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java index b884933..0bb5248 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java @@ -310,7 +310,7 @@ public class 
QueueBrowserTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(100, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(100, getMessageCount(((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); @@ -347,7 +347,7 @@ public class QueueBrowserTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(100, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(100, getMessageCount(((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ReceiveImmediateTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ReceiveImmediateTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ReceiveImmediateTest.java index 7197f97..c2eaabe 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ReceiveImmediateTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ReceiveImmediateTest.java @@ -255,7 +255,7 @@ public class ReceiveImmediateTest extends ServiceTestBase Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); int messagesOnServer = browser ? 
numMessages : 0; Assert.assertEquals(messagesOnServer, - ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + getMessageCount(((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()))); consumer.close(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java index 4a3cb8c..e528151 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java @@ -354,8 +354,8 @@ public class RedeliveryConsumerTest extends ServiceTestBase */ private void setUp(final boolean persistDeliveryCountBeforeDelivery) throws Exception { - Configuration config = createDefaultConfig(); - config.setPersistDeliveryCountBeforeDelivery(persistDeliveryCountBeforeDelivery); + Configuration config = createDefaultConfig() + .setPersistDeliveryCountBeforeDelivery(persistDeliveryCountBeforeDelivery); server = createServer(true, config); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java index b1470de..7b3e7e0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java +++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java @@ -269,8 +269,8 @@ public class RequestorTest extends ServiceTestBase { super.setUp(); - Configuration conf = createDefaultConfig(); - conf.getAcceptorConfigurations().add(TransportConfigurationUtils.getInVMAcceptor(true)); + Configuration conf = createDefaultConfig() + .addAcceptorConfiguration(TransportConfigurationUtils.getInVMAcceptor(true)); service = createServer(false, conf); service.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java index 0433d2c..9dadd77 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java @@ -256,9 +256,9 @@ public class SessionCloseTest extends UnitTestCase { super.setUp(); - Configuration config = createDefaultConfig(); - config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())) + .setSecurityEnabled(false); server = HornetQServers.newHornetQServer(config, false); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java ---------------------------------------------------------------------- diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java index a750c4d..fb636e0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java @@ -111,9 +111,9 @@ public class SessionClosedOnRemotingConnectionFailureTest extends ServiceTestBas { super.setUp(); - Configuration config = createDefaultConfig(); - config.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .setSecurityEnabled(false); server = createServer(false, config); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java index 6ffa4fd..244037c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java @@ -29,10 +29,10 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.Configuration; +import 
org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.tests.util.RandomUtil; import org.hornetq.tests.util.ServiceTestBase; import org.junit.Assert; @@ -47,9 +47,10 @@ import org.junit.Test; */ public class SessionFactoryTest extends ServiceTestBase { - private final DiscoveryGroupConfiguration groupConfiguration = - new DiscoveryGroupConfiguration(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, - new UDPBroadcastGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort(), null, -1)); + private final DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration() + .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(getUDPDiscoveryAddress()) + .setGroupPort(getUDPDiscoveryPort())); private HornetQServer liveService; @@ -559,12 +560,7 @@ public class SessionFactoryTest extends ServiceTestBase private void startServer() throws Exception { - Configuration liveConf = createDefaultConfig(); - liveConf.setSecurityEnabled(false); liveTC = new TransportConfiguration(InVMConnectorFactory.class.getName()); - liveConf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); - liveConf.getConnectorConfigurations().put(liveTC.getName(), liveTC); - liveConf.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); final long broadcastPeriod = 250; @@ -572,18 +568,24 @@ public class SessionFactoryTest extends ServiceTestBase final int localBindPort = 5432; - BroadcastGroupConfiguration bcConfig1 = new BroadcastGroupConfiguration(bcGroupName, - broadcastPeriod, - Arrays.asList(liveTC.getName()), - new UDPBroadcastGroupConfiguration( - 
getUDPDiscoveryAddress(), - getUDPDiscoveryPort(), - null, - localBindPort)); + BroadcastGroupConfiguration bcConfig1 = new BroadcastGroupConfiguration() + .setName(bcGroupName) + .setBroadcastPeriod(broadcastPeriod) + .setConnectorInfos(Arrays.asList(liveTC.getName())) + .setEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(getUDPDiscoveryAddress()) + .setGroupPort(getUDPDiscoveryPort()) + .setLocalBindPort(localBindPort)); List<BroadcastGroupConfiguration> bcConfigs1 = new ArrayList<BroadcastGroupConfiguration>(); bcConfigs1.add(bcConfig1); - liveConf.setBroadcastGroupConfigurations(bcConfigs1); + + Configuration liveConf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) + .addConnectorConfiguration(liveTC.getName(), liveTC) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()) + .setBroadcastGroupConfigurations(bcConfigs1); liveService = createServer(false, liveConf); liveService.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java index f90bc94..698d631 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java @@ -41,6 +41,7 @@ import org.junit.Test; * This test covers the API for ClientSession although XA tests are tested separately. 
* * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class SessionTest extends ServiceTestBase { @@ -205,6 +206,9 @@ public class SessionTest extends ServiceTestBase ClientProducer cp = clientSession.createProducer("a1"); cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); + + flushQueue(); + QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName)); Assert.assertEquals(new SimpleString("a1"), resp.getAddress()); Assert.assertEquals(2, resp.getConsumerCount()); @@ -213,6 +217,13 @@ public class SessionTest extends ServiceTestBase clientSession.close(); } + private void flushQueue() throws Exception + { + Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName)); + assertNotNull(queue); + queue.flushExecutor(); + } + @Test public void testQueueQueryWithFilter() throws Exception { @@ -221,6 +232,7 @@ public class SessionTest extends ServiceTestBase clientSession.createQueue("a1", queueName, "foo=bar", false); clientSession.createConsumer(queueName); clientSession.createConsumer(queueName); + QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName)); Assert.assertEquals(new SimpleString("a1"), resp.getAddress()); Assert.assertEquals(2, resp.getConsumerCount()); @@ -349,9 +361,9 @@ public class SessionTest extends ServiceTestBase cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(queueName)).getBindable(); - Assert.assertEquals(0, q.getMessageCount()); + Assert.assertEquals(0, getMessageCount(q)); clientSession.commit(); - Assert.assertEquals(10, q.getMessageCount()); + Assert.assertEquals(10, getMessageCount(q)); clientSession.close(); } @@ -373,12 +385,12 @@ public class SessionTest extends ServiceTestBase cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); 
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(queueName)).getBindable(); - Assert.assertEquals(0, q.getMessageCount()); + Assert.assertEquals(0, getMessageCount(q)); clientSession.rollback(); cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); clientSession.commit(); - Assert.assertEquals(2, q.getMessageCount()); + Assert.assertEquals(2, getMessageCount(q)); clientSession.close(); } @@ -403,7 +415,7 @@ public class SessionTest extends ServiceTestBase cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(queueName)).getBindable(); - Assert.assertEquals(10, q.getMessageCount()); + Assert.assertEquals(10, getMessageCount(q)); ClientConsumer cc = clientSession.createConsumer(queueName); clientSession.start(); ClientMessage m = cc.receive(5000); @@ -437,7 +449,7 @@ public class SessionTest extends ServiceTestBase Assert.assertNotNull(m); m.acknowledge(); clientSession.commit(); - Assert.assertEquals(0, q.getMessageCount()); + Assert.assertEquals(0, getMessageCount(q)); clientSession.close(); sendSession.close(); } @@ -463,7 +475,7 @@ public class SessionTest extends ServiceTestBase cp.send(clientSession.createMessage(false)); cp.send(clientSession.createMessage(false)); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(queueName)).getBindable(); - Assert.assertEquals(10, q.getMessageCount()); + Assert.assertEquals(10, getMessageCount(q)); ClientConsumer cc = clientSession.createConsumer(queueName); clientSession.start(); ClientMessage m = cc.receive(5000); @@ -497,8 +509,17 @@ public class SessionTest extends ServiceTestBase Assert.assertNotNull(m); m.acknowledge(); clientSession.rollback(); - Assert.assertEquals(10, q.getMessageCount()); + Assert.assertEquals(10, getMessageCount(q)); clientSession.close(); sendSession.close(); } + + @Test + public void testGetNodeId() throws Exception + 
{ + cf = createSessionFactory(locator); + ClientSession clientSession = addClientSession(cf.createSession(false, true, true)); + String nodeId = ((ClientSessionInternal) clientSession).getNodeId(); + assertNotNull(nodeId); + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java new file mode 100644 index 0000000..ce6504c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java @@ -0,0 +1,361 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.client; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.HornetQExceptionType; +import org.hornetq.api.core.HornetQObjectClosedException; +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.MessageHandler; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.api.core.management.CoreNotificationType; +import org.hornetq.api.core.management.ManagementHelper; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.core.settings.impl.SlowConsumerPolicy; +import org.hornetq.tests.util.RandomUtil; +import org.hornetq.tests.util.ServiceTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * @author Justin Bertram + */ +@RunWith(value = Parameterized.class) +public class SlowConsumerTest extends ServiceTestBase +{ + private boolean isNetty = false; + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "isNetty={0}") + public static Collection getParameters() + { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + public SlowConsumerTest(boolean isNetty) + { + this.isNetty = isNetty; + } + + private HornetQServer server; + + private final SimpleString 
QUEUE = new SimpleString("ConsumerTestQueue"); + + private ServerLocator locator; + + @Before + @Override + public void setUp() throws Exception + { + super.setUp(); + + server = createServer(false, isNetty); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + server.start(); + + server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + locator = createFactory(isNetty); + } + + @Test + public void testSlowConsumerKilled() throws Exception + { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + final int numMessages = 25; + + for (int i = 0; i < numMessages; i++) + { + producer.send(createTextMessage(session, "m" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + Thread.sleep(3000); + + try + { + consumer.receiveImmediate(); + fail(); + } + catch (HornetQObjectClosedException e) + { + assertEquals(HornetQExceptionType.OBJECT_CLOSED, e.getType()); + } + } + + @Test + public void testSlowConsumerNotification() throws Exception + { + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + session.createQueue(QUEUE, QUEUE, null, false); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + + server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); +
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + final int numMessages = 25; + + for (int i = 0; i < numMessages; i++) + { + producer.send(createTextMessage(session, "m" + i)); + } + + SimpleString notifQueue = RandomUtil.randomSimpleString(); + + session.createQueue(HornetQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false); + + ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'"); + + final CountDownLatch notifLatch = new CountDownLatch(1); + + notifConsumer.setMessageHandler(new MessageHandler() + { + @Override + public void onMessage(ClientMessage message) + { + assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); + assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT)); + if (isNetty) + { + assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1")); + } + else + { + assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS)); + } + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME)); + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME)); + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME)); + try + { + message.acknowledge(); + } + catch (HornetQException e) + { + e.printStackTrace(); + } + notifLatch.countDown(); + } + }); + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + assertTrue(notifLatch.await(3, 
TimeUnit.SECONDS)); + } + + @Test + public void testSlowConsumerSpared() throws Exception + { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(true, true)); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + final int numMessages = 5; + + for (int i = 0; i < numMessages; i++) + { + producer.send(createTextMessage(session, "m" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + Thread.sleep(3000); + + for (int i = 0; i < numMessages; i++) + { + assertNotNull(consumer.receive(500)); + } + } + + @Test + public void testFastThenSlowConsumerSpared() throws Exception + { + locator.setAckBatchSize(0); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(true, true)); + + final ClientSession producerSession = addClientSession(sf.createSession(true, true)); + + session.createQueue(QUEUE, QUEUE, null, false); + + final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE)); + + final AtomicLong messagesProduced = new AtomicLong(0); + + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + long start = System.currentTimeMillis(); + ClientMessage m = createTextMessage(producerSession, "m", true); + + // send messages as fast as possible for 3 seconds + while (System.currentTimeMillis() < (start + 3000)) + { + try + { + producer.send(m); + messagesProduced.incrementAndGet(); + } + catch (HornetQException e) + { + e.printStackTrace(); + return; + } + } + + start = System.currentTimeMillis(); + + // send 1 msg/second for 10 seconds + while (System.currentTimeMillis() < (start + 10000)) + { + try + { + producer.send(m); + messagesProduced.incrementAndGet(); + Thread.sleep(1000); + } + catch (Exception e) + { + e.printStackTrace(); + return; + } 
+ } + } + }); + + t.start(); + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + ClientMessage m = null; + long messagesConsumed = 0; + + do + { + m = consumer.receive(1500); + if (m != null) + { + m.acknowledge(); + messagesConsumed++; + } + } + while (m != null); + + assertEquals(messagesProduced.longValue(), messagesConsumed); + } + + @Test + public void testSlowWildcardConsumer() throws Exception + { + SimpleString addressAB = new SimpleString("a.b"); + SimpleString addressAC = new SimpleString("a.c"); + SimpleString address = new SimpleString("a.*"); + SimpleString queueName1 = new SimpleString("Q1"); + SimpleString queueName2 = new SimpleString("Q2"); + SimpleString queueName = new SimpleString("Q"); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + session.createQueue(addressAB, queueName1, null, false); + session.createQueue(addressAC, queueName2, null, false); + session.createQueue(address, queueName, null, false); + ClientProducer producer = session.createProducer(addressAB); + ClientProducer producer2 = session.createProducer(addressAC); + + final int numMessages = 20; + + for (int i = 0; i < numMessages; i++) + { + producer.send(createTextMessage(session, "m1" + i)); + producer2.send(createTextMessage(session, "m2" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(queueName)); + session.start(); + + Thread.sleep(3000); + + try + { + consumer.receiveImmediate(); + fail(); + } + catch (HornetQObjectClosedException e) + { + assertEquals(HornetQExceptionType.OBJECT_CLOSED,
e.getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java index 9722371..ebb6491 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java @@ -551,6 +551,13 @@ public class TemporaryQueueTest extends SingleServerTestBase serverCloseLatch.await(2 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL + 2 * TemporaryQueueTest.CONNECTION_TTL, TimeUnit.MILLISECONDS)); + + // The connection count is updated asynchronously at the end of failure handling, so poll for a while until it reaches zero.
+ for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && server.getConnectionCount() > 0;) + { + Thread.sleep(1); + } + Assert.assertEquals(0, server.getConnectionCount()); session.close(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java index 3761b0c..8ccb233 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TransactionalSendTest.java @@ -62,17 +62,17 @@ public class TransactionalSendTest extends ServiceTestBase cp.send(session.createMessage(false)); } Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); - Assert.assertEquals(q.getMessageCount(), 0); + Assert.assertEquals(0, getMessageCount(q)); session.commit(); - Assert.assertEquals(q.getMessageCount(), numMessages); + Assert.assertEquals(numMessages, getMessageCount(q)); // now send some more for (int i = 0; i < numMessages; i++) { cp.send(session.createMessage(false)); } - Assert.assertEquals(q.getMessageCount(), numMessages); + Assert.assertEquals(numMessages, getMessageCount(q)); session.commit(); - Assert.assertEquals(q.getMessageCount(), numMessages * 2); + Assert.assertEquals(numMessages * 2, getMessageCount(q)); session.close(); } @@ -91,17 +91,17 @@ public class TransactionalSendTest extends ServiceTestBase cp.send(session.createMessage(false)); } Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); - Assert.assertEquals(q.getMessageCount(), 0); +
Assert.assertEquals(0, getMessageCount(q)); session.rollback(); - Assert.assertEquals(q.getMessageCount(), 0); + Assert.assertEquals(0, getMessageCount(q)); // now send some more for (int i = 0; i < numMessages; i++) { cp.send(session.createMessage(false)); } - Assert.assertEquals(q.getMessageCount(), 0); + Assert.assertEquals(0, getMessageCount(q)); session.commit(); - Assert.assertEquals(q.getMessageCount(), numMessages); + Assert.assertEquals(numMessages, getMessageCount(q)); session.close(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/WildCardRoutingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/WildCardRoutingTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/WildCardRoutingTest.java index 8df2c19..726f78a 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/WildCardRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/WildCardRoutingTest.java @@ -783,13 +783,14 @@ public class WildCardRoutingTest extends UnitTestCase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setWildcardRoutingEnabled(true); - configuration.setSecurityEnabled(false); - configuration.setTransactionTimeoutScanPeriod(500); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setWildcardRoutingEnabled(true) + .setSecurityEnabled(false) + .setTransactionTimeoutScanPeriod(500) + .addAcceptorConfiguration(transportConfig); + server = HornetQServers.newHornetQServer(configuration, false); // start the server
server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java index 87932a3..700adcf 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java @@ -37,8 +37,8 @@ public abstract class ClientTestBase extends ServiceTestBase { super.setUp(); - Configuration config = createDefaultConfig(true); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig(true) + .setSecurityEnabled(false); server = createServer(false, config); server.start(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient.java index 5336867..753fc07 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient.java @@ -26,7 +26,7 @@ import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.IntegrationTestLogger; /** - * Code to be run in an external VM, via main(). 
+ * Code to be run in an external VM, via main() * * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java index f76d033..23715c7 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java @@ -26,7 +26,7 @@ import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; import org.hornetq.tests.integration.IntegrationTestLogger; /** - * Code to be run in an external VM, via main(). 
+ * Code to be run in an external VM, via main() * * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/GracefulClient.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/GracefulClient.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/GracefulClient.java index 5169201..2410187 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/GracefulClient.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/GracefulClient.java @@ -25,7 +25,7 @@ import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.IntegrationTestLogger; /** - * Code to be run in an external VM, via main(). + * Code to be run in an external VM, via main() * * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ClusterControllerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ClusterControllerTest.java new file mode 100644 index 0000000..9f8fad7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ClusterControllerTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.cluster; + +import org.hornetq.api.core.HornetQClusterSecurityException; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.client.impl.ClientSessionFactoryInternal; +import org.hornetq.core.client.impl.ServerLocatorImpl; +import org.hornetq.core.server.cluster.ClusterControl; +import org.hornetq.core.server.cluster.ClusterController; +import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory; +import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class ClusterControllerTest extends ClusterTestBase +{ + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + setupServer(0, isFileStorage(), true); + setupServer(1, isFileStorage(), true); + + getServer(0).getConfiguration().getAcceptorConfigurations().add(createTransportConfiguration(false, true, + generateParams(0, false))); + getServer(1).getConfiguration().getAcceptorConfigurations().add(createTransportConfiguration(false, true, + generateParams(1, false))); + + getServer(0).getConfiguration().setSecurityEnabled(true); + getServer(1).getConfiguration().setSecurityEnabled(true); + + getServer(1).getConfiguration().setClusterPassword("something different"); + + 
setupClusterConnection("cluster0", "queues", false, 1, true, 0); + setupClusterConnection("cluster0", "queues", false, 1, true, 1); + + startServers(0); + startServers(1); + } + + @Override + @After + public void tearDown() throws Exception + { + stopServers(); + + super.tearDown(); + } + + @Test + public void controlWithDifferentConnector() throws Exception + { + try (ServerLocatorImpl locator = (ServerLocatorImpl) HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY))) + { + locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); + ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool()); + ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); + clusterControl.authorize(); + } + } + + @Test + public void controlWithDifferentPassword() throws Exception + { + try (ServerLocatorImpl locator = (ServerLocatorImpl) HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY))) + { + locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); + ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool()); + ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); + try + { + clusterControl.authorize(); + fail("should throw HornetQClusterSecurityException"); + } + catch (Exception e) + { + assertTrue("should throw HornetQClusterSecurityException", e instanceof HornetQClusterSecurityException); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeFailoverTest.java ---------------------------------------------------------------------- diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeFailoverTest.java index f2db8ee..019f8b2 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeFailoverTest.java @@ -57,8 +57,12 @@ public class BridgeFailoverTest extends MultiServerTestBase for (HornetQServer server : servers) { - server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(ORIGINAL_QUEUE, ORIGINAL_QUEUE, null, true)); - server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(TARGET_QUEUE, TARGET_QUEUE, null, true)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(ORIGINAL_QUEUE) + .setName(ORIGINAL_QUEUE)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(TARGET_QUEUE) + .setName(TARGET_QUEUE)); } startServers(); @@ -132,8 +136,12 @@ public class BridgeFailoverTest extends MultiServerTestBase for (HornetQServer server : servers) { - server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(ORIGINAL_QUEUE, ORIGINAL_QUEUE, null, true)); - server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(TARGET_QUEUE, TARGET_QUEUE, null, true)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(ORIGINAL_QUEUE) + .setName(ORIGINAL_QUEUE)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(TARGET_QUEUE) + .setName(TARGET_QUEUE)); } startServers(); @@ -245,8 +253,12 @@ public class BridgeFailoverTest extends MultiServerTestBase for (HornetQServer server : servers) { - 
server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(ORIGINAL_QUEUE, ORIGINAL_QUEUE, null, true)); - server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration(TARGET_QUEUE, TARGET_QUEUE, null, true)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(ORIGINAL_QUEUE) + .setName(ORIGINAL_QUEUE)); + server.getConfiguration().getQueueConfigurations().add(new CoreQueueConfiguration() + .setAddress(TARGET_QUEUE) + .setName(TARGET_QUEUE)); } startBackups(0, 1, 3, 4); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java index 2c9ea28..ccb1708 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java @@ -17,7 +17,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQExceptionType; import org.hornetq.api.core.HornetQNotConnectedException; @@ -158,12 +157,16 @@ public class BridgeReconnectTest extends BridgeTestBase server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); server2.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + 
.setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server1.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server0.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -209,12 +212,16 @@ public class BridgeReconnectTest extends BridgeTestBase bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -270,13 +277,17 @@ public class BridgeReconnectTest extends BridgeTestBase private BridgeConfiguration createBridgeConfig() { - return new BridgeConfiguration(bridgeName, queueName, forwardAddress, null, null, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, - 
HornetQClient.DEFAULT_CONNECTION_TTL, retryInterval, - HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, retryIntervalMultiplier, - -1, reconnectAttempts, 0, true, confirmationWindowSize, staticConnectors, false, - HornetQDefaultConfiguration.getDefaultClusterUser(), CLUSTER_PASSWORD); + return new BridgeConfiguration() + .setName(bridgeName) + .setQueueName(queueName) + .setForwardingAddress(forwardAddress) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setReconnectAttempts(reconnectAttempts) + .setReconnectAttemptsOnSameNode(0) + .setConfirmationWindowSize(confirmationWindowSize) + .setStaticConnectors(staticConnectors) + .setPassword(CLUSTER_PASSWORD); } // Fail bridge and attempt failover a few times before succeeding @@ -301,12 +312,16 @@ public class BridgeReconnectTest extends BridgeTestBase bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -370,12 +385,16 @@ public class BridgeReconnectTest extends BridgeTestBase bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 
= new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -457,37 +476,33 @@ public class BridgeReconnectTest extends BridgeTestBase reconnectAttempts = -1; final long clientFailureCheckPeriod = 1000; - BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName, - queueName, - forwardAddress, - null, - null, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - clientFailureCheckPeriod, - HornetQClient.DEFAULT_CONNECTION_TTL, - retryInterval, - HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, - retryIntervalMultiplier, - -1, - reconnectAttempts, - 0, - true, - confirmationWindowSize, - staticConnectors, - false, - HornetQDefaultConfiguration.getDefaultClusterUser(), - CLUSTER_PASSWORD); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration() + .setName(bridgeName) + .setQueueName(queueName) + .setForwardingAddress(forwardAddress) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setReconnectAttempts(reconnectAttempts) + .setReconnectAttemptsOnSameNode(0) + .setConfirmationWindowSize(confirmationWindowSize) + .setStaticConnectors(staticConnectors) + .setPassword(CLUSTER_PASSWORD); List<BridgeConfiguration> bridgeConfigs = new 
ArrayList<BridgeConfiguration>(); bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -594,12 +609,16 @@ public class BridgeReconnectTest extends BridgeTestBase bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); @@ -707,12 +726,16 @@ public 
class BridgeReconnectTest extends BridgeTestBase bridgeConfigs.add(bridgeConfiguration); server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1);
