http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java index 2091296..43e6010 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java @@ -118,44 +118,54 @@ public class ClusterConnectionControl2Test extends ManagementTestBase TransportConfiguration connectorConfig_1 = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, acceptorParams_1); TransportConfiguration connectorConfig_0 = new TransportConfiguration(NETTY_CONNECTOR_FACTORY); - CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(RandomUtil.randomString(), - RandomUtil.randomString(), - null, - false); + CoreQueueConfiguration queueConfig = new CoreQueueConfiguration() + .setAddress(RandomUtil.randomString()) + .setName(RandomUtil.randomString()) + .setDurable(false); List<String> connectorInfos = new ArrayList<String>(); connectorInfos.add("netty"); - BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration(discoveryName, - 250, - connectorInfos, - new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1)); - DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration(discoveryName, - 0, - 0, - new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1)); - - Configuration conf_1 = createBasicConfig(); - conf_1.setSecurityEnabled(false); - conf_1.setJMXManagementEnabled(true); - - clusterConnectionConfig_0 = - new ClusterConnectionConfiguration(clusterName, queueConfig.getAddress(), "netty", 1000, false, false, - 1, 1024, - discoveryName); - conf_1.getClusterConfigurations().add(clusterConnectionConfig_0); - conf_1.getAcceptorConfigurations().add(acceptorConfig_1); - conf_1.getConnectorConfigurations().put("netty", connectorConfig_1); - conf_1.getQueueConfigurations().add(queueConfig); - conf_1.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig); - conf_1.getBroadcastGroupConfigurations().add(broadcastGroupConfig); - - Configuration conf_0 = createBasicConfig(1); - conf_0.setSecurityEnabled(false); - conf_0.setJMXManagementEnabled(true); - conf_0.getAcceptorConfigurations().add(acceptorConfig_0); - conf_0.getConnectorConfigurations().put("netty", connectorConfig_0); - conf_0.getClusterConfigurations().add(clusterConnectionConfig_0); - conf_0.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig); - conf_0.getBroadcastGroupConfigurations().add(broadcastGroupConfig); + + BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration() + .setName(discoveryName) + .setBroadcastPeriod(250) + .setConnectorInfos(connectorInfos) + .setEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort)); + + DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration() + .setName(discoveryName) + .setRefreshTimeout(0) + .setDiscoveryInitialWaitTimeout(0) + .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort)); + + clusterConnectionConfig_0 = new ClusterConnectionConfiguration() + .setName(clusterName) + .setAddress(queueConfig.getAddress()) + .setConnectorName("netty") + .setRetryInterval(1000) + .setDuplicateDetection(false) + .setForwardWhenNoConsumers(false) + .setMaxHops(1) + .setConfirmationWindowSize(1024) + .setDiscoveryGroupName(discoveryName); + + Configuration conf_1 = createBasicConfig() + .addClusterConfiguration(clusterConnectionConfig_0) + .addAcceptorConfiguration(acceptorConfig_1) + .addConnectorConfiguration("netty", connectorConfig_1) + .addQueueConfiguration(queueConfig) + .addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig) + .addBroadcastGroupConfiguration(broadcastGroupConfig); + + Configuration conf_0 = createBasicConfig(1) + .addClusterConfiguration(clusterConnectionConfig_0) + .addAcceptorConfiguration(acceptorConfig_0) + .addConnectorConfiguration("netty", connectorConfig_0) + .addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig) + .addBroadcastGroupConfiguration(broadcastGroupConfig); mbeanServer_1 = MBeanServerFactory.createMBeanServer(); server1 = addServer(HornetQServers.newHornetQServer(conf_1, mbeanServer_1, false));
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java index 7a9284f..79985af 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java @@ -13,7 +13,6 @@ package org.hornetq.tests.integration.management; import org.junit.Before; import org.junit.After; - import org.junit.Test; import java.util.ArrayList; @@ -25,13 +24,12 @@ import javax.management.MBeanServer; import javax.management.MBeanServerFactory; import org.junit.Assert; - import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.UDPBroadcastGroupConfiguration; import org.hornetq.api.core.management.ClusterConnectionControl; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ObjectNameBuilder; import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; @@ -167,8 +165,8 @@ public class ClusterConnectionControlTest extends ManagementTestBase clusterConnectionControl.stop(); Assert.assertTrue(notifListener.getNotifications().size() > 0); - Notification notif = notifListener.getNotifications().get(0); - Assert.assertEquals(NotificationType.CLUSTER_CONNECTION_STOPPED, notif.getType()); + Notification notif = getFirstNotificationOfType(notifListener.getNotifications(), CoreNotificationType.CLUSTER_CONNECTION_STOPPED); + Assert.assertNotNull(notif); Assert.assertEquals(clusterConnectionControl.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); @@ -176,13 +174,31 @@ public class ClusterConnectionControlTest extends ManagementTestBase clusterConnectionControl.start(); Assert.assertTrue(notifListener.getNotifications().size() > 0); - notif = notifListener.getNotifications().get(1); - Assert.assertEquals(NotificationType.CLUSTER_CONNECTION_STARTED, notif.getType()); + notif = getFirstNotificationOfType(notifListener.getNotifications(), CoreNotificationType.CLUSTER_CONNECTION_STARTED); + Assert.assertNotNull(notif); Assert.assertEquals(clusterConnectionControl.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); } + private Notification getFirstNotificationOfType(List<Notification> notifications, CoreNotificationType type) + { + Notification result = null; + + // the notifications can change while we're looping + List<Notification> notificationsClone = new ArrayList<>(notifications); + + for (Notification notification : notificationsClone) + { + if (notification.getType().equals(type)) + { + result = notification; + } + } + + return result; + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -203,46 +219,55 @@ public class ClusterConnectionControlTest extends ManagementTestBase acceptorParams, RandomUtil.randomString()); - CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(RandomUtil.randomString(), - RandomUtil.randomString(), - null, - false); + CoreQueueConfiguration queueConfig = new CoreQueueConfiguration() + .setAddress(RandomUtil.randomString()) + .setName(RandomUtil.randomString()) + .setDurable(false); List<String> connectors = new ArrayList<String>(); connectors.add(connectorConfig.getName()); String discoveryGroupName = RandomUtil.randomString(); - DiscoveryGroupConfiguration discoveryGroupConfig = - new DiscoveryGroupConfiguration(discoveryGroupName, 500, 0, - new UDPBroadcastGroupConfiguration("230.1.2.3", 6745, null, -1)); - - Configuration conf_1 = createBasicConfig(); - conf_1.setSecurityEnabled(false); - conf_1.setJMXManagementEnabled(true); - conf_1.getAcceptorConfigurations().add(acceptorConfig); - conf_1.getQueueConfigurations().add(queueConfig); - - Configuration conf_0 = createBasicConfig(); - clusterConnectionConfig1 = - new ClusterConnectionConfiguration(RandomUtil.randomString(), queueConfig.getAddress(), - connectorConfig.getName(), RandomUtil.randomPositiveLong(), - RandomUtil.randomBoolean(), RandomUtil.randomBoolean(), - RandomUtil.randomPositiveInt(), RandomUtil.randomPositiveInt(), - connectors, false); - clusterConnectionConfig2 = - new ClusterConnectionConfiguration(RandomUtil.randomString(), queueConfig.getAddress(), - connectorConfig.getName(), RandomUtil.randomPositiveLong(), - RandomUtil.randomBoolean(), RandomUtil.randomBoolean(), - RandomUtil.randomPositiveInt(), RandomUtil.randomPositiveInt(), - discoveryGroupName); - - conf_0.setSecurityEnabled(false); - conf_0.setJMXManagementEnabled(true); - conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); - conf_0.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig); - conf_0.getClusterConfigurations().add(clusterConnectionConfig1); - conf_0.getClusterConfigurations().add(clusterConnectionConfig2); - conf_0.getDiscoveryGroupConfigurations().put(discoveryGroupName, discoveryGroupConfig); + DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration() + .setName(discoveryGroupName) + .setRefreshTimeout(500) + .setDiscoveryInitialWaitTimeout(0) + .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress("230.1.2.3") + .setGroupPort(6745)); + + Configuration conf_1 = createBasicConfig() + .addAcceptorConfiguration(acceptorConfig) + .addQueueConfiguration(queueConfig); + + clusterConnectionConfig1 = new ClusterConnectionConfiguration() + .setName(RandomUtil.randomString()) + .setAddress(queueConfig.getAddress()) + .setConnectorName(connectorConfig.getName()) + .setRetryInterval(RandomUtil.randomPositiveLong()) + .setDuplicateDetection(RandomUtil.randomBoolean()) + .setForwardWhenNoConsumers(RandomUtil.randomBoolean()) + .setMaxHops(RandomUtil.randomPositiveInt()) + .setConfirmationWindowSize(RandomUtil.randomPositiveInt()) + .setStaticConnectors(connectors); + + clusterConnectionConfig2 = new ClusterConnectionConfiguration() + .setName(RandomUtil.randomString()) + .setAddress(queueConfig.getAddress()) + .setConnectorName(connectorConfig.getName()) + .setRetryInterval(RandomUtil.randomPositiveLong()) + .setDuplicateDetection(RandomUtil.randomBoolean()) + .setForwardWhenNoConsumers(RandomUtil.randomBoolean()) + .setMaxHops(RandomUtil.randomPositiveInt()) + .setConfirmationWindowSize(RandomUtil.randomPositiveInt()) + .setDiscoveryGroupName(discoveryGroupName); + + Configuration conf_0 = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) + .addConnectorConfiguration(connectorConfig.getName(), connectorConfig) + .addClusterConfiguration(clusterConnectionConfig1) + .addClusterConfiguration(clusterConnectionConfig2) + .addDiscoveryGroupConfiguration(discoveryGroupName, discoveryGroupConfig); mbeanServer_1 = MBeanServerFactory.createMBeanServer(); server_1 = addServer(HornetQServers.newHornetQServer(conf_1, mbeanServer_1, false)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java index 1998847..ef1e312 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java @@ -87,31 +87,28 @@ public class DivertControlTest extends ManagementTestBase TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName()); - CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(RandomUtil.randomString(), - RandomUtil.randomString(), - null, - false); - CoreQueueConfiguration forwardQueueConfig = new CoreQueueConfiguration(RandomUtil.randomString(), - RandomUtil.randomString(), - null, - false); - - divertConfig = new DivertConfiguration(RandomUtil.randomString(), - RandomUtil.randomString(), - queueConfig.getAddress(), - forwardQueueConfig.getAddress(), - RandomUtil.randomBoolean(), - null, - null); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - conf.getQueueConfigurations().add(queueConfig); - conf.getQueueConfigurations().add(forwardQueueConfig); - conf.getDivertConfigurations().add(divertConfig); - - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); - conf.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig); + CoreQueueConfiguration queueConfig = new CoreQueueConfiguration() + .setAddress(RandomUtil.randomString()) + .setName(RandomUtil.randomString()) + .setDurable(false); + CoreQueueConfiguration forwardQueueConfig = new CoreQueueConfiguration() + .setAddress(RandomUtil.randomString()) + .setName(RandomUtil.randomString()) + .setDurable(false); + + divertConfig = new DivertConfiguration() + .setName(RandomUtil.randomString()) + .setRoutingName(RandomUtil.randomString()) + .setAddress(queueConfig.getAddress()) + .setForwardingAddress(forwardQueueConfig.getAddress()) + .setExclusive(RandomUtil.randomBoolean()); + + Configuration conf = createBasicConfig() + .addQueueConfiguration(queueConfig) + .addQueueConfiguration(forwardQueueConfig) + .addDivertConfiguration(divertConfig) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) + .addConnectorConfiguration(connectorConfig.getName(), connectorConfig); service = HornetQServers.newHornetQServer(conf, mbeanServer, false); service.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java index 00d05d6..12add43 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java @@ -40,6 +40,7 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; +import org.hornetq.core.settings.impl.SlowConsumerPolicy; import org.hornetq.core.transaction.impl.XidImpl; import org.hornetq.tests.util.RandomUtil; import org.hornetq.tests.util.UnitTestCase; @@ -101,8 +102,6 @@ public class HornetQServerControlTest extends ManagementTestBase Assert.assertEquals(conf.isClustered(), serverControl.isClustered()); Assert.assertEquals(conf.isPersistDeliveryCountBeforeDelivery(), serverControl.isPersistDeliveryCountBeforeDelivery()); - Assert.assertEquals(conf.getHAPolicy().isBackup(), serverControl.isBackup()); - Assert.assertEquals(conf.getHAPolicy().isSharedStore(), serverControl.isSharedStore()); Assert.assertEquals(conf.getScheduledThreadPoolMaxSize(), serverControl.getScheduledThreadPoolMaxSize()); Assert.assertEquals(conf.getThreadPoolMaxSize(), serverControl.getThreadPoolMaxSize()); Assert.assertEquals(conf.getSecurityInvalidationInterval(), serverControl.getSecurityInvalidationInterval()); @@ -144,7 +143,6 @@ public class HornetQServerControlTest extends ManagementTestBase Assert.assertEquals(conf.getJournalCompactMinFiles(), serverControl.getJournalCompactMinFiles()); Assert.assertEquals(conf.getJournalCompactPercentage(), serverControl.getJournalCompactPercentage()); Assert.assertEquals(conf.isPersistenceEnabled(), serverControl.isPersistenceEnabled()); - Assert.assertEquals(conf.getHAPolicy().isFailoverOnServerShutdown(), serverControl.isFailoverOnServerShutdown()); } @Test @@ -502,6 +500,9 @@ public class HornetQServerControlTest extends ManagementTestBase long redistributionDelay = 5; boolean sendToDLAOnNoRoute = true; String addressFullMessagePolicy = "PAGE"; + long slowConsumerThreshold = 5; + long slowConsumerCheckPeriod = 10; + String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString(); serverControl.addAddressSettings(addressMatch, DLA, @@ -517,7 +518,10 @@ public class HornetQServerControlTest extends ManagementTestBase maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, - addressFullMessagePolicy); + addressFullMessagePolicy, + slowConsumerThreshold, + slowConsumerCheckPeriod, + slowConsumerPolicy); boolean ex = false; @@ -537,7 +541,10 @@ public class HornetQServerControlTest extends ManagementTestBase maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, - addressFullMessagePolicy); + addressFullMessagePolicy, + slowConsumerThreshold, + slowConsumerCheckPeriod, + slowConsumerPolicy); } catch (Exception expected) { @@ -564,6 +571,9 @@ public class HornetQServerControlTest extends ManagementTestBase assertEquals(redistributionDelay, info.getRedistributionDelay()); assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute()); assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy()); + assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); + assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); + assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); serverControl.addAddressSettings(addressMatch, DLA, @@ -579,7 +589,10 @@ public class HornetQServerControlTest extends ManagementTestBase maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, - addressFullMessagePolicy); + addressFullMessagePolicy, + slowConsumerThreshold, + slowConsumerCheckPeriod, + slowConsumerPolicy); jsonString = serverControl.getAddressSettingsAsJSON(exactAddress); @@ -598,6 +611,9 @@ public class HornetQServerControlTest extends ManagementTestBase assertEquals(redistributionDelay, info.getRedistributionDelay()); assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute()); assertEquals(addressFullMessagePolicy, info.getAddressFullMessagePolicy()); + assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); + assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); + assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); ex = false; @@ -617,7 +633,10 @@ public class HornetQServerControlTest extends ManagementTestBase maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, - addressFullMessagePolicy); + addressFullMessagePolicy, + slowConsumerThreshold, + slowConsumerCheckPeriod, + slowConsumerPolicy); } catch (Exception e) { @@ -948,8 +967,85 @@ public class HornetQServerControlTest extends ManagementTestBase System.out.println("HornetQServerControlTest.testCommitPreparedTransactions"); } - // Package protected --------------------------------------------- + @Test + public void testScaleDownWithConnector() throws Exception + { + scaleDown(new ScaleDownHandler() + { + @Override + public void scaleDown(HornetQServerControl control) throws Exception + { + control.scaleDown("server2-connector"); + } + }); + } + + @Test + public void testScaleDownWithOutConnector() throws Exception + { + scaleDown(new ScaleDownHandler() + { + @Override + public void scaleDown(HornetQServerControl control) throws Exception + { + control.scaleDown(null); + } + }); + } + + protected void scaleDown(ScaleDownHandler handler) throws Exception + { + SimpleString address = new SimpleString("testQueue"); + Configuration conf = createDefaultConfig(false, 2); + conf.setSecurityEnabled(false); + conf.getAcceptorConfigurations().clear(); + HashMap<String, Object> params = new HashMap<String, Object>(); + params.put("server-id", "2"); + conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)); + HornetQServer server2 = HornetQServers.newHornetQServer(conf, null, true); + this.conf.getConnectorConfigurations().clear(); + this.conf.getConnectorConfigurations().put("server2-connector", new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params)); + try + { + server2.start(); + server.createQueue(address, address, null, true, false); + server2.createQueue(address, address, null, true, false); + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY)); + ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession(); + ClientProducer producer = session.createProducer(address); + for (int i = 0; i < 100; i++) + { + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString("m" + i); + producer.send(message); + } + + HornetQServerControl managementControl = createManagementControl(); + handler.scaleDown(managementControl); + locator.close(); + locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params)); + csf = createSessionFactory(locator); + session = csf.createSession(); + session.start(); + ClientConsumer consumer = session.createConsumer(address); + for (int i = 0; i < 100; i++) + { + ClientMessage m = consumer.receive(5000); + assertNotNull(m); + } + } + finally + { + server2.stop(); + } + } + // Package protected --------------------------------------------- + interface ScaleDownHandler + { + void scaleDown(HornetQServerControl control) throws Exception; + } // Protected ----------------------------------------------------- @Override @@ -964,13 +1060,13 @@ public class HornetQServerControlTest extends ManagementTestBase params, RandomUtil.randomString()); - conf = createDefaultConfig(false); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - conf.getAcceptorConfigurations().clear(); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + conf = createDefaultConfig(false) + .setSecurityEnabled(false) + .setJMXManagementEnabled(true) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) + .addConnectorConfiguration(connectorConfig.getName(), connectorConfig); server = HornetQServers.newHornetQServer(conf, mbeanServer, true); - conf.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig); server.start(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java index 42f2c7e..41051f0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java @@ -96,6 +96,17 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest } + // the core messaging proxy doesn't work when the server is stopped so we cant run these 2 tests + @Override + public void testScaleDownWithOutConnector() throws Exception + { + } + + @Override + public void testScaleDownWithConnector() throws Exception + { + } + @Override protected HornetQServerControl createManagementControl() throws Exception { @@ -107,6 +118,12 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest } + @Override + public void scaleDown(String connector) throws Exception + { + throw new UnsupportedOperationException(); + } + private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_SERVER); public boolean isSharedStore() @@ -119,6 +136,16 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest return (Boolean) proxy.invokeOperation("closeConnectionsForAddress", ipAddress); } + public boolean closeConsumerConnectionsForAddress(final String address) throws Exception + { + return (Boolean) proxy.invokeOperation("closeConsumerConnectionsForAddress", address); + } + + public boolean closeConnectionsForUser(final String userName) throws Exception + { + return (Boolean) proxy.invokeOperation("closeConnectionsForUser", userName); + } + public boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception { return (Boolean) proxy.invokeOperation("commitPreparedTransaction", transactionAsBase64); @@ -356,7 +383,7 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest public void setScaleDown(boolean scaleDown) throws Exception { - proxy.invokeOperation("setScaleDown", scaleDown); + proxy.invokeOperation("setEnabled", scaleDown); } public boolean isScaleDown() @@ -548,7 +575,10 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest @Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay, @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay, @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute, - @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy) throws Exception + @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, + @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, + @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, + @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception { proxy.invokeOperation("addAddressSettings", addressMatch, @@ -565,7 +595,10 @@ public class HornetQServerControlUsingCoreTest extends HornetQServerControlTest maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, - addressFullMessagePolicy); + addressFullMessagePolicy, + slowConsumerThreshold, + slowConsumerCheckPeriod, + slowConsumerPolicy); } public void removeAddressSettings(String addressMatch) throws Exception http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java index f1fe3a0..44d1eed 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java @@ -12,15 +12,10 @@ */ package org.hornetq.tests.integration.management; -import org.junit.Test; - import java.util.HashMap; import java.util.Map; -import javax.management.ObjectName; - import org.hornetq.api.config.HornetQDefaultConfiguration; -import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.management.ObjectNameBuilder; import org.hornetq.core.config.Configuration; @@ -28,45 +23,35 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; import org.hornetq.core.remoting.impl.invm.TransportConstants; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; +import org.junit.After; +import org.junit.Test; /** * A JMXDomainTest * * @author <a href="mailto:[email protected]">Jeff Mesnil</a> - * - * */ public class JMXDomainTest extends ManagementTestBase { - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- + HornetQServer server_0 = null; + HornetQServer server_1 = null; @Test public void test2HornetQServersManagedFrom1MBeanServer() throws Exception { - - Configuration config_0 = createDefaultConfig(); - config_0.setJMXManagementEnabled(true); + Configuration config_0 = createDefaultConfig() + .setJMXManagementEnabled(true); String jmxDomain_1 = HornetQDefaultConfiguration.getDefaultJmxDomain() + ".1"; - Configuration config_1 = createBasicConfig(); Map<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.SERVER_ID_PROP_NAME, 1); - config_1.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)); - config_1.setJMXDomain(jmxDomain_1); - config_1.setJMXManagementEnabled(true); + Configuration config_1 = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)) + .setJMXDomain(jmxDomain_1); - HornetQServer server_0 = HornetQServers.newHornetQServer(config_0, mbeanServer, false); - HornetQServer server_1 = HornetQServers.newHornetQServer(config_1, mbeanServer, false); + server_0 = HornetQServers.newHornetQServer(config_0, mbeanServer, false); + server_1 = HornetQServers.newHornetQServer(config_1, mbeanServer, false); ObjectNameBuilder builder_0 = ObjectNameBuilder.DEFAULT; ObjectNameBuilder builder_1 = ObjectNameBuilder.create(jmxDomain_1); @@ -93,94 +78,22 @@ public class JMXDomainTest extends ManagementTestBase checkNoResource(builder_0.getHornetQServerObjectName()); checkNoResource(builder_1.getHornetQServerObjectName()); - } - @Test - public void testDefaultObjectName() throws Exception + @Override + @After + public void tearDown() throws Exception { - ObjectName objectName = ObjectNameBuilder.DEFAULT.getJMSServerObjectName(); - ObjectName defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default"); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName("A"); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=\"A\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getAcceptorObjectName("netty"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,remote-acceptor=\"netty\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getAddressObjectName(new SimpleString("jms.queue.M")); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,core-address=\"jms.queue.M\""); - - assertEquals(defaultValue, objectName); + if (server_0 != null) + { + server_0.stop(); + } - objectName = ObjectNameBuilder.DEFAULT.getBridgeObjectName("mybridge"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,bridge=\"mybridge\""); + if (server_1 != null) + { + server_1.stop(); + } - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName("mybroadcastgroup"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,broadcast-group=\"mybroadcastgroup\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getClusterConnectionObjectName("my-cluster-connection"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,cluster-connection=\"my-cluster-connection\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getConnectionFactoryObjectName("connectionFactory"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,connection-factory=\"connectionFactory\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getDiscoveryGroupObjectName("my-discovery-group"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,discovery-group=\"my-discovery-group\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getDivertObjectName("my-divert"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,divert=\"my-divert\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getHornetQServerObjectName(); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default"); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName("my-topic"); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,jms-topic=\"my-topic\""); - - assertEquals(defaultValue, objectName); - - objectName = ObjectNameBuilder.DEFAULT.getQueueObjectName(new SimpleString("some.address"), new SimpleString("some.queue")); - System.out.println("value: " + objectName); - defaultValue = new ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,address=\"some.address\",runtime-queue=\"some.queue\""); - - assertEquals(defaultValue, objectName); + super.tearDown(); } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java index 847f675..c672fbc 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java @@ -83,7 +83,10 @@ public class ManagementActivationTest extends FailoverTestBase List<String> connectorNames = new ArrayList<String>(); connectorNames.add(connectorName); - ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl("test", false, connectorNames, "/myConnectionFactory"); + ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl() + .setName("test") + .setConnectorNames(connectorNames) + .setBindings("/myConnectionFactory"); backupJmsServer.createConnectionFactory(true, config, "/myConnectionFactory"); boolean exception = false; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java index 085c256..82ec49c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java @@ -46,8 +46,8 @@ public class ManagementServiceImplTest extends UnitTestCase String queue = RandomUtil.randomString(); String address = RandomUtil.randomString(); - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); @@ -66,8 +66,8 @@ public class ManagementServiceImplTest extends UnitTestCase @Test public void testHandleManagementMessageWithOperationWhichFails() throws Exception { - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); @@ -86,8 +86,8 @@ public class ManagementServiceImplTest extends UnitTestCase @Test public void testHandleManagementMessageWithUnknowResource() throws Exception { - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); @@ -106,8 +106,8 @@ public class ManagementServiceImplTest extends UnitTestCase @Test public void testHandleManagementMessageWithUnknownAttribute() throws Exception { - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); @@ -127,8 +127,8 @@ public class ManagementServiceImplTest extends UnitTestCase @Test public void testHandleManagementMessageWithKnownAttribute() throws Exception { - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start(); @@ -148,8 +148,8 @@ public class ManagementServiceImplTest extends UnitTestCase @Test public void testGetResources() throws Exception { - Configuration conf = createBasicConfig(); - conf.setJMXManagementEnabled(false); + Configuration conf = createBasicConfig() + .setJMXManagementEnabled(false); ManagementServiceImpl managementService = new ManagementServiceImpl(null, conf); managementService.setStorageManager(new NullStorageManager()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java index 2a4a762..5d2452a 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java @@ -12,6 +12,7 @@ */ package org.hornetq.tests.integration.management; import org.hornetq.api.core.management.QueueControl; +import org.hornetq.api.jms.management.JMSQueueControl; import org.junit.Before; import org.junit.After; @@ -121,6 +122,29 @@ public abstract class ManagementTestBase extends ServiceTestBase return queueControl; } + protected long getMessageCount(JMSQueueControl control) throws Exception + { + control.flushExecutor(); + return control.getMessageCount(); + } + + protected long getMessagesAdded(JMSQueueControl control) throws Exception + { + control.flushExecutor(); + return control.getMessagesAdded(); + } + + protected long getMessageCount(QueueControl control) throws Exception + { + control.flushExecutor(); + return control.getMessageCount(); + } + + protected long getMessagesAdded(QueueControl control) throws Exception + { + control.flushExecutor(); + return control.getMessagesAdded(); + } // Private ------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java index 9ecff90..53ea2bf 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java @@ -198,11 +198,8 @@ public class ManagementWithPagingServerTest extends ManagementTestBase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - - conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration conf = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); server = addServer(HornetQServers.newHornetQServer(conf, mbeanServer, true)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java index e2694a9..2bf9096 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java @@ -171,15 +171,15 @@ public class ManagementWithStompTest extends ManagementTestBase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); Map<String, Object> params = new HashMap<String, Object>(); - params.put(TransportConstants.PROTOCOL_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - conf.getAcceptorConfigurations().add(stompTransport); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + + Configuration conf = createBasicConfig() + .addAcceptorConfiguration(stompTransport) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + server = HornetQServers.newHornetQServer(conf, mbeanServer, false, "brianm", "wombats"); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java index 4c20a89..d0f7a8f 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java @@ -36,10 +36,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.hornetq.api.core.management.NotificationType.BINDING_ADDED; -import static org.hornetq.api.core.management.NotificationType.BINDING_REMOVED; -import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED; -import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED; +import static org.hornetq.api.core.management.CoreNotificationType.BINDING_ADDED; +import static org.hornetq.api.core.management.CoreNotificationType.BINDING_REMOVED; +import static org.hornetq.api.core.management.CoreNotificationType.CONSUMER_CLOSED; +import static org.hornetq.api.core.management.CoreNotificationType.CONSUMER_CREATED; /** * A NotificationTest @@ -252,11 +252,9 @@ public class NotificationTest extends UnitTestCase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - // the notifications are independent of JMX - conf.setJMXManagementEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration conf = createBasicConfig() + // the notifications are independent of JMX + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); service = HornetQServers.newHornetQServer(conf, false); service.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java index c05fff5..31bea0f 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java @@ -17,6 +17,8 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.management.Notification; + import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Message; import org.hornetq.api.core.SimpleString; @@ -29,16 +31,20 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.MessageHandler; import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.DayCounterInfo; import org.hornetq.api.core.management.HornetQServerControl; import org.hornetq.api.core.management.MessageCounterInfo; +import org.hornetq.api.core.management.ObjectNameBuilder; import org.hornetq.api.core.management.QueueControl; import org.hornetq.core.config.Configuration; +import org.hornetq.core.message.impl.MessageImpl; import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; import org.hornetq.core.server.Queue; import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.tests.integration.jms.server.management.JMSUtil; import org.hornetq.tests.util.RandomUtil; import org.hornetq.utils.json.JSONArray; import org.junit.After; @@ -247,15 +253,15 @@ public class QueueControlTest extends ManagementTestBase session.createQueue(address, queue, null, false); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); ClientProducer producer = session.createProducer(address); producer.send(session.createMessage(false)); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); ManagementTestBase.consumeMessages(1, session, queue); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); session.deleteQueue(queue); } @@ -269,7 +275,7 @@ public class QueueControlTest extends ManagementTestBase session.createQueue(address, queue, null, false); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); // It's empty, so it's supposed to be like this assertEquals("[{}]", queueControl.getFirstMessageAsJSON()); @@ -291,17 +297,43 @@ public class QueueControlTest extends ManagementTestBase session.createQueue(address, queue, null, false); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(0, queueControl.getMessagesAdded()); + Assert.assertEquals(0, getMessagesAdded(queueControl)); ClientProducer producer = session.createProducer(address); producer.send(session.createMessage(false)); - Assert.assertEquals(1, queueControl.getMessagesAdded()); + Assert.assertEquals(1, getMessagesAdded(queueControl)); producer.send(session.createMessage(false)); - Assert.assertEquals(2, queueControl.getMessagesAdded()); + Assert.assertEquals(2, getMessagesAdded(queueControl)); ManagementTestBase.consumeMessages(2, session, queue); - Assert.assertEquals(2, queueControl.getMessagesAdded()); + Assert.assertEquals(2, getMessagesAdded(queueControl)); + + session.deleteQueue(queue); + } + + @Test + public void testGetMessagesAcknowledged() throws Exception + { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(false)); + ManagementTestBase.consumeMessages(1, session, queue); + Assert.assertEquals(1, queueControl.getMessagesAcknowledged()); + producer.send(session.createMessage(false)); + ManagementTestBase.consumeMessages(1, session, queue); + Assert.assertEquals(2, queueControl.getMessagesAcknowledged()); + +// ManagementTestBase.consumeMessages(2, session, queue); + +// Assert.assertEquals(2, getMessagesAdded(queueControl)); session.deleteQueue(queue); } @@ -771,12 +803,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(message); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // moved all messages to otherQueue int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString()); Assert.assertEquals(1, movedMessagesCount); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); // check there is no message to consume from queue ManagementTestBase.consumeMessages(0, session, queue); @@ -811,7 +843,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(message); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // moved all messages to unknown queue try @@ -822,7 +854,7 @@ public class QueueControlTest extends ManagementTestBase catch (Exception e) { } - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); ManagementTestBase.consumeMessages(1, session, queue); @@ -863,12 +895,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // moved matching messages to otherQueue int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString()); Assert.assertEquals(1, movedMatchedMessagesCount); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -908,8 +940,8 @@ public class QueueControlTest extends ManagementTestBase QueueControl queueControl = createManagementControl(address, queue); QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); - Assert.assertEquals(2, queueControl.getMessageCount()); - Assert.assertEquals(0, otherQueueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); + Assert.assertEquals(0, getMessageCount(otherQueueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -918,8 +950,8 @@ public class QueueControlTest extends ManagementTestBase boolean moved = queueControl.moveMessage(messageID, otherQueue.toString()); Assert.assertTrue(moved); - Assert.assertEquals(1, queueControl.getMessageCount()); - Assert.assertEquals(1, otherQueueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); + Assert.assertEquals(1, getMessageCount(otherQueueControl)); ManagementTestBase.consumeMessages(1, session, queue); ManagementTestBase.consumeMessages(1, session, otherQueue); @@ -942,7 +974,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(session.createMessage(false)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -958,7 +990,7 @@ public class QueueControlTest extends ManagementTestBase catch (Exception e) { } - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); ManagementTestBase.consumeMessages(1, session, queue); @@ -995,12 +1027,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(key + " =" + matchingValue); Assert.assertEquals(1, removedMatchedMessagesCount); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -1040,12 +1072,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(5, key + " =" + matchingValue); Assert.assertEquals(1, removedMatchedMessagesCount); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -1077,12 +1109,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(session.createMessage(false)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(null); Assert.assertEquals(2, removedMatchedMessagesCount); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); session.deleteQueue(queue); } @@ -1101,12 +1133,12 @@ public class QueueControlTest extends ManagementTestBase producer.send(session.createMessage(false)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(""); Assert.assertEquals(2, removedMatchedMessagesCount); - Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); session.deleteQueue(queue); } @@ -1125,7 +1157,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(session.createMessage(false)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1135,7 +1167,7 @@ public class QueueControlTest extends ManagementTestBase // delete 1st message boolean deleted = queueControl.removeMessage(messageID); Assert.assertTrue(deleted); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // check there is a single message to consume from queue ManagementTestBase.consumeMessages(1, session, queue); @@ -1144,6 +1176,48 @@ public class QueueControlTest extends ManagementTestBase } @Test + public void testRemoveScheduledMessage() throws Exception + { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + ClientProducer producer = session.createProducer(address); + + // send 2 messages on queue, both scheduled + long timeout = System.currentTimeMillis() + 5000; + ClientMessage m1 = session.createMessage(true); + m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, timeout); + producer.send(m1); + ClientMessage m2 = session.createMessage(true); + m2.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, timeout); + producer.send(m2); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(2, queueControl.getScheduledCount()); + + // the message IDs are set on the server + Map<String, Object>[] messages = queueControl.listScheduledMessages(); + Assert.assertEquals(2, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + // delete 1st message + boolean deleted = queueControl.removeMessage(messageID); + Assert.assertTrue(deleted); + Assert.assertEquals(1, queueControl.getScheduledCount()); + + // check there is a single message to consume from queue + while (timeout > System.currentTimeMillis() && queueControl.getScheduledCount() == 1) + { + Thread.sleep(100); + } + + ManagementTestBase.consumeMessages(1, session, queue); + + session.deleteQueue(queue); + } + + @Test public void testRemoveMessage2() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -1172,7 +1246,7 @@ public class QueueControlTest extends ManagementTestBase } QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(100, queueControl.getMessageCount()); + Assert.assertEquals(100, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1183,7 +1257,7 @@ public class QueueControlTest extends ManagementTestBase // delete 1st message boolean deleted = queueControl.removeMessage(messageID); Assert.assertTrue(deleted); - Assert.assertEquals(99, queueControl.getMessageCount()); + Assert.assertEquals(99, getMessageCount(queueControl)); cons.close(); @@ -1216,7 +1290,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(matchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(3, queueControl.getMessageCount()); + Assert.assertEquals(3, getMessageCount(queueControl)); Assert.assertEquals(2, queueControl.countMessages(key + " =" + matchingValue)); Assert.assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue)); @@ -1264,7 +1338,7 @@ public class QueueControlTest extends ManagementTestBase QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(110, queueControl.getMessageCount()); + Assert.assertEquals(110, getMessageCount(queueControl)); Assert.assertEquals(0, queueControl.countMessages("nonExistentProperty like \'%Temp/88\'")); @@ -1299,11 +1373,11 @@ public class QueueControlTest extends ManagementTestBase producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); int expiredMessagesCount = queueControl.expireMessages(key + " =" + matchingValue); Assert.assertEquals(1, expiredMessagesCount); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -1339,8 +1413,8 @@ public class QueueControlTest extends ManagementTestBase QueueControl queueControl = createManagementControl(address, queue); QueueControl expiryQueueControl = createManagementControl(expiryAddress, expiryQueue); - Assert.assertEquals(1, queueControl.getMessageCount()); - Assert.assertEquals(0, expiryQueueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); + Assert.assertEquals(0, getMessageCount(expiryQueueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1350,8 +1424,8 @@ public class QueueControlTest extends ManagementTestBase queueControl.setExpiryAddress(expiryAddress.toString()); boolean expired = queueControl.expireMessage(messageID); Assert.assertTrue(expired); - Assert.assertEquals(0, queueControl.getMessageCount()); - Assert.assertEquals(1, expiryQueueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(queueControl)); + Assert.assertEquals(1, getMessageCount(expiryQueueControl)); ManagementTestBase.consumeMessages(0, session, queue); ManagementTestBase.consumeMessages(1, session, expiryQueue); @@ -1379,7 +1453,7 @@ public class QueueControlTest extends ManagementTestBase QueueControl queueControl = createManagementControl(address, queue); QueueControl deadLetterQueueControl = createManagementControl(deadLetterAddress, deadLetterQueue); - Assert.assertEquals(2, queueControl.getMessageCount()); + Assert.assertEquals(2, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1388,11 +1462,11 @@ public class QueueControlTest extends ManagementTestBase queueControl.setDeadLetterAddress(deadLetterAddress.toString()); - Assert.assertEquals(0, deadLetterQueueControl.getMessageCount()); + Assert.assertEquals(0, getMessageCount(deadLetterQueueControl)); boolean movedToDeadLetterAddress = queueControl.sendMessageToDeadLetterAddress(messageID); Assert.assertTrue(movedToDeadLetterAddress); - Assert.assertEquals(1, queueControl.getMessageCount()); - Assert.assertEquals(1, deadLetterQueueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); + Assert.assertEquals(1, getMessageCount(deadLetterQueueControl)); // check there is a single message to consume from queue ManagementTestBase.consumeMessages(1, session, queue); @@ -1421,7 +1495,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(message); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1455,7 +1529,7 @@ public class QueueControlTest extends ManagementTestBase producer.send(message); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, queueControl.getMessageCount()); + Assert.assertEquals(1, getMessageCount(queueControl)); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1834,25 +1908,89 @@ public class QueueControlTest extends ManagementTestBase session.createQueue(address, queue, null, false); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(0, queueControl.getMessagesAdded()); + Assert.assertEquals(0, getMessagesAdded(queueControl)); ClientProducer producer = session.createProducer(address); producer.send(session.createMessage(false)); - Assert.assertEquals(1, queueControl.getMessagesAdded()); + Assert.assertEquals(1, getMessagesAdded(queueControl)); producer.send(session.createMessage(false)); - Assert.assertEquals(2, queueControl.getMessagesAdded()); + Assert.assertEquals(2, getMessagesAdded(queueControl)); ManagementTestBase.consumeMessages(2, session, queue); - Assert.assertEquals(2, queueControl.getMessagesAdded()); + Assert.assertEquals(2, getMessagesAdded(queueControl)); queueControl.resetMessagesAdded(); - Assert.assertEquals(0, queueControl.getMessagesAdded()); + Assert.assertEquals(0, getMessagesAdded(queueControl)); session.deleteQueue(queue); } + @Test + public void testResetMessagesAcknowledged() throws Exception + { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(false)); + ManagementTestBase.consumeMessages(1, session, queue); + Assert.assertEquals(1, queueControl.getMessagesAcknowledged()); + producer.send(session.createMessage(false)); + ManagementTestBase.consumeMessages(1, session, queue); + Assert.assertEquals(2, queueControl.getMessagesAcknowledged()); + + queueControl.resetMessagesAcknowledged(); + + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + + session.deleteQueue(queue); + } + + //make sure notifications are always received no matter whether + //a Queue is created via QueueControl or by JMSServerManager directly. + @Test + public void testCreateQueueNotification() throws Exception + { + JMSUtil.JMXListener listener = new JMSUtil.JMXListener(); + this.mbeanServer.addNotificationListener(ObjectNameBuilder.DEFAULT.getHornetQServerObjectName(), listener, null, null); + + SimpleString testQueueName = new SimpleString("newQueue"); + String testQueueName2 = "newQueue2"; + this.server.createQueue(testQueueName, testQueueName, null, false, false); + + Notification notif = listener.getNotification(); + + System.out.println("got notif: " + notif); + assertEquals(CoreNotificationType.BINDING_ADDED.toString(), notif.getType()); + + this.server.destroyQueue(testQueueName); + + notif = listener.getNotification(); + System.out.println("got notif: " + notif); + assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), notif.getType()); + + HornetQServerControl control = ManagementControlHelper.createHornetQServerControl(mbeanServer); + + control.createQueue(testQueueName2, testQueueName2); + + notif = listener.getNotification(); + System.out.println("got notif: " + notif); + assertEquals(CoreNotificationType.BINDING_ADDED.toString(), notif.getType()); + + control.destroyQueue(testQueueName2); + + notif = listener.getNotification(); + System.out.println("got notif: " + notif); + assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), notif.getType()); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -1863,10 +2001,8 @@ public class QueueControlTest extends ManagementTestBase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration conf = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); server = addServer(HornetQServers.newHornetQServer(conf, mbeanServer, false)); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java index 852da9f..4d6629b 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java @@ -41,6 +41,19 @@ public class QueueControlUsingCoreTest extends QueueControlTest { private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_QUEUE + queue); + @Override + public void flushExecutor() + { + try + { + proxy.invokeOperation("flushExecutor"); + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception { return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority); @@ -106,11 +119,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest return (Integer) proxy.retrieveAttributeValue("messagesAdded"); } + public long getMessagesAcknowledged() + { + return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged"); + } + public void resetMessagesAdded() throws Exception { proxy.invokeOperation("resetMessagesAdded"); } + public void resetMessagesAcknowledged() throws Exception + { + proxy.invokeOperation("resetMessagesAcknowledged"); + } + public String getName() { return (String) proxy.retrieveAttributeValue("name"); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java index b0ba06e..19efdaf 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java @@ -88,9 +88,9 @@ public class SecurityManagementWithConfiguredAdminUserTest extends SecurityManag @Override protected HornetQServer setupAndStartHornetQServer() throws Exception { - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration conf = createBasicConfig() + .setSecurityEnabled(true) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, false)); server.start(); HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java index 2015d27..36a0889 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java @@ -56,10 +56,10 @@ public class SecurityManagementWithDefaultConfigurationTest extends SecurityMana @Override protected HornetQServer setupAndStartHornetQServer() throws Exception { - Configuration conf = createBasicConfig(); - conf.setClusterPassword(HornetQDefaultConfiguration.getDefaultClusterPassword()); - conf.setSecurityEnabled(true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration conf = createBasicConfig() + .setClusterPassword(HornetQDefaultConfiguration.getDefaultClusterPassword()) + .setSecurityEnabled(true) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, false)); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java index c6e6422..dbb0a36 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java @@ -74,10 +74,10 @@ public class SecurityManagementWithModifiedConfigurationTest extends SecurityMan @Override protected HornetQServer setupAndStartHornetQServer() throws Exception { - ConfigurationImpl conf = createBasicConfig(); - conf.setSecurityEnabled(true); - conf.setClusterPassword(configuredClusterPassword); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + ConfigurationImpl conf = createBasicConfig() + .setSecurityEnabled(true) + .setClusterPassword(configuredClusterPassword) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); HornetQServer server = HornetQServers.newHornetQServer(conf, false); server.start();
