http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java index 5540517..97d45f6 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java @@ -47,6 +47,12 @@ import org.hornetq.core.client.impl.Topology; import org.hornetq.core.client.impl.TopologyMemberImpl; import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.HAPolicyConfiguration; +import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.Bindings; import org.hornetq.core.postoffice.PostOffice; @@ -58,8 +64,8 @@ import org.hornetq.core.server.HornetQServers; import org.hornetq.core.server.NodeManager; import org.hornetq.core.server.cluster.ClusterConnection; import org.hornetq.core.server.cluster.ClusterManager; +import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory; import org.hornetq.core.server.cluster.RemoteQueueBinding; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl; import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.hornetq.core.server.group.GroupingHandler; @@ -860,19 +866,19 @@ public abstract class ClusterTestBase extends ServiceTestBase protected void setUpGroupHandler(final GroupingHandlerConfiguration.TYPE type, final int node, final int timeout) { - setUpGroupHandler(type, node, timeout, -1, GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD); + setUpGroupHandler(type, node, timeout, -1, HornetQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod()); } protected void setUpGroupHandler(final GroupingHandlerConfiguration.TYPE type, final int node, final int timeout, final long groupTimeout, final long reaperPeriod) { - servers[node].getConfiguration() - .setGroupingHandlerConfiguration(new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), - type, - new SimpleString("queues"), - timeout, - groupTimeout, - reaperPeriod)); + servers[node].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration() + .setName(new SimpleString("grouparbitrator")) + .setType(type) + .setAddress(new SimpleString("queues")) + .setTimeout(timeout) + .setGroupTimeout(groupTimeout) + .setReaperPeriod(reaperPeriod)); } protected void setUpGroupHandler(final GroupingHandler groupingHandler, final int node) @@ -1551,6 +1557,8 @@ public abstract class ClusterTestBase extends ServiceTestBase locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc); } + 
locators[node].setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); + locators[node].setBlockOnNonDurableSend(true); locators[node].setBlockOnDurableSend(true); addServerLocator(locators[node]); @@ -1632,33 +1640,44 @@ public abstract class ClusterTestBase extends ServiceTestBase protected void setupServer(final int node, final boolean fileStorage, final boolean netty) throws Exception { - setupLiveServer(node, fileStorage, false, netty); + setupLiveServer(node, fileStorage, false, netty, false); + } + + protected void setupLiveServer(final int node, final boolean fileStorage, final boolean netty, boolean isLive) throws Exception + { + setupLiveServer(node, fileStorage, false, netty, isLive); } protected void setupLiveServer(final int node, final boolean fileStorage, final boolean sharedStorage, - final boolean netty) throws Exception + final boolean netty, + boolean liveOnly) throws Exception { if (servers[node] != null) { throw new IllegalArgumentException("Already a server at node " + node); } - Configuration configuration = createBasicConfig(node); - - configuration.setJournalMaxIO_AIO(1000); - - if (sharedStorage) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); + HAPolicyConfiguration haPolicyConfiguration = null; + if (liveOnly) + { + haPolicyConfiguration = new LiveOnlyPolicyConfiguration(); + } else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - - configuration.setThreadPoolMaxSize(10); + { + if (sharedStorage) + haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); + else + haPolicyConfiguration = new ReplicatedPolicyConfiguration(); + } - configuration.getAcceptorConfigurations().clear(); - configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, - generateParams(node, netty))); + Configuration configuration = createBasicConfig(node) + .setJournalMaxIO_AIO(1000) + .setThreadPoolMaxSize(10) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))) + .setHAPolicyConfiguration(haPolicyConfiguration); HornetQServer server; @@ -1715,23 +1734,16 @@ public abstract class ClusterTestBase extends ServiceTestBase throw new IllegalArgumentException("Already a server at node " + node); } - Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node); - - if (sharedStorage) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - - configuration.getAcceptorConfigurations().clear(); - - TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty)); - configuration.getAcceptorConfigurations().add(acceptorConfig); - - // add backup connector TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty)); - configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig); TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty)); - configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig); + TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty)); + + Configuration configuration = createBasicConfig(sharedStorage ? 
liveNode : node) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(acceptorConfig) + .addConnectorConfiguration(liveConfig.getName(), liveConfig) + .addConnectorConfiguration(backupConfig.getName(), backupConfig) + .setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()); HornetQServer server; @@ -1760,41 +1772,35 @@ public abstract class ClusterTestBase extends ServiceTestBase throw new IllegalArgumentException("Already a server at node " + node); } - Configuration configuration = createBasicConfig(node); - - configuration.setJournalMaxIO_AIO(1000); - - if (sharedStorage) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - - configuration.getAcceptorConfigurations().clear(); - Map<String, Object> params = generateParams(node, netty); - configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params)); - TransportConfiguration connector = createTransportConfiguration(netty, false, params); - configuration.getConnectorConfigurations().put(connector.getName(), connector); List<String> connectorPairs = new ArrayList<String>(); connectorPairs.add(connector.getName()); - UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration(groupAddress, port, null, -1); - - BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1", - 200, - connectorPairs, - endpoint); - - configuration.getBroadcastGroupConfigurations().add(bcConfig); - - DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", - 1000, - 1000, endpoint); - - configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration().setGroupAddress(groupAddress).setGroupPort(port); + + BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration() + .setName("bg1") + .setBroadcastPeriod(200) + .setConnectorInfos(connectorPairs) + .setEndpointFactoryConfiguration(endpoint); + + DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration() + .setName("dg1") + .setRefreshTimeout(1000) + .setDiscoveryInitialWaitTimeout(1000) + .setBroadcastEndpointFactoryConfiguration(endpoint); + + Configuration configuration = createBasicConfig(node) + .setJournalMaxIO_AIO(1000) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(netty, true, params)) + .addConnectorConfiguration(connector.getName(), connector) + .addBroadcastGroupConfiguration(bcConfig) + .addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig) + .setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()); HornetQServer server; if (fileStorage) @@ -1837,40 +1843,34 @@ public abstract class ClusterTestBase extends ServiceTestBase throw new IllegalArgumentException("Already a server at node " + node); } - Configuration configuration = createBasicConfig(sharedStorage ? 
liveNode : node); - - if (sharedStorage) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - - configuration.getAcceptorConfigurations().clear(); - Map<String, Object> params = generateParams(node, netty); - configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params)); - TransportConfiguration connector = createTransportConfiguration(netty, false, params); - configuration.getConnectorConfigurations().put(connector.getName(), connector); List<String> connectorPairs = new ArrayList<String>(); connectorPairs.add(connector.getName()); - UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration(groupAddress, port, null, -1); - - BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1", - 1000, - connectorPairs, - endpoint); + UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration().setGroupAddress(groupAddress).setGroupPort(port); - configuration.getBroadcastGroupConfigurations().add(bcConfig); + BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration() + .setName("bg1") + .setBroadcastPeriod(1000) + .setConnectorInfos(connectorPairs) + .setEndpointFactoryConfiguration(endpoint); - DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", - 5000, - 5000, - endpoint); + DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration() + .setName("dg1") + .setRefreshTimeout(5000) + .setDiscoveryInitialWaitTimeout(5000) + .setBroadcastEndpointFactoryConfiguration(endpoint); - configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(netty, true, params)) + .addConnectorConfiguration(connector.getName(), connector) + .addBroadcastGroupConfiguration(bcConfig) + .addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig) + .setHAPolicyConfiguration(sharedStorage ? 
new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration()); HornetQServer server; if (sharedStorage) @@ -1935,15 +1935,18 @@ public abstract class ClusterTestBase extends ServiceTestBase pairs.add(serverTotc.getName()); } Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration(name, address, name, - 100, - true, - forwardWhenNoConsumers, - maxHops, - 1024, - pairs, - allowDirectConnectionsOnly); + + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(name) + .setRetryInterval(100) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs) + .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly); + config.getClusterConfigurations().add(clusterConf); } @@ -1978,16 +1981,19 @@ public abstract class ClusterTestBase extends ServiceTestBase pairs.add(serverTotc.getName()); } Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration(name, address, name, - retryInterval, - true, - forwardWhenNoConsumers, - maxHops, - 1024, - pairs, - allowDirectConnectionsOnly); - clusterConf.setReconnectAttempts(reconnectAttempts); + + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(name) + .setReconnectAttempts(reconnectAttempts) + .setRetryInterval(retryInterval) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs) + .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly); + config.getClusterConfigurations().add(clusterConf); } @@ -2054,18 +2060,19 @@ public abstract class ClusterTestBase extends ServiceTestBase pairs.add(serverTotc.getName()); } Configuration conf = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration( - name, address, connectorFrom.getName(), - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - HornetQDefaultConfiguration.getDefaultClusterFailureCheckPeriod(), - HornetQDefaultConfiguration.getDefaultClusterConnectionTtl(), - retryInterval, - HornetQDefaultConfiguration.getDefaultClusterRetryIntervalMultiplier(), - HornetQDefaultConfiguration.getDefaultClusterMaxRetryInterval(), - -1, reconnectAttempts, 1000, 1000, true, forwardWhenNoConsumers, maxHops, - 1024, pairs, false, - HornetQDefaultConfiguration.getDefaultClusterNotificationInterval(), - HornetQDefaultConfiguration.getDefaultClusterNotificationAttempts(), null); + + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(retryInterval) + .setReconnectAttempts(reconnectAttempts) + .setCallTimeout(100) + .setCallFailoverTimeout(100) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); conf.getClusterConfigurations().add(clusterConf); } @@ -2074,8 +2081,15 @@ public abstract class ClusterTestBase extends ServiceTestBase final boolean forwardWhenNoConsumers, final int maxHops, TransportConfiguration connectorFrom, List<String> pairs) { - return new ClusterConnectionConfiguration(name, address, connectorFrom.getName(), 250, true, - 
forwardWhenNoConsumers, maxHops, 1024, pairs, false); + return new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(250) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); } protected void setupClusterConnectionWithBackups(final String name, @@ -2104,9 +2118,16 @@ public abstract class ClusterTestBase extends ServiceTestBase pairs.add(serverTotc.getName()); } Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration(name, address, name, 250, true, forwardWhenNoConsumers, maxHops, - 1024, pairs, false); + + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(name) + .setRetryInterval(250) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); config.getClusterConfigurations().add(clusterConf); } @@ -2129,13 +2150,16 @@ public abstract class ClusterTestBase extends ServiceTestBase TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty)); server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig); Configuration conf = server.getConfiguration(); - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration(name, address, name, 100, - true, - forwardWhenNoConsumers, - maxHops, - 1024, - discoveryGroupName); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(name) + .setRetryInterval(100) + .setDuplicateDetection(true) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setDiscoveryGroupName(discoveryGroupName); List<ClusterConnectionConfiguration> clusterConfs = conf.getClusterConfigurations(); clusterConfs.add(clusterConf);
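
The ClusterTestBase hunks above all follow the same pattern: the wide ClusterConnectionConfiguration constructor is replaced by chained setters, so every value that used to be a positional argument is now named at the call site. A minimal sketch of that style, using only setters that appear in the hunks above; the helper name buildClusterConnection and the literal argument values are illustrative, not part of the patch:

   import java.util.List;

   import org.hornetq.core.config.ClusterConnectionConfiguration;

   public final class ClusterConnectionExample
   {
      // Mirrors the refactored setupClusterConnection(...) helpers: one named
      // setter per former constructor argument, each returning the builder.
      static ClusterConnectionConfiguration buildClusterConnection(String name,
                                                                   String address,
                                                                   String connectorName,
                                                                   List<String> staticConnectors)
      {
         return new ClusterConnectionConfiguration()
            .setName(name)
            .setAddress(address)            // address filter, e.g. "queues" or "jms.eu,!jms.eu.uk"
            .setConnectorName(connectorName)
            .setRetryInterval(250)
            .setForwardWhenNoConsumers(false)
            .setMaxHops(1)
            .setConfirmationWindowSize(1024)
            .setStaticConnectors(staticConnectors);
      }
   }

The caller registers the result the same way as before, e.g. serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf), as the hunks above show.
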
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java index 73a63db..f6099c4 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java @@ -117,9 +117,9 @@ public class ClusterWithBackupTest extends ClusterTestBase setupBackupServer(2, 5, isFileStorage(), true, isNetty()); // The lives - setupLiveServer(3, isFileStorage(), true, isNetty()); - setupLiveServer(4, isFileStorage(), true, isNetty()); - setupLiveServer(5, isFileStorage(), true, isNetty()); + setupLiveServer(3, isFileStorage(), true, isNetty(), false); + setupLiveServer(4, isFileStorage(), true, isNetty(), false); + setupLiveServer(5, isFileStorage(), true, isNetty(), false); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java index 64bc03b..895595e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java @@ -29,7 +29,7 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.postoffice.impl.BindingsImpl; import org.hornetq.core.server.group.GroupingHandler; import org.hornetq.core.server.group.UnproposalListener; @@ -98,7 +98,8 @@ public class ClusteredGroupingTest extends ClusterTestBase @Override public void onNotification(Notification notification) { - if (notification.getType() == NotificationType.UNPROPOSAL) + if (!(notification.getType() instanceof CoreNotificationType)) return; + if (notification.getType() == CoreNotificationType.UNPROPOSAL) { latch.countDown(); } @@ -109,7 +110,8 @@ public class ClusteredGroupingTest extends ClusterTestBase @Override public void onNotification(Notification notification) { - if (notification.getType() == NotificationType.UNPROPOSAL) + if (!(notification.getType() instanceof CoreNotificationType)) return; + if (notification.getType() == CoreNotificationType.UNPROPOSAL) { latch.countDown(); } @@ -205,7 +207,8 @@ public class ClusteredGroupingTest extends ClusterTestBase @Override public void onNotification(Notification notification) { - if (notification.getType() == NotificationType.UNPROPOSAL) + if (!(notification.getType() instanceof 
CoreNotificationType)) return; + if (notification.getType() == CoreNotificationType.UNPROPOSAL) { latch.countDown(); } @@ -216,7 +219,8 @@ public class ClusteredGroupingTest extends ClusterTestBase @Override public void onNotification(Notification notification) { - if (notification.getType() == NotificationType.UNPROPOSAL) + if (!(notification.getType() instanceof CoreNotificationType)) return; + if (notification.getType() == CoreNotificationType.UNPROPOSAL) { latch.countDown(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java index f35b5b9..bff10b5 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -76,10 +76,14 @@ public class MessageRedistributionTest extends ClusterTestBase MessageRedistributionTest.log.info("Doing test"); - getServer(0).getConfiguration().setGroupingHandlerConfiguration( - new GroupingHandlerConfiguration(new SimpleString("handler"), GroupingHandlerConfiguration.TYPE.LOCAL, new SimpleString("queues"))); - getServer(1).getConfiguration().setGroupingHandlerConfiguration( - new GroupingHandlerConfiguration(new SimpleString("handler"), GroupingHandlerConfiguration.TYPE.REMOTE, new SimpleString("queues"))); + getServer(0).getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration() + .setName(new SimpleString("handler")) + .setType(GroupingHandlerConfiguration.TYPE.LOCAL) + .setAddress(new SimpleString("queues"))); + getServer(1).getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration() + .setName(new SimpleString("handler")) + .setType(GroupingHandlerConfiguration.TYPE.REMOTE) + .setAddress(new SimpleString("queues"))); startServers(0, 1); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java index 6a263b3..7578fb3 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java @@ -67,9 +67,9 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase setupBackupServer(2, 5, isFileStorage(), true, isNetty()); // The lives - setupLiveServer(3, isFileStorage(), true, isNetty()); - setupLiveServer(4, isFileStorage(), true, isNetty()); - setupLiveServer(5, isFileStorage(), true, isNetty()); + setupLiveServer(3, isFileStorage(), true, isNetty(), false); + setupLiveServer(4, isFileStorage(), true, isNetty(), 
false); + setupLiveServer(5, isFileStorage(), true, isNetty(), false); setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 3, 4, 5); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java index e750d6c..58ed3e8 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java @@ -25,6 +25,7 @@ import org.hornetq.tests.util.UnitTestCase; * Most of the cases are covered in OneWayTwoNodeClusterTest - we don't duplicate them all here * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mtaylor@redhat,com">Martyn Taylor</a> * * Created 3 Feb 2009 09:10:43 * @@ -1760,6 +1761,196 @@ public class SymmetricClusterTest extends ClusterTestBase waitForBindings(4, "queues.testaddress", 0, 0, false); } + @Test + /** + * This test verifies that addresses matching a simple string filter such as 'jms' result in bindings being created + * on appropriate nodes in the cluster. It also verifies that addresses not matching the simple string filter do not + * result in bindings being created. + */ + public void testClusterAddressCreatesBindingsForSimpleStringAddressFilters() throws Exception + { + setupCluster("jms", "jms", "jms", "jms", "jms"); + startServers(); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + setupSessionFactory(3, isNetty()); + setupSessionFactory(4, isNetty()); + + createQueue(0, "jms.queues.test.1", "queue0", null, false); + createQueue(1, "jms.queues.test.1", "queue0", null, false); + createQueue(2, "jms.queues.test.1", "queue0", null, false); + createQueue(3, "jms.queues.test.1", "queue0", null, false); + createQueue(4, "jms.queues.test.1", "queue0", null, false); + + createQueue(0, "foo.queues.test.1", "queue1", null, false); + createQueue(1, "foo.queues.test.1", "queue1", null, false); + createQueue(2, "foo.queues.test.1", "queue1", null, false); + createQueue(3, "foo.queues.test.1", "queue1", null, false); + createQueue(4, "foo.queues.test.1", "queue1", null, false); + + waitForBindings(0, "jms.queues.test.1", 4, 0, false); + waitForBindings(1, "jms.queues.test.1", 4, 0, false); + waitForBindings(2, "jms.queues.test.1", 4, 0, false); + waitForBindings(3, "jms.queues.test.1", 4, 0, false); + waitForBindings(4, "jms.queues.test.1", 4, 0, false); + + waitForBindings(0, "foo.queues.test.1", 0, 0, false); + waitForBindings(1, "foo.queues.test.1", 0, 0, false); + waitForBindings(2, "foo.queues.test.1", 0, 0, false); + waitForBindings(3, "foo.queues.test.1", 0, 0, false); + waitForBindings(4, "foo.queues.test.1", 0, 0, false); + } + + @Test + /** + * This test verifies that an string exclude filter '!jms.eu.uk' results in bindings not being created for this + * address for nodes in a cluster. But ensures that other addresses are matched and bindings created. 
+ */ + public void testClusterAddressDoesNotCreatesBindingsForStringExcludesAddressFilters() throws Exception + { + setupCluster("jms.eu.de,!jms.eu.uk", "jms.eu.de,!jms.eu.uk", "jms.eu.de,!jms.eu.uk", "jms.eu.de,!jms.eu.uk", "jms.eu.de,!jms.eu.uk"); + startServers(); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + setupSessionFactory(3, isNetty()); + setupSessionFactory(4, isNetty()); + + createQueue(0, "jms.eu.uk", "queue0", null, false); + createQueue(1, "jms.eu.uk", "queue0", null, false); + createQueue(2, "jms.eu.uk", "queue0", null, false); + createQueue(3, "jms.eu.uk", "queue0", null, false); + createQueue(4, "jms.eu.uk", "queue0", null, false); + + createQueue(0, "jms.eu.de", "queue1", null, false); + createQueue(1, "jms.eu.de", "queue1", null, false); + createQueue(2, "jms.eu.de", "queue1", null, false); + createQueue(3, "jms.eu.de", "queue1", null, false); + createQueue(4, "jms.eu.de", "queue1", null, false); + + waitForBindings(0, "jms.eu.de", 4, 0, false); + waitForBindings(1, "jms.eu.de", 4, 0, false); + waitForBindings(2, "jms.eu.de", 4, 0, false); + waitForBindings(3, "jms.eu.de", 4, 0, false); + waitForBindings(4, "jms.eu.de", 4, 0, false); + + waitForBindings(0, "jms.eu.uk", 0, 0, false); + waitForBindings(1, "jms.eu.uk", 0, 0, false); + waitForBindings(2, "jms.eu.uk", 0, 0, false); + waitForBindings(3, "jms.eu.uk", 0, 0, false); + waitForBindings(4, "jms.eu.uk", 0, 0, false); + } + + /** + * This test verifies that remote bindings are only created for queues that match jms.eu or jms.us excluding + * jms.eu.uk and jms.us.bos. Represented by the address filter 'jms.eu,!jms.eu.uk,jms.us,!jms.us.bos' + * @throws Exception + */ + @Test + public void testClusterAddressFiltersExcludesAndIncludesAddressesInList() throws Exception + { + setupCluster("jms.eu,!jms.eu.uk,jms.us,!jms.us.bos", + "jms.eu,!jms.eu.uk,jms.us,!jms.us.bos", + "jms.eu,!jms.eu.uk,jms.us,!jms.us.bos", + "jms.eu,!jms.eu.uk,jms.us,!jms.us.bos", + "jms.eu,!jms.eu.uk,jms.us,!jms.us.bos"); + + startServers(); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + setupSessionFactory(3, isNetty()); + setupSessionFactory(4, isNetty()); + + createQueue(0, "jms.eu.uk", "queue0", null, false); + createQueue(1, "jms.eu.uk", "queue0", null, false); + createQueue(2, "jms.eu.uk", "queue0", null, false); + createQueue(3, "jms.eu.uk", "queue0", null, false); + createQueue(4, "jms.eu.uk", "queue0", null, false); + + createQueue(0, "jms.eu.de", "queue1", null, false); + createQueue(1, "jms.eu.de", "queue1", null, false); + createQueue(2, "jms.eu.de", "queue1", null, false); + createQueue(3, "jms.eu.de", "queue1", null, false); + createQueue(4, "jms.eu.de", "queue1", null, false); + + createQueue(0, "jms.eu.fr", "queue2", null, false); + createQueue(1, "jms.eu.fr", "queue2", null, false); + createQueue(2, "jms.eu.fr", "queue2", null, false); + createQueue(3, "jms.eu.fr", "queue2", null, false); + createQueue(4, "jms.eu.fr", "queue2", null, false); + + createQueue(0, "jms.us.ca", "queue4", null, false); + createQueue(1, "jms.us.ca", "queue4", null, false); + createQueue(2, "jms.us.ca", "queue4", null, false); + createQueue(3, "jms.us.ca", "queue4", null, false); + createQueue(4, "jms.us.ca", "queue4", null, false); + + createQueue(0, "jms.us.se", "queue5", null, false); + createQueue(1, "jms.us.se", "queue5", null, false); + createQueue(2, "jms.us.se", "queue5", null, false); + createQueue(3, "jms.us.se", 
"queue5", null, false); + createQueue(4, "jms.us.se", "queue5", null, false); + + createQueue(0, "jms.us.ny", "queue6", null, false); + createQueue(1, "jms.us.ny", "queue6", null, false); + createQueue(2, "jms.us.ny", "queue6", null, false); + createQueue(3, "jms.us.ny", "queue6", null, false); + createQueue(4, "jms.us.ny", "queue6", null, false); + + waitForBindings(0, "jms.eu.de", 4, 0, false); + waitForBindings(1, "jms.eu.de", 4, 0, false); + waitForBindings(2, "jms.eu.de", 4, 0, false); + waitForBindings(3, "jms.eu.de", 4, 0, false); + waitForBindings(4, "jms.eu.de", 4, 0, false); + + waitForBindings(0, "jms.eu.fr", 4, 0, false); + waitForBindings(1, "jms.eu.fr", 4, 0, false); + waitForBindings(2, "jms.eu.fr", 4, 0, false); + waitForBindings(3, "jms.eu.fr", 4, 0, false); + waitForBindings(4, "jms.eu.fr", 4, 0, false); + + waitForBindings(0, "jms.eu.uk", 0, 0, false); + waitForBindings(1, "jms.eu.uk", 0, 0, false); + waitForBindings(2, "jms.eu.uk", 0, 0, false); + waitForBindings(3, "jms.eu.uk", 0, 0, false); + waitForBindings(4, "jms.eu.uk", 0, 0, false); + + waitForBindings(0, "jms.us.ca", 4, 0, false); + waitForBindings(1, "jms.us.ca", 4, 0, false); + waitForBindings(2, "jms.us.ca", 4, 0, false); + waitForBindings(3, "jms.us.ca", 4, 0, false); + waitForBindings(4, "jms.us.ca", 4, 0, false); + + waitForBindings(0, "jms.us.ny", 4, 0, false); + waitForBindings(1, "jms.us.ny", 4, 0, false); + waitForBindings(2, "jms.us.ny", 4, 0, false); + waitForBindings(3, "jms.us.ny", 4, 0, false); + waitForBindings(4, "jms.us.ny", 4, 0, false); + + waitForBindings(0, "jms.us.bos", 0, 0, false); + waitForBindings(1, "jms.us.bos", 0, 0, false); + waitForBindings(2, "jms.us.bos", 0, 0, false); + waitForBindings(3, "jms.us.bos", 0, 0, false); + waitForBindings(4, "jms.us.bos", 0, 0, false); + } + + protected void setupCluster(String addr1, String addr2, String addr3, String addr4, String addr5) throws Exception + { + setupClusterConnection("cluster0", addr1, true, 1, isNetty(), 0, 1, 2, 3, 4); + + setupClusterConnection("cluster1", addr2, true, 1, isNetty(), 1, 0, 2, 3, 4); + + setupClusterConnection("cluster2", addr3, true, 1, isNetty(), 2, 0, 1, 3, 4); + + setupClusterConnection("cluster3", addr4, true, 1, isNetty(), 3, 0, 1, 2, 4); + setupClusterConnection("cluster4", addr5, true, 1, isNetty(), 4, 0, 1, 2, 3); + } + protected void setupCluster() throws Exception { setupCluster(false); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java index b65c547..4dc5af7 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java @@ -12,7 +12,7 @@ */ package org.hornetq.tests.integration.cluster.distribution; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.tests.integration.IntegrationTestLogger; import org.hornetq.tests.util.UnitTestCase; import 
org.junit.Test; @@ -539,11 +539,11 @@ public class SymmetricClusterWithBackupTest extends SymmetricClusterTest setupBackupServer(9, 4, isFileStorage(), true, isNetty()); // The lives - setupLiveServer(0, isFileStorage(), true, isNetty()); - setupLiveServer(1, isFileStorage(), true, isNetty()); - setupLiveServer(2, isFileStorage(), true, isNetty()); - setupLiveServer(3, isFileStorage(), true, isNetty()); - setupLiveServer(4, isFileStorage(), true, isNetty()); + setupLiveServer(0, isFileStorage(), true, isNetty(), false); + setupLiveServer(1, isFileStorage(), true, isNetty(), false); + setupLiveServer(2, isFileStorage(), true, isNetty(), false); + setupLiveServer(3, isFileStorage(), true, isNetty(), false); + setupLiveServer(4, isFileStorage(), true, isNetty(), false); } @Override @@ -551,11 +551,11 @@ public class SymmetricClusterWithBackupTest extends SymmetricClusterTest { // Need to set backup, since when restarting backup after it has failed over, backup will have been set to false - getServer(5).getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - getServer(6).getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - getServer(7).getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - getServer(8).getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - getServer(9).getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); + getServer(5).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); + getServer(6).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); + getServer(7).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); + getServer(8).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); + getServer(9).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); startServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AutomaticColocatedQuorumVoteTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AutomaticColocatedQuorumVoteTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AutomaticColocatedQuorumVoteTest.java index 0023973..52ca1fb 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AutomaticColocatedQuorumVoteTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AutomaticColocatedQuorumVoteTest.java @@ -21,17 +21,22 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.Topology; import org.hornetq.core.client.impl.TopologyMemberImpl; -import org.hornetq.core.config.BackupStrategy; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.CoreQueueConfiguration; +import 
org.hornetq.core.config.ScaleDownConfiguration; +import org.hornetq.core.config.ha.ColocatedPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.impl.HornetQServerImpl; import org.hornetq.tests.util.ServiceTestBase; import org.junit.Test; @@ -41,27 +46,27 @@ import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase { - private final HAPolicy.POLICY_TYPE policyType; + private final boolean replicated; - @Parameterized.Parameters + @Parameterized.Parameters(name = "replicated={0}") public static Collection getParameters() { return Arrays.asList(new Object[][] { - {HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED}, - {HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE} + {true}, + {false} }); } - public AutomaticColocatedQuorumVoteTest(HAPolicy.POLICY_TYPE policyType) + public AutomaticColocatedQuorumVoteTest(boolean replicated) { - this.policyType = policyType; + this.replicated = replicated; } @Test public void testSimpleDistributionBackupStrategyFull() throws Exception { - HornetQServer server0 = createServer(0, 1, BackupStrategy.FULL); - HornetQServer server1 = createServer(1, 0, BackupStrategy.FULL); + HornetQServer server0 = createServer(0, 1, false); + HornetQServer server1 = createServer(1, 0, false); TransportConfiguration liveConnector0 = getConnectorTransportConfiguration("liveConnector" + 0, 0); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration("liveConnector" + 1, 1); @@ -103,7 +108,7 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase assertEquals(2, connectorConfigurations1.size()); assertEquals("5546", connectorConfigurations1.get("liveConnector1").getParams().get("port")); assertEquals("5445", connectorConfigurations1.get("remoteConnector1").getParams().get("port")); - if (policyType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE) + if (!replicated) { assertEquals(server0.getConfiguration().getJournalDirectory(), backupServer1.getConfiguration().getJournalDirectory()); assertEquals(server0.getConfiguration().getBindingsDirectory(), backupServer1.getConfiguration().getBindingsDirectory()); @@ -128,7 +133,14 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase } finally { - server0.stop(); + try + { + server0.stop(); + } + catch (Throwable e) + { + e.printStackTrace(); + } server1.stop(); } } @@ -136,8 +148,8 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase @Test public void testSimpleDistributionBackupStrategyScaleDown() throws Exception { - HornetQServer server0 = createServer(0, 1, BackupStrategy.SCALE_DOWN); - HornetQServer server1 = createServer(1, 0, BackupStrategy.SCALE_DOWN); + HornetQServer server0 = createServer(0, 1, true); + HornetQServer server1 = createServer(1, 0, true); TransportConfiguration liveConnector0 = getConnectorTransportConfiguration("liveConnector" + 0, 0); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration("liveConnector" + 1, 1); @@ -177,7 +189,7 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase assertEquals(2, connectorConfigurations1.size()); assertEquals("5446", 
connectorConfigurations1.get("liveConnector1").getParams().get("port")); assertEquals("5445", connectorConfigurations1.get("remoteConnector1").getParams().get("port")); - if (policyType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE) + if (!replicated) { assertEquals(server0.getConfiguration().getJournalDirectory(), backupServer1.getConfiguration().getJournalDirectory()); assertEquals(server0.getConfiguration().getBindingsDirectory(), backupServer1.getConfiguration().getBindingsDirectory()); @@ -202,7 +214,14 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase } finally { - server0.stop(); + try + { + server0.stop(); + } + catch (Throwable e) + { + e.printStackTrace(); + } server1.stop(); } } @@ -210,10 +229,10 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase @Test public void testSimpleDistributionOfBackupsMaxBackupsExceeded() throws Exception { - HornetQServer server0 = createServer(0, 1, BackupStrategy.FULL); - HornetQServer server1 = createServer(1, 0, BackupStrategy.FULL); - HornetQServer server2 = createServer(2, 0, BackupStrategy.FULL); - HornetQServer server3 = createServer(3, 0, BackupStrategy.FULL); + HornetQServer server0 = createServer(0, 1, false); + HornetQServer server1 = createServer(1, 0, false); + HornetQServer server2 = createServer(2, 0, false); + HornetQServer server3 = createServer(3, 0, false); TransportConfiguration liveConnector0 = getConnectorTransportConfiguration("liveConnector" + 0, 0); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration("liveConnector" + 1, 1); TransportConfiguration liveConnector2 = getConnectorTransportConfiguration("liveConnector" + 2, 2); @@ -274,45 +293,69 @@ public class AutomaticColocatedQuorumVoteTest extends ServiceTestBase } } - private HornetQServer createServer(int node, int remoteNode, BackupStrategy backupStrategy) throws Exception + private HornetQServer createServer(int node, int remoteNode, boolean scaleDown) throws Exception { TransportConfiguration liveConnector = getConnectorTransportConfiguration("liveConnector" + node, node); TransportConfiguration remoteConnector = getConnectorTransportConfiguration("remoteConnector" + node, remoteNode); TransportConfiguration liveAcceptor = getAcceptorTransportConfiguration(node); - Configuration liveConfiguration = getConfiguration("server" + node, backupStrategy, liveConnector, liveAcceptor, remoteConnector); + Configuration liveConfiguration = getConfiguration("server" + node, scaleDown, liveConnector, liveAcceptor, remoteConnector); HornetQServer server = new HornetQServerImpl(liveConfiguration); server.setIdentity("server" + node); return server; } - private Configuration getConfiguration(String identity, BackupStrategy backupStrategy, TransportConfiguration liveConnector, TransportConfiguration liveAcceptor, TransportConfiguration... otherLiveNodes) throws Exception + + private Configuration getConfiguration(String identity, boolean scaleDown, TransportConfiguration liveConnector, TransportConfiguration liveAcceptor, TransportConfiguration... 
otherLiveNodes) throws Exception { - Configuration configuration = createDefaultConfig(); - configuration.getAcceptorConfigurations().clear(); - configuration.getAcceptorConfigurations().add(liveAcceptor); - configuration.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - configuration.setJournalDirectory(configuration.getJournalDirectory() + identity); - configuration.setBindingsDirectory(configuration.getBindingsDirectory() + identity); - configuration.setLargeMessagesDirectory(configuration.getLargeMessagesDirectory() + identity); - configuration.setPagingDirectory(configuration.getPagingDirectory() + identity); + Configuration configuration = createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(liveAcceptor) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .setJournalDirectory(HornetQDefaultConfiguration.getDefaultJournalDir() + identity) + .setBindingsDirectory(HornetQDefaultConfiguration.getDefaultBindingsDirectory() + identity) + .setLargeMessagesDirectory(HornetQDefaultConfiguration.getDefaultLargeMessagesDir() + identity) + .setPagingDirectory(HornetQDefaultConfiguration.getDefaultPagingDir() + identity) + .addQueueConfiguration(new CoreQueueConfiguration() + .setAddress("jms.queue.testQueue") + .setName("jms.queue.testQueue")); + List<String> transportConfigurationList = new ArrayList<>(); - final HAPolicy haPolicy = new HAPolicy(); + + final ColocatedPolicyConfiguration haPolicy = new ColocatedPolicyConfiguration(); for (TransportConfiguration otherLiveNode : otherLiveNodes) { - configuration.getConnectorConfigurations().put(otherLiveNode.getName(), otherLiveNode); + configuration.addConnectorConfiguration(otherLiveNode.getName(), otherLiveNode); transportConfigurationList.add(otherLiveNode.getName()); - haPolicy.getRemoteConnectors().add(otherLiveNode.getName()); + haPolicy.getExcludedConnectors().add(otherLiveNode.getName()); } - basicClusterConnectionConfig(configuration, liveConnector.getName(), transportConfigurationList); - configuration.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - - haPolicy.setPolicyType(policyType); - haPolicy.setBackupStrategy(backupStrategy); + configuration.addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), transportConfigurationList)); haPolicy.setBackupPortOffset(100); haPolicy.setBackupRequestRetries(-1); haPolicy.setBackupRequestRetryInterval(500); haPolicy.setMaxBackups(1); haPolicy.setRequestBackup(true); - configuration.setHAPolicy(haPolicy); + configuration.setHAPolicyConfiguration(haPolicy); + if (!replicated) + { + SharedStoreMasterPolicyConfiguration ssmc = new SharedStoreMasterPolicyConfiguration(); + SharedStoreSlavePolicyConfiguration sssc = new SharedStoreSlavePolicyConfiguration(); + haPolicy.setLiveConfig(ssmc); + haPolicy.setBackupConfig(sssc); + if (scaleDown) + { + sssc.setScaleDownConfiguration(new ScaleDownConfiguration()); + } + } + else + { + ReplicatedPolicyConfiguration rpc = new ReplicatedPolicyConfiguration(); + ReplicaPolicyConfiguration rpc2 = new ReplicaPolicyConfiguration(); + haPolicy.setLiveConfig(rpc); + haPolicy.setBackupConfig(rpc2); + if (scaleDown) + { + rpc2.setScaleDownConfiguration(new ScaleDownConfiguration()); + } + } return configuration; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 
---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java index daeafeb..891be38 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -41,6 +41,7 @@ import org.hornetq.tests.integration.cluster.util.BackupSyncDelay; import org.hornetq.tests.integration.cluster.util.TestableServer; import org.hornetq.tests.util.TransportConfigurationUtils; import org.hornetq.utils.UUID; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -81,6 +82,27 @@ public class BackupSyncJournalTest extends FailoverTestBase } + @Override + @After + public void tearDown() throws Exception + { + try + { + File dir = new File(backupServer.getServer() + .getConfiguration() + .getLargeMessagesDirectory()); + deleteDirectory(dir); + dir = new File(liveServer.getServer() + .getConfiguration() + .getLargeMessagesDirectory()); + deleteDirectory(dir); + } + finally + { + super.tearDown(); + } + } + @Test public void testNodeID() throws Exception { @@ -246,8 +268,8 @@ public class BackupSyncJournalTest extends FailoverTestBase } /** - * @throws FileNotFoundException - * @throws IOException + * @throws java.io.FileNotFoundException + * @throws java.io.IOException * @throws InterruptedException */ private void assertNodeIdWasSaved() throws Exception @@ -309,14 +331,14 @@ public class BackupSyncJournalTest extends FailoverTestBase assertNoMoreMessages(); sendMessages(session, producer, 2 * n_msgs); - assertFalse("must NOT be a backup", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); - adaptLiveConfigForReplicatedFailBack(liveServer.getServer().getConfiguration()); + assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup()); + adaptLiveConfigForReplicatedFailBack(liveServer); liveServer.start(); waitForServer(liveServer.getServer()); - assertTrue("must have become a backup", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); + assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup()); assertTrue("Fail-back must initialize live!", liveServer.getServer().waitForActivation(15, TimeUnit.SECONDS)); - assertFalse("must be LIVE!", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); + assertFalse("must be LIVE!", liveServer.getServer().getHAPolicy().isBackup()); int i = 0; while (backupServer.isStarted() && i++ < 100) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java index 64469ac..ecde2ec 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java +++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java @@ -62,6 +62,8 @@ public class BackupSyncLargeMessageTest extends BackupSyncJournalTest @Test public void testDeleteLargeMessages() throws Exception { + // 200 will increase the odds of a failure + setNumberOfMessages(200); File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory()); assertEquals("Should not have any large messages... previous test failed to clean up?", 0, getAllMessageFileIds(dir).size()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsNettyTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsNettyTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsNettyTest.java deleted file mode 100644 index 763fa48..0000000 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsNettyTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.hornetq.tests.integration.cluster.failover; - - -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.core.client.impl.TopologyMemberImpl; -import org.hornetq.core.config.Configuration; -import org.hornetq.core.config.CoreQueueConfiguration; -import org.hornetq.core.server.cluster.ha.HAPolicy; -import org.hornetq.core.server.impl.InVMNodeManager; - -import java.util.HashMap; - -public class ColocatedFailoverCheckPairingsNettyTest extends ColocatedFailoverCheckPairingsTest -{ - - protected void assertPairedCorrectly(TopologyMemberImpl m1, TopologyMemberImpl m2) - { - String backup1Port = (String) m1.getBackup().getParams().get("port"); - String backup2Port = (String) m2.getBackup().getParams().get("port"); - assertEquals(backup1Port, "5449"); - assertEquals(backup2Port, "5448"); - } - @Override - protected void createConfigs() throws Exception - { - - TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(1); - liveConfiguration1 = super.createDefaultConfig(); - liveConfiguration1.getAcceptorConfigurations().clear(); - liveConfiguration1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(1)); - liveConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - liveConfiguration1.getHAPolicy().setFailbackDelay(1000); - liveConfiguration1.setJournalDirectory(getTestDir() + "/live1/journal"); - liveConfiguration1.setBindingsDirectory(getTestDir() + "/live1/bindings"); - liveConfiguration1.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - - TransportConfiguration liveConnector2 = getConnectorTransportConfiguration(2); - basicClusterConnectionConfig(liveConfiguration1, liveConnector1.getName(), liveConnector2.getName()); - liveConfiguration1.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - liveConfiguration1.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2); - - Configuration backupConfiguration1 = liveConfiguration1.copy(); - TransportConfiguration backupConnector1 = getConnectorTransportConfiguration(3); - backupConfiguration1.getConnectorConfigurations().put(backupConnector1.getName(), backupConnector1); - backupConfiguration1.setJournalDirectory(getTestDir() + "/backup1/journal"); - - backupConfiguration1.setBindingsDirectory(getTestDir() + "/backup1/bindings"); - backupConfiguration1.getAcceptorConfigurations().clear(); - backupConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - backupConfiguration1.getClusterConfigurations().clear(); - basicClusterConnectionConfig(backupConfiguration1, backupConnector1.getName(), liveConnector1.getName()); - liveConfiguration1.getBackupServerConfigurations().add(backupConfiguration1); - - - liveConfiguration2 = super.createDefaultConfig(); - liveConfiguration2.getAcceptorConfigurations().clear(); - liveConfiguration2.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(2)); - liveConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - liveConfiguration2.getHAPolicy().setFailbackDelay(1000); - liveConfiguration2.setJournalDirectory(getTestDir() + "/live2/journal"); - liveConfiguration2.setBindingsDirectory(getTestDir() + "/live2/bindings"); - liveConfiguration2.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - - basicClusterConnectionConfig(liveConfiguration2, liveConnector2.getName(), liveConnector1.getName()); - 
-      liveConfiguration2.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1);
-      liveConfiguration2.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2);
-
-      Configuration backupConfiguration2 = liveConfiguration2.copy();
-      TransportConfiguration backupConnector2 = getConnectorTransportConfiguration(4);
-      backupConfiguration2.getAcceptorConfigurations().clear();
-      backupConfiguration2.getConnectorConfigurations().put(backupConnector2.getName(), backupConnector2);
-      backupConfiguration2.setJournalDirectory(getTestDir() + "/backup2/journal");
-      backupConfiguration2.setBindingsDirectory(getTestDir() + "/backup2/bindings");
-      backupConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-      backupConfiguration2.getClusterConfigurations().clear();
-      basicClusterConnectionConfig(backupConfiguration2, backupConnector2.getName(), liveConnector2.getName());
-      liveConfiguration2.getBackupServerConfigurations().add(backupConfiguration2);
-
-      nodeManagerLive1 = new InVMNodeManager(true, backupConfiguration2.getJournalDirectory());
-      nodeManagerLive2 = new InVMNodeManager(true, backupConfiguration1.getJournalDirectory());
-
-      liveServer1 = createTestableServer(liveConfiguration1, nodeManagerLive1, nodeManagerLive2, 1);
-      liveServer2 = createTestableServer(liveConfiguration2, nodeManagerLive2, nodeManagerLive1, 2);
-   }
-
-   @Override
-   protected TransportConfiguration getAcceptorTransportConfiguration(int node)
-   {
-      HashMap<String, Object> params = new HashMap<>();
-      params.put("port", "" + (5445 + node));
-      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-   }
-
-   @Override
-   protected TransportConfiguration getConnectorTransportConfiguration(int node)
-   {
-      HashMap<String, Object> params = new HashMap<>();
-      params.put("port", "" + (5445 + node));
-      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsTest.java
deleted file mode 100644
index 976dbfb..0000000
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverCheckPairingsTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.tests.integration.cluster.failover;
-
-
-import java.util.HashMap;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMemberImpl;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.cluster.ha.HAPolicy;
-import org.hornetq.core.server.impl.InVMNodeManager;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.junit.After;
-import org.junit.Test;
-
-public class ColocatedFailoverCheckPairingsTest extends ServiceTestBase
-{
-   TestableServer liveServer1;
-
-   protected TestableServer liveServer2;
-
-   protected Configuration liveConfiguration1;
-
-   protected Configuration liveConfiguration2;
-
-   protected NodeManager nodeManagerLive1;
-
-   protected NodeManager nodeManagerLive2;
-
-   @Override
-   public void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   @Test
-   public void testPairings() throws Exception
-   {
-      createConfigs();
-
-      liveServer1.start();
-      liveServer2.start();
-      waitForServer(liveServer1.getServer());
-      waitForServer(liveServer2.getServer());
-
-      ServerLocator locator = HornetQClient.createServerLocator(true, getConnectorTransportConfiguration(1));
-
-      waitForTopology(liveServer1.getServer(), 2, 2);
-      waitForTopology(liveServer2.getServer(), 2, 2);
-
-      ClientSessionFactory sessionFactory = locator.createSessionFactory();
-
-      Topology topology = sessionFactory.getServerLocator().getTopology();
-      System.out.println(topology.describe());
-      TopologyMemberImpl m1 = topology.getMember(liveServer1.getServer().getNodeID().toString());
-      TopologyMemberImpl m2 = topology.getMember(liveServer2.getServer().getNodeID().toString());
-
-      assertPairedCorrectly(m1, m2);
-   }
-
-   protected void assertPairedCorrectly(TopologyMemberImpl m1, TopologyMemberImpl m2)
-   {
-      assertEquals(m1.getLive(), m2.getBackup());
-      assertEquals(m2.getLive(), m1.getBackup());
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception
-   {
-      liveServer1.stop();
-      liveServer2.stop();
-      super.tearDown();
-   }
-
-   protected void createConfigs() throws Exception
-   {
-      nodeManagerLive1 = new InVMNodeManager(false);
-      nodeManagerLive2 = new InVMNodeManager(false);
-
-      TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(1);
-      liveConfiguration1 = super.createDefaultConfig();
-      liveConfiguration1.getAcceptorConfigurations().clear();
-      liveConfiguration1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(1));
-      liveConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
-      liveConfiguration1.getHAPolicy().setFailbackDelay(1000);
-      liveConfiguration1.setJournalDirectory(getTestDir() + "/live1");
-      liveConfiguration1.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true));
-
-      TransportConfiguration liveConnector2 = getConnectorTransportConfiguration(2);
-      basicClusterConnectionConfig(liveConfiguration1, liveConnector1.getName(), liveConnector2.getName());
-      liveConfiguration1.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1);
-      liveConfiguration1.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2);
-
-      Configuration backupConfiguration1 = liveConfiguration1.copy();
-      backupConfiguration1.setJournalDirectory(getTestDir() + "/live2");
-      backupConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-      liveConfiguration1.getBackupServerConfigurations().add(backupConfiguration1);
-
-      liveServer1 = createTestableServer(liveConfiguration1, nodeManagerLive1, nodeManagerLive2, 1);
-
-      liveConfiguration2 = super.createDefaultConfig();
-      liveConfiguration2.getAcceptorConfigurations().clear();
-      liveConfiguration2.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(2));
-      liveConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
-      liveConfiguration2.getHAPolicy().setFailbackDelay(1000);
-      liveConfiguration2.setJournalDirectory(getTestDir() + "/live2");
-      liveConfiguration2.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true));
-
-      basicClusterConnectionConfig(liveConfiguration2, liveConnector2.getName(), liveConnector1.getName());
-      liveConfiguration2.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1);
-      liveConfiguration2.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2);
-
-      Configuration backupConfiguration2 = liveConfiguration2.copy();
-      backupConfiguration2.setJournalDirectory(getTestDir() + "/live1");
-      backupConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-      liveConfiguration2.getBackupServerConfigurations().add(backupConfiguration2);
-
-      liveServer2 = createTestableServer(liveConfiguration2, nodeManagerLive2, nodeManagerLive1, 2);
-   }
-
-   protected TransportConfiguration getAcceptorTransportConfiguration(int node)
-   {
-      HashMap<String, Object> params = new HashMap<>();
-      params.put("server-id", "" + node);
-      return new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
-   }
-
-   protected TransportConfiguration getConnectorTransportConfiguration(int node)
-   {
-      HashMap<String, Object> params = new HashMap<>();
-      params.put("server-id", "" + node);
-      return new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
-   }
-
-   protected TestableServer createTestableServer(Configuration config, NodeManager liveNodeManager, NodeManager backupNodeManager, int id)
-   {
-      return new SameProcessHornetQServer(
-         createColocatedInVMFailoverServer(true, config, liveNodeManager, backupNodeManager, id));
-   }
-}
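The two deleted tests above configured their live/backup pairings through the old HAPolicy.POLICY_TYPE enum (SHARED_STORE / BACKUP_SHARED_STORE and REPLICATED / BACKUP_REPLICATED). For orientation only, a minimal sketch of how those roles map onto the per-role HAPolicyConfiguration objects used by the replacement API; the helper class here is hypothetical and not part of the commit, and it assumes the org.hornetq.core.config.ha policy classes and the Configuration.setHAPolicyConfiguration setter:

import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.ha.ReplicaPolicyConfiguration;
import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration;
import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration;

// Hypothetical helper: shows the enum-to-policy-object mapping, nothing more.
public final class HaPolicySketch
{
   // Shared-store pair (previously SHARED_STORE / BACKUP_SHARED_STORE).
   public static void sharedStorePair(Configuration live, Configuration backup)
   {
      live.setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration());
      backup.setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration());
   }

   // Replication pair (previously REPLICATED / BACKUP_REPLICATED).
   public static void replicationPair(Configuration live, Configuration backup)
   {
      live.setHAPolicyConfiguration(new ReplicatedPolicyConfiguration());
      backup.setHAPolicyConfiguration(new ReplicaPolicyConfiguration());
   }
}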
