http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConcurrentCreateDeleteProduceTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConcurrentCreateDeleteProduceTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConcurrentCreateDeleteProduceTest.java index c2eed39..5923fc5 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConcurrentCreateDeleteProduceTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConcurrentCreateDeleteProduceTest.java @@ -58,10 +58,9 @@ public class ConcurrentCreateDeleteProduceTest extends ServiceTestBase { super.setUp(); - Configuration config = createDefaultConfig(false); - - config.setJournalSyncNonTransactional(false); - config.setJournalSyncTransactional(false); + Configuration config = createDefaultConfig(false) + .setJournalSyncNonTransactional(false) + .setJournalSyncTransactional(false); server = createServer(true, config,
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java index 7975279..f3d10db 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java @@ -303,9 +303,9 @@ public class ConsumerCloseTest extends ServiceTestBase { super.setUp(); - Configuration config = createDefaultConfig(); - config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())) + .setSecurityEnabled(false); server = addServer(HornetQServers.newHornetQServer(config, false)); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerStuckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerStuckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerStuckTest.java new file mode 100644 index 0000000..e9b1a38 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerStuckTest.java @@ -0,0 +1,305 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.client; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.core.client.impl.ClientSessionFactoryImpl; +import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl; +import org.hornetq.core.remoting.impl.netty.NettyConnection; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.tests.util.ServiceTestBase; +import org.junit.Before; +import org.junit.Test; + +/** + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class ConsumerStuckTest extends ServiceTestBase +{ + private HornetQServer server; + + private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue"); + + protected boolean isNetty() + { + return true; + } + + @Before + @Override + public void setUp() throws Exception + { + super.setUp(); + + server = createServer(false, isNetty()); + + server.start(); + } + + @Test + public void testClientStuckTest() throws Exception + { + + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + locator.setConnectionTTL(1000); + locator.setClientFailureCheckPeriod(100); + locator.setConsumerWindowSize(10 * 1024 * 1024); + ClientSessionFactory sf = locator.createSessionFactory(); + ((ClientSessionFactoryImpl) sf).stopPingingAfterOne(); + + RemotingConnectionImpl remotingConnection = (RemotingConnectionImpl) sf.getConnection(); + ClientSession session = sf.createSession(false, true, true, true); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientProducer producer = session.createProducer(QUEUE); + + final int numMessages = 10000; + + for (int i = 0; i < numMessages; i++) + { + ClientMessage message = createTextMessage(session, "m" + i); + producer.send(message); + } + + + final ClientConsumer consumer = session.createConsumer(QUEUE); + session.start(); + + final NettyConnection nettyConnection = (NettyConnection) remotingConnection.getTransportConnection(); + + + Thread tReceive = new Thread() + { + public void run() + { + boolean first = true; + try + { + while (!Thread.interrupted()) + { + ClientMessage received = consumer.receive(500); + System.out.println("Received " + received); + if (first) + { + first = false; + nettyConnection.getNettyChannel().config().setAutoRead(false); + } + if (received != null) + { + received.acknowledge(); + } + } + } + catch (Throwable e) + { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + }; + + tReceive.start(); + + try + { + + assertEquals(1, server.getSessions().size()); + + System.out.println("sessions = " + server.getSessions().size()); + + assertEquals(1, server.getConnectionCount()); + + long timeout = System.currentTimeMillis() + 20000; + + while (System.currentTimeMillis() < timeout && server.getSessions().size() != 0) + { + Thread.sleep(10); + } + + System.out.println("Size = " + server.getConnectionCount()); + + System.out.println("sessions = " + server.getSessions().size()); + + + + if (server.getSessions().size() != 0) + { + System.out.println(threadDump("Thread dump")); + fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information"); + } + + + timeout = System.currentTimeMillis() + 20000; + + while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0) + { + Thread.sleep(10); + } + + assertEquals(0, server.getConnectionCount()); + } + finally + { + nettyConnection.getNettyChannel().config().setAutoRead(true); + tReceive.interrupt(); + tReceive.join(); + } + } + + @Test + public void testClientStuckTestWithDirectDelivery() throws Exception + { + + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + locator.setConnectionTTL(1000); + locator.setClientFailureCheckPeriod(100); + locator.setConsumerWindowSize(10 * 1024 * 1024); + ClientSessionFactory sf = locator.createSessionFactory(); + ((ClientSessionFactoryImpl) sf).stopPingingAfterOne(); + + RemotingConnectionImpl remotingConnection = (RemotingConnectionImpl) sf.getConnection(); + ClientSession session = sf.createSession(false, true, true, true); + + session.createQueue(QUEUE, QUEUE, null, false); + + + final int numMessages = 10000; + + final ClientConsumer consumer = session.createConsumer(QUEUE); + session.start(); + + final NettyConnection nettyConnection = (NettyConnection) remotingConnection.getTransportConnection(); + + + Thread tReceive = new Thread() + { + public void run() + { + boolean first = true; + try + { + while (!Thread.interrupted()) + { + ClientMessage received = consumer.receive(500); + System.out.println("Received " + received); + if (first) + { + first = false; + nettyConnection.getNettyChannel().config().setAutoRead(false); + } + if (received != null) + { + received.acknowledge(); + } + } + } + catch (Throwable e) + { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + }; + + tReceive.start(); + + Thread sender = new Thread() + { + public void run() + { + try ( + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(false, true, true, true); + ClientProducer producer = session.createProducer(QUEUE); + ) + { + for (int i = 0; i < numMessages; i++) + { + ClientMessage message = createTextMessage(session, "m" + i); + producer.send(message); + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }; + + + sender.start(); + + try + { + + long timeout = System.currentTimeMillis() + 20000; + + while (System.currentTimeMillis() < timeout && server.getSessions().size() != 2) + { + Thread.sleep(10); + } + + assertEquals(2, server.getSessions().size()); + + System.out.println("sessions = " + server.getSessions().size()); + + assertEquals(2, server.getConnectionCount()); + + timeout = System.currentTimeMillis() + 20000; + + while (System.currentTimeMillis() < timeout && server.getSessions().size() != 1) + { + Thread.sleep(10); + } + + System.out.println("Size = " + server.getConnectionCount()); + + System.out.println("sessions = " + server.getSessions().size()); + + + + if (server.getSessions().size() != 1) + { + System.out.println(threadDump("Thread dump")); + fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information"); + } + + sender.join(); + + timeout = System.currentTimeMillis() + 20000; + + while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0) + { + Thread.sleep(10); + } + assertEquals(0, server.getConnectionCount()); + } + finally + { + nettyConnection.getNettyChannel().config().setAutoRead(true); + tReceive.interrupt(); + tReceive.join(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java index 07d9586..96fca27 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java @@ -12,6 +12,8 @@ */ package org.hornetq.tests.integration.client; +import java.util.Arrays; +import java.util.Collection; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,13 +40,32 @@ import org.hornetq.utils.ConcurrentHashSet; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * @author <a href="mailto:[email protected]">Andy Taylor</a> * @author <a href="mailto:[email protected]">Clebert Suconic</a> */ + +@RunWith(value = Parameterized.class) public class ConsumerTest extends ServiceTestBase { + @Parameterized.Parameters(name = "isNetty={0}") + public static Collection getParameters() + { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + public ConsumerTest(boolean netty) + { + this.netty = netty; + } + + private final boolean netty; private HornetQServer server; private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue"); @@ -53,7 +74,7 @@ public class ConsumerTest extends ServiceTestBase protected boolean isNetty() { - return false; + return netty; } @Before @@ -114,7 +135,7 @@ public class ConsumerTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); } @@ -149,7 +170,7 @@ public class ConsumerTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); } @@ -188,7 +209,7 @@ public class ConsumerTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); } @@ -227,12 +248,12 @@ public class ConsumerTest extends ServiceTestBase } // assert that all the messages are there and none have been acked Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()))); session.close(); Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(QUEUE).getBindable()))); } @Test http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java index 7521ce6..a87f180 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java @@ -63,11 +63,9 @@ public class CoreClientTest extends ServiceTestBase { final SimpleString QUEUE = new SimpleString("CoreClientTestQueue"); - Configuration conf = createDefaultConfig(); - - conf.setSecurityEnabled(false); - - conf.getAcceptorConfigurations().add(new TransportConfiguration(acceptorFactoryClassName)); + Configuration conf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(acceptorFactoryClassName)); HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, false)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CreateQueueIdempotentTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CreateQueueIdempotentTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CreateQueueIdempotentTest.java index 4f90e5d..ca11aa4 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CreateQueueIdempotentTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CreateQueueIdempotentTest.java @@ -40,9 +40,9 @@ public class CreateQueueIdempotentTest extends ServiceTestBase { super.setUp(); - Configuration conf = createDefaultConfig(); - conf.setSecurityEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration conf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); server = addServer(HornetQServers.newHornetQServer(conf, true)); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java index 8c58227..e88ca83 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java @@ -327,12 +327,12 @@ public class DeadLetterAddressTest extends ServiceTestBase long timeout = System.currentTimeMillis() + 5000; // DLA transfer is asynchronous fired on the rollback - while (System.currentTimeMillis() < timeout && ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount() != 0) + while (System.currentTimeMillis() < timeout && getMessageCount(((Queue)server.getPostOffice().getBinding(qName).getBindable())) != 0) { Thread.sleep(1); } - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(qName).getBindable()))); ClientMessage m = clientConsumer.receiveImmediate(); Assert.assertNull(m); // All the messages should now be in the DLQ @@ -511,11 +511,11 @@ public class DeadLetterAddressTest extends ServiceTestBase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setSecurityEnabled(false); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(transportConfig); server = addServer(HornetQServers.newHornetQServer(configuration, false)); // start the server server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java index cc7a24a..edddb8e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java @@ -11,6 +11,7 @@ * permissions and limitations under the License. */ package org.hornetq.tests.integration.client; +import org.hornetq.core.message.impl.MessageImpl; import org.junit.Before; import org.junit.Test; @@ -52,15 +53,16 @@ public class ExpiryAddressTest extends ServiceTestBase public void testBasicSend() throws Exception { SimpleString ea = new SimpleString("EA"); + SimpleString adSend = new SimpleString("a1"); SimpleString qName = new SimpleString("q1"); SimpleString eq = new SimpleString("EA1"); AddressSettings addressSettings = new AddressSettings(); addressSettings.setExpiryAddress(ea); - server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings); + server.getAddressSettingsRepository().addMatch("#", addressSettings); clientSession.createQueue(ea, eq, null, false); - clientSession.createQueue(qName, qName, null, false); + clientSession.createQueue(adSend, qName, null, false); - ClientProducer producer = clientSession.createProducer(qName); + ClientProducer producer = clientSession.createProducer(adSend); ClientMessage clientMessage = createTextMessage(clientSession, "heyho!"); clientMessage.setExpiration(System.currentTimeMillis()); producer.send(clientMessage); @@ -75,6 +77,9 @@ public class ExpiryAddressTest extends ServiceTestBase clientConsumer = clientSession.createConsumer(eq); m = clientConsumer.receive(500); Assert.assertNotNull(m); + Assert.assertEquals(qName.toString(), m.getStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE)); + Assert.assertEquals(adSend.toString(), m.getStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS)); + Assert.assertNotNull(m); Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!"); m.acknowledge(); } @@ -165,6 +170,9 @@ public class ExpiryAddressTest extends ServiceTestBase Assert.assertNotNull(m); + assertNotNull(m.getStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS)); + assertNotNull(m.getStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE)); + ExpiryAddressTest.log.info("acking"); m.acknowledge(); @@ -178,6 +186,9 @@ public class ExpiryAddressTest extends ServiceTestBase Assert.assertNotNull(m); + assertNotNull(m.getStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS)); + assertNotNull(m.getStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE)); + ExpiryAddressTest.log.info("acking"); m.acknowledge(); @@ -206,6 +217,7 @@ public class ExpiryAddressTest extends ServiceTestBase ClientConsumer clientConsumer = clientSession.createConsumer(qName); ClientMessage m = clientConsumer.receiveImmediate(); Assert.assertNull(m); + clientConsumer.close(); } @@ -375,11 +387,11 @@ public class ExpiryAddressTest extends ServiceTestBase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setSecurityEnabled(false); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(transportConfig); server = addServer(HornetQServers.newHornetQServer(configuration, false)); // start the server server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java index 62d2b9a..0044d9d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java @@ -143,14 +143,14 @@ public class ExpiryLargeMessageTest extends ServiceTestBase Thread.sleep(1500); long timeout = System.currentTimeMillis() + 5000; - while (timeout > System.currentTimeMillis() && queueExpiry.getMessageCount() != numberOfMessages) + while (timeout > System.currentTimeMillis() && getMessageCount(queueExpiry) != numberOfMessages) { // What the Expiry Scan would be doing myQueue.expireReferences(); Thread.sleep(50); } - assertEquals(50, queueExpiry.getMessageCount()); + assertEquals(50, getMessageCount(queueExpiry)); session = sf.createSession(false, false); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java index 7c44021..8435a5e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java @@ -62,9 +62,9 @@ public class FailureDeadlockTest extends ServiceTestBase { super.setUp(); - Configuration conf = createDefaultConfig(); - conf.setSecurityEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration conf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); server = createServer(false, conf); jmsServer = new JMSServerManagerImpl(server); jmsServer.setContext(new NullInitialContext()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HangConsumerTest.java index 0f0ae8f..cfe619f 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HangConsumerTest.java @@ -12,9 +12,9 @@ */ package org.hornetq.tests.integration.client; import org.hornetq.core.server.MessageReference; +import org.hornetq.core.server.ServerConsumer; import org.junit.Before; import org.junit.After; - import org.junit.Test; import java.lang.management.ManagementFactory; @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import org.junit.Assert; - import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.SimpleString; @@ -56,6 +55,7 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.Queue; import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.ServerSessionFactory; import org.hornetq.core.server.impl.HornetQServerImpl; import org.hornetq.core.server.impl.QueueFactoryImpl; import org.hornetq.core.server.impl.QueueImpl; @@ -93,9 +93,8 @@ public class HangConsumerTest extends ServiceTestBase { super.setUp(); - Configuration config = createDefaultConfig(false); - - config.setMessageExpiryScanPeriod(10); + Configuration config = createDefaultConfig(false) + .setMessageExpiryScanPeriod(10); HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl(); @@ -148,9 +147,9 @@ public class HangConsumerTest extends ServiceTestBase producer.send(sessionProducer.createMessage(true)); sessionProducer.commit(); - // These two operations should finish without the test hanging - queue.getMessagesAdded(1); - queue.getMessageCount(1); + // These three operations should finish without the test hanging + getMessagesAdded(queue); + getMessageCount(queue); releaseConsumers(); @@ -161,8 +160,8 @@ public class HangConsumerTest extends ServiceTestBase // a flush to guarantee any pending task is finished on flushing out delivery and pending msgs queue.flushExecutor(); - Assert.assertEquals(2, queue.getMessageCount()); - Assert.assertEquals(2, queue.getMessagesAdded()); + Assert.assertEquals(2, getMessageCount(queue)); + Assert.assertEquals(2, getMessagesAdded(queue)); ClientMessage msg = consumer.receive(5000); Assert.assertNotNull(msg); @@ -396,8 +395,8 @@ public class HangConsumerTest extends ServiceTestBase producer.send(session.createMessage(true)); session.commit(); - long queueID = server.getStorageManager().generateUniqueID(); - long txID = server.getStorageManager().generateUniqueID(); + long queueID = server.getStorageManager().generateID(); + long txID = server.getStorageManager().generateID(); // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally @@ -523,6 +522,12 @@ public class HangConsumerTest extends ServiceTestBase class MyCallback implements SessionCallback { + @Override + public boolean hasCredits(ServerConsumer consumerID) + { + return true; + } + final SessionCallback targetCallback; MyCallback(SessionCallback parameter) @@ -548,7 +553,7 @@ public class HangConsumerTest extends ServiceTestBase * @see org.hornetq.spi.core.protocol.SessionCallback#sendMessage(org.hornetq.core.server.ServerMessage, long, int) */ @Override - public int sendMessage(ServerMessage message, long consumerID, int deliveryCount) + public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { inCall.countDown(); try @@ -563,7 +568,7 @@ public class HangConsumerTest extends ServiceTestBase try { - return targetCallback.sendMessage(message, consumerID, deliveryCount); + return targetCallback.sendMessage(message, consumer, deliveryCount); } finally { @@ -576,18 +581,18 @@ public class HangConsumerTest extends ServiceTestBase * @see org.hornetq.spi.core.protocol.SessionCallback#sendLargeMessage(org.hornetq.core.server.ServerMessage, long, long, int) */ @Override - public int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount) + public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { - return targetCallback.sendLargeMessage(message, consumerID, bodySize, deliveryCount); + return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount); } /* (non-Javadoc) * @see org.hornetq.spi.core.protocol.SessionCallback#sendLargeMessageContinuation(long, byte[], boolean, boolean) */ @Override - public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse) + public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) { - return targetCallback.sendLargeMessageContinuation(consumerID, body, continues, requiresResponse); + return targetCallback.sendLargeMessageContinuation(consumer, body, continues, requiresResponse); } /* (non-Javadoc) @@ -618,7 +623,7 @@ public class HangConsumerTest extends ServiceTestBase } @Override - public void disconnect(long consumerId, String queueName) + public void disconnect(ServerConsumer consumerId, String queueName) { //To change body of implemented methods use File | Settings | File Templates. } @@ -639,7 +644,7 @@ public class HangConsumerTest extends ServiceTestBase } @Override - protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context) throws Exception + protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception { return new ServerSessionImpl(name, username, http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HeuristicXATest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HeuristicXATest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HeuristicXATest.java index b7f94bc..b737f83 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HeuristicXATest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HeuristicXATest.java @@ -67,8 +67,8 @@ public class HeuristicXATest extends ServiceTestBase @Test public void testInvalidCall() throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setJMXManagementEnabled(true); + Configuration configuration = createDefaultConfig() + .setJMXManagementEnabled(true); HornetQServer server = createServer(false, configuration, mbeanServer, new HashMap<String, AddressSettings>()); server.start(); @@ -92,8 +92,8 @@ public class HeuristicXATest extends ServiceTestBase private void internalTest(final boolean isCommit) throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setJMXManagementEnabled(true); + Configuration configuration = createDefaultConfig() + .setJMXManagementEnabled(true); HornetQServer server = createServer(false, configuration, mbeanServer, new HashMap<String, AddressSettings>()); server.start(); @@ -155,7 +155,7 @@ public class HeuristicXATest extends ServiceTestBase if (isCommit) { - Assert.assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(1, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); session = sf.createSession(false, false, false); @@ -170,7 +170,7 @@ public class HeuristicXATest extends ServiceTestBase session.close(); } - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); } @Test @@ -187,8 +187,8 @@ public class HeuristicXATest extends ServiceTestBase private void doHeuristicCompletionWithRestart(final boolean isCommit) throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setJMXManagementEnabled(true); + Configuration configuration = createDefaultConfig() + .setJMXManagementEnabled(true); HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>()); server.start(); @@ -237,7 +237,7 @@ public class HeuristicXATest extends ServiceTestBase if (isCommit) { - Assert.assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(1, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); session = sf.createSession(false, false, false); @@ -252,7 +252,7 @@ public class HeuristicXATest extends ServiceTestBase session.close(); } - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); server.stop(); @@ -287,8 +287,8 @@ public class HeuristicXATest extends ServiceTestBase private void doRecoverHeuristicCompletedTxWithRestart(final boolean heuristicCommit) throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setJMXManagementEnabled(true); + Configuration configuration = createDefaultConfig() + .setJMXManagementEnabled(true); HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>()); server.start(); @@ -337,7 +337,7 @@ public class HeuristicXATest extends ServiceTestBase if (heuristicCommit) { - Assert.assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(1, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); session = sf.createSession(false, false, false); @@ -352,7 +352,7 @@ public class HeuristicXATest extends ServiceTestBase session.close(); } - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()))); server.stop(); @@ -396,8 +396,8 @@ public class HeuristicXATest extends ServiceTestBase private void doForgetHeuristicCompletedTxAndRestart(final boolean heuristicCommit) throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setJMXManagementEnabled(true); + Configuration configuration = createDefaultConfig() + .setJMXManagementEnabled(true); HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>()); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java index ace6eba..55dc38e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java @@ -56,10 +56,10 @@ public class HornetQCrashTest extends UnitTestCase @Test public void testHang() throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.setPersistenceEnabled(false); - configuration.setSecurityEnabled(false); - configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration configuration = createDefaultConfig() + .setPersistenceEnabled(false) + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); server = addServer(HornetQServers.newHornetQServer(configuration)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java index aaee7fb..ec7f135 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java @@ -252,9 +252,9 @@ public class IncompatibleVersionTest extends ServiceTestBase { public void perform(String startedString) throws Exception { - Configuration conf = new ConfigurationImpl(); - conf.setSecurityEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory")); + Configuration conf = new ConfigurationImpl() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory")); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java index ece72e3..5365012 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java @@ -524,7 +524,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase } @Override - protected void postAcknowledge(final MessageReference ref) + public void postAcknowledge(final MessageReference ref) { System.out.println("Ignoring postACK on message " + ref); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/JournalCrashTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/JournalCrashTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/JournalCrashTest.java index ecbb925..4ad3f26 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/JournalCrashTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/JournalCrashTest.java @@ -59,11 +59,11 @@ public class JournalCrashTest extends ServiceTestBase protected void startServer() throws Exception { - Configuration config = createDefaultConfig(); - config.setJournalFileSize(HornetQDefaultConfiguration.getDefaultJournalFileSize()); - config.setJournalCompactMinFiles(HornetQDefaultConfiguration.getDefaultJournalCompactMinFiles()); - config.setJournalCompactPercentage(HornetQDefaultConfiguration.getDefaultJournalCompactPercentage()); - config.setJournalMinFiles(2); + Configuration config = createDefaultConfig() + .setJournalFileSize(HornetQDefaultConfiguration.getDefaultJournalFileSize()) + .setJournalCompactMinFiles(HornetQDefaultConfiguration.getDefaultJournalCompactMinFiles()) + .setJournalCompactPercentage(HornetQDefaultConfiguration.getDefaultJournalCompactPercentage()) + .setJournalMinFiles(2); server = super.createServer(true, config); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java index 25f41a0..d3be7b8 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java @@ -301,9 +301,8 @@ public class LargeMessageTest extends LargeMessageTestBase final int PAGE_SIZE = 10 * 1024; final int MESSAGE_SIZE = 1024; // 1k - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, @@ -380,11 +379,10 @@ public class LargeMessageTest extends LargeMessageTestBase ClientSession session = null; - Configuration config = createDefaultConfig(isNetty()); - config.setJournalFileSize(journalsize); - - config.setJournalBufferSize_AIO(10 * 1024); - config.setJournalBufferSize_NIO(10 * 1024); + Configuration config = createDefaultConfig(isNetty()) + .setJournalFileSize(journalsize) + .setJournalBufferSize_AIO(10 * 1024) + .setJournalBufferSize_NIO(10 * 1024); HornetQServer server = createServer(true, config); @@ -2504,7 +2502,7 @@ public class LargeMessageTest extends LargeMessageTestBase Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); Assert.assertEquals(0, - ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); } finally @@ -2609,7 +2607,7 @@ public class LargeMessageTest extends LargeMessageTestBase Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); Assert.assertEquals(0, - ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); } finally @@ -2942,7 +2940,7 @@ public class LargeMessageTest extends LargeMessageTestBase Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); Assert.assertEquals(0, - ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); } finally @@ -3028,7 +3026,7 @@ public class LargeMessageTest extends LargeMessageTestBase Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); Assert.assertEquals(0, - ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount()); + getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); log.debug("Thread done"); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageExpirationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageExpirationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageExpirationTest.java index 9e567da..2a0ea06 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageExpirationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageExpirationTest.java @@ -157,7 +157,7 @@ public class MessageExpirationTest extends ServiceTestBase Thread.sleep(500); Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(queue).getBindable()))); ClientMessage message2 = consumer.receiveImmediate(); Assert.assertNull(message2); @@ -188,7 +188,7 @@ public class MessageExpirationTest extends ServiceTestBase Assert.assertNull(message2); Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount()); - Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount()); + Assert.assertEquals(0, getMessageCount(((Queue)server.getPostOffice().getBinding(queue).getBindable()))); consumer.close(); session.deleteQueue(queue); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java index b7b167d..93ee1b9 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java @@ -121,11 +121,11 @@ public class MessageGroupingConnectionFactoryTest extends UnitTestCase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setSecurityEnabled(false); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(transportConfig); server = addServer(HornetQServers.newHornetQServer(configuration, false)); // start the server server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingTest.java index edcc2f2..eff418a 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingTest.java @@ -610,11 +610,11 @@ public class MessageGroupingTest extends UnitTestCase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setSecurityEnabled(false); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(transportConfig); server = addServer(HornetQServers.newHornetQServer(configuration, false)); // start the server server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java index 2ba1786..14fb379 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java @@ -337,9 +337,9 @@ public class MessagePriorityTest extends UnitTestCase { super.setUp(); - Configuration config = createDefaultConfig(); - config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())); - config.setSecurityEnabled(false); + Configuration config = createDefaultConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName())) + .setSecurityEnabled(false); server = addServer(HornetQServers.newHornetQServer(config, false)); server.start(); locator = http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java index 3b26f2b..dd752a8 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java @@ -153,9 +153,8 @@ public class NIOvsOIOTest extends UnitTestCase { String acceptorFactoryClassName = "org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory"; - Configuration conf = createDefaultConfig(); - - conf.setSecurityEnabled(false); + Configuration conf = createDefaultConfig() + .setSecurityEnabled(false); Map<String, Object> params = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NettyConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NettyConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NettyConsumerTest.java deleted file mode 100644 index 6f21b7a..0000000 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NettyConsumerTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.tests.integration.client; - -/** - * A NettyConsumerTest - * - * @author clebertsuconic - * - * - */ -public class NettyConsumerTest extends ConsumerTest -{ - protected boolean isNetty() - { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java index 67c0c03..99ea6ba 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java @@ -75,11 +75,11 @@ public class NewDeadLetterAddressTest extends UnitTestCase public void setUp() throws Exception { super.setUp(); - - Configuration configuration = createDefaultConfig(); - configuration.setSecurityEnabled(false); TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY); - configuration.getAcceptorConfigurations().add(transportConfig); + + Configuration configuration = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(transportConfig); server = addServer(HornetQServers.newHornetQServer(configuration, false)); // start the server server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/OrderTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/OrderTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/OrderTest.java index fac7099..621073b 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/OrderTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/OrderTest.java @@ -12,6 +12,9 @@ */ package org.hornetq.tests.integration.client; +import java.util.Arrays; +import java.util.Collection; + import org.hornetq.api.core.client.ClientConsumer; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientProducer; @@ -24,19 +27,38 @@ import org.hornetq.tests.util.ServiceTestBase; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * A OrderTest * * @author <mailto:[email protected]">Clebert Suconic</a> */ +@RunWith(Parameterized.class) public class OrderTest extends ServiceTestBase { + private boolean persistent; + private HornetQServer server; private ServerLocator locator; + public OrderTest(boolean persistent) + { + this.persistent = persistent; + } + @Parameterized.Parameters(name = "persistent={0}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + @Override @Before public void setUp() throws Exception @@ -53,18 +75,7 @@ public class OrderTest extends ServiceTestBase // Public -------------------------------------------------------- @Test - public void testSimpleOrderNoStorage() throws Exception - { - doTestSimpleOrder(false); - } - - @Test - public void testSimpleOrderPersistence() throws Exception - { - doTestSimpleOrder(true); - } - - public void doTestSimpleOrder(final boolean persistent) throws Exception + public void testSimpleStorage() throws Exception { server = createServer(persistent, true); server.start(); @@ -138,18 +149,7 @@ public class OrderTest extends ServiceTestBase } @Test - public void testOrderOverSessionClosePersistent() throws Exception - { - doTestOverCancel(true); - } - - @Test - public void testOrderOverSessionCloseNonPersistent() throws Exception - { - doTestOverCancel(false); - } - - public void doTestOverCancel(final boolean persistent) throws Exception + public void testOrderOverSessionClose() throws Exception { server = createServer(persistent, true); @@ -210,19 +210,7 @@ public class OrderTest extends ServiceTestBase } @Test - public void testOrderOverSessionClosePersistentWithRedeliveryDelay() throws Exception - { - doTestOverCancelWithRedelivery(true); - } - - @Test - public void testOrderOverSessionCloseNonPersistentWithRedeliveryDelay() throws Exception - { - doTestOverCancelWithRedelivery(false); - } - - - public void doTestOverCancelWithRedelivery(final boolean persistent) throws Exception + public void testOrderOverSessionCloseWithRedeliveryDelay() throws Exception { server = createServer(persistent, true); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java index 5d8f379..6da8ca3 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java @@ -92,9 +92,8 @@ public class PagingOrderTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); @@ -202,9 +201,8 @@ public class PagingOrderTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); @@ -304,10 +302,10 @@ public class PagingOrderTest extends ServiceTestBase assertEquals(0, errors.get()); - assertEquals(numberOfMessages, q2.getMessageCount()); - assertEquals(numberOfMessages, q2.getMessagesAdded()); - assertEquals(0, q1.getMessageCount()); - assertEquals(numberOfMessages, q1.getMessagesAdded()); + assertEquals(numberOfMessages, getMessageCount(q2)); + assertEquals(numberOfMessages, getMessagesAdded(q2)); + assertEquals(0, getMessageCount(q1)); + assertEquals(numberOfMessages, getMessagesAdded(q1)); session.close(); sf.close(); @@ -343,11 +341,11 @@ public class PagingOrderTest extends ServiceTestBase assertNotNull(q2); - assertEquals("q2 msg count", numberOfMessages, q2.getMessageCount()); - assertEquals("q2 msgs added", numberOfMessages, q2.getMessagesAdded()); - assertEquals("q1 msg count", 0, q1.getMessageCount()); + assertEquals("q2 msg count", numberOfMessages, getMessageCount(q2)); + assertEquals("q2 msgs added", numberOfMessages, getMessagesAdded(q2)); + assertEquals("q1 msg count", 0, getMessageCount(q1)); // 0, since nothing was sent to the queue after the server was restarted - assertEquals("q1 msgs added", 0, q1.getMessagesAdded()); + assertEquals("q1 msgs added", 0, getMessagesAdded(q1)); } @@ -356,9 +354,8 @@ public class PagingOrderTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); @@ -455,16 +452,15 @@ public class PagingOrderTest extends ServiceTestBase assertEquals(0, errors.get()); long timeout = System.currentTimeMillis() + 10000; - while (numberOfMessages - 100 != q1.getMessageCount() && System.currentTimeMillis() < timeout) + while (numberOfMessages - 100 != getMessageCount(q1) && System.currentTimeMillis() < timeout) { Thread.sleep(500); } - assertEquals(numberOfMessages, q2.getMessageCount()); - assertEquals(numberOfMessages, q2.getMessagesAdded()); - assertEquals(numberOfMessages - 100, q1.getMessageCount()); - assertEquals(numberOfMessages, q2.getMessagesAdded()); + assertEquals(numberOfMessages, getMessageCount(q2)); + assertEquals(numberOfMessages, getMessagesAdded(q2)); + assertEquals(numberOfMessages - 100, getMessageCount(q1)); } @Test @@ -472,9 +468,8 @@ public class PagingOrderTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); @@ -573,9 +568,8 @@ public class PagingOrderTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); @@ -721,9 +715,8 @@ public class PagingOrderTest extends ServiceTestBase public void testPagingOverCreatedDestinationTopics() throws Exception { - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>()); @@ -748,7 +741,10 @@ public class PagingOrderTest extends ServiceTestBase 1000, 0, false, - "PAGE"); + "PAGE", + -1, + 10, + "KILL"); HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); @@ -796,9 +792,8 @@ public class PagingOrderTest extends ServiceTestBase public void testPagingOverCreatedDestinationQueues() throws Exception { - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, -1, -1, AddressFullMessagePolicy.BLOCK, new HashMap<String, AddressSettings>()); @@ -821,7 +816,10 @@ public class PagingOrderTest extends ServiceTestBase 1000, 0, false, - "PAGE"); + "PAGE", + -1, + 10, + "KILL"); jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1"); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java index a038863..28fb180 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java @@ -53,9 +53,8 @@ public class PagingSyncTest extends ServiceTestBase { boolean persistentMessages = true; - Configuration config = createDefaultConfig(); - - config.setJournalSyncNonTransactional(false); + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
