http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java index 9b9d99d..b73d2e0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java @@ -32,10 +32,13 @@ import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.remoting.impl.invm.InVMConnector; import org.hornetq.core.remoting.impl.invm.InVMRegistry; import org.hornetq.core.server.NodeManager; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.server.cluster.ha.ReplicatedPolicy; import org.hornetq.core.server.impl.HornetQServerImpl; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer; @@ -110,7 +113,8 @@ public abstract class FailoverTestBase extends ServiceTestBase protected void setBackupIdentity() { - backupServer.setIdentity(this.getClass().getSimpleName() + "/backupServers"); + backupServer.setIdentity(this.getClass() + .getSimpleName() + "/backupServers"); } protected void setLiveIdentity() @@ -120,7 +124,9 @@ public abstract class FailoverTestBase extends ServiceTestBase protected TestableServer createTestableServer(Configuration config) { - return new SameProcessHornetQServer(createInVMFailoverServer(true, config, nodeManager, config.getHAPolicy().isBackup() ? 2 : 1)); + boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || + config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; + return new SameProcessHornetQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); } protected TestableServer createColocatedTestableServer(Configuration config, NodeManager liveNodeManager,NodeManager backupNodeManager, int id) @@ -166,34 +172,33 @@ public abstract class FailoverTestBase extends ServiceTestBase protected void createConfigs() throws Exception { nodeManager = new InVMNodeManager(false); - - backupConfig = super.createDefaultConfig(); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - backupConfig.getHAPolicy().setFailbackDelay(1000); - TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); + + backupConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultConfig(); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfig.getHAPolicy().setFailbackDelay(1000); + liveConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration() + .setFailbackDelay(1000)) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())) + .addConnectorConfiguration(liveConnector.getName(), liveConnector); - basicClusterConnectionConfig(liveConfig, liveConnector.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); liveServer = createTestableServer(liveConfig); } protected void createReplicatedConfigs() throws Exception { - final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false); @@ -201,29 +206,39 @@ public abstract class FailoverTestBase extends ServiceTestBase backupConfig = createDefaultConfig(); liveConfig = createDefaultConfig(); - ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, - liveConnector); + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector); final String suffix = "_backup"; - backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix); - backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + suffix); - backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + suffix); - backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + suffix); - backupConfig.setSecurityEnabled(false); - backupConfig.setMaxSavedReplicatedJournalSize(0); + backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix) + .setJournalDirectory(backupConfig.getJournalDirectory() + suffix) + .setPagingDirectory(backupConfig.getPagingDirectory() + suffix) + .setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + suffix) + .setSecurityEnabled(false); + + setupHAPolicyConfiguration(); nodeManager = new InVMNodeManager(true, backupConfig.getJournalDirectory()); backupServer = createTestableServer(backupConfig); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); + + liveConfig.clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)); liveServer = createTestableServer(liveConfig); } - protected final void adaptLiveConfigForReplicatedFailBack(Configuration configuration) + protected void setupHAPolicyConfiguration() + { + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()) + .setMaxSavedReplicatedJournalsSize(0) + .setAllowFailBack(true) + .setFailbackDelay(5000); + } + + protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) { + Configuration configuration = server.getServer().getConfiguration(); final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - if (configuration.getHAPolicy().isSharedStore()) + if (server.getServer().getHAPolicy().isSharedStore()) { ClusterConnectionConfiguration cc = configuration.getClusterConfigurations().get(0); assertNotNull("cluster connection configuration", cc); @@ -233,7 +248,8 @@ public abstract class FailoverTestBase extends ServiceTestBase configuration.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); return; } - configuration.setCheckForLiveServer(true); + ReplicatedPolicy haPolicy = (ReplicatedPolicy) server.getServer().getHAPolicy(); + haPolicy.setCheckForLiveServer(true); } @Override @@ -297,7 +313,7 @@ public abstract class FailoverTestBase extends ServiceTestBase protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, int seconds) throws Exception { final HornetQServerImpl actualServer = (HornetQServerImpl) backupServer.getServer(); - if (actualServer.getConfiguration().getHAPolicy().isSharedStore()) + if (actualServer.getHAPolicy().isSharedStore()) { waitForServer(actualServer); }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java index 27352d2..f9f2ed0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java @@ -13,13 +13,17 @@ package org.hornetq.tests.integration.cluster.failover; import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.hornetq.api.core.Message; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.TopologyMemberImpl; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration; +import org.hornetq.core.server.impl.SharedNothingBackupActivation; import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase; import org.hornetq.tests.util.ServiceTestBase; import org.junit.Test; @@ -36,9 +40,9 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase { setupBackupServer(2, 0, isFileStorage(), isSharedStore(), isNetty()); - setupLiveServer(0, isFileStorage(), isSharedStore(), isNetty()); + setupLiveServer(0, isFileStorage(), isSharedStore(), isNetty(), false); - setupLiveServer(1, isFileStorage(), isSharedStore(), isNetty()); + setupLiveServer(1, isFileStorage(), isSharedStore(), isNetty(), false); setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1); @@ -51,10 +55,12 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2); - - servers[0].getConfiguration().getHAPolicy().setBackupGroupName("group1"); - servers[1].getConfiguration().getHAPolicy().setBackupGroupName("group2"); - servers[2].getConfiguration().getHAPolicy().setBackupGroupName("group1"); + if (!isSharedStore()) + { + ((ReplicatedPolicyConfiguration)servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicatedPolicyConfiguration)servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); + ((ReplicaPolicyConfiguration)servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + } startServers(0, 1, 2); setupSessionFactory(0, isNetty()); @@ -133,9 +139,9 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase { setupBackupServer(2, 0, isFileStorage(), isSharedStore(), isNetty()); - setupLiveServer(0, isFileStorage(), isSharedStore(), isNetty()); + setupLiveServer(0, isFileStorage(), isSharedStore(), isNetty(), false); - setupLiveServer(1, isFileStorage(), isSharedStore(), isNetty()); + setupLiveServer(1, isFileStorage(), isSharedStore(), isNetty(), false); setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1); @@ -149,9 +155,12 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2); - servers[0].getConfiguration().getHAPolicy().setBackupGroupName("group1"); - servers[1].getConfiguration().getHAPolicy().setBackupGroupName("group2"); - servers[2].getConfiguration().getHAPolicy().setBackupGroupName("group1"); + if (!isSharedStore()) + { + ((ReplicatedPolicyConfiguration)servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicatedPolicyConfiguration)servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); + ((ReplicaPolicyConfiguration)servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + } startServers(0, 1, 2); @@ -189,7 +198,8 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase if (!isSharedStore()) { - waitForBackupTopologyAnnouncement(sfs[0]); + SharedNothingBackupActivation backupActivation = (SharedNothingBackupActivation) servers[2].getActivation(); + assertTrue(backupActivation.waitForBackupSync(10, TimeUnit.SECONDS)); } closeSessionFactory(0); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LiveToLiveFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LiveToLiveFailoverTest.java index 5028901..5d49d45 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LiveToLiveFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LiveToLiveFailoverTest.java @@ -18,9 +18,11 @@ import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; -import org.hornetq.core.config.BackupStrategy; +import org.hornetq.core.config.ScaleDownConfiguration; +import org.hornetq.core.config.ha.ColocatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.tests.util.TransportConfigurationUtils; import org.junit.Assert; @@ -47,38 +49,39 @@ public class LiveToLiveFailoverTest extends FailoverTest { nodeManager0 = new InVMNodeManager(false); nodeManager1 = new InVMNodeManager(false); - - backupConfig = super.createDefaultConfig(1); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true, 1)); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE); - backupConfig.getHAPolicy().setFailbackDelay(1000); - TransportConfiguration liveConnector0 = getConnectorTransportConfiguration(true, 0); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(true, 1); - backupConfig.getConnectorConfigurations().put(liveConnector0.getName(), liveConnector0); - backupConfig.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - backupConfig.getHAPolicy().setBackupStrategy(BackupStrategy.SCALE_DOWN); - backupConfig.getHAPolicy().getScaleDownConnectors().add(liveConnector1.getName()); - backupConfig.getHAPolicy().setRequestBackup(true); - backupConfig.getHAPolicy().setBackupRequestRetryInterval(1000); - basicClusterConnectionConfig(backupConfig, liveConnector1.getName(), liveConnector0.getName()); - backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1); - - liveConfig = super.createDefaultConfig(0); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true, 0)); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE); - liveConfig.getHAPolicy().setFailbackDelay(1000); - - basicClusterConnectionConfig(liveConfig, liveConnector0.getName(), liveConnector1.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector0.getName(), liveConnector0); - liveConfig.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - liveConfig.getHAPolicy().setBackupStrategy(BackupStrategy.SCALE_DOWN); - liveConfig.getHAPolicy().setScaleDownClustername(liveConnector0.getName()); - liveConfig.getHAPolicy().setRequestBackup(true); - liveConfig.getHAPolicy().setBackupRequestRetryInterval(1000); + backupConfig = super.createDefaultConfig(1) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)) + .setHAPolicyConfiguration(new ColocatedPolicyConfiguration() + .setRequestBackup(true) + .setLiveConfig(new SharedStoreMasterPolicyConfiguration()) + .setBackupConfig(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000) + .setScaleDownConfiguration(new ScaleDownConfiguration() + .addConnector(liveConnector1.getName())))) + .addConnectorConfiguration(liveConnector0.getName(), liveConnector0) + .addConnectorConfiguration(liveConnector1.getName(), liveConnector1) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName())); + + backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1); + + liveConfig = super.createDefaultConfig(0) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)) + .setHAPolicyConfiguration(new ColocatedPolicyConfiguration() + .setRequestBackup(true) + .setBackupRequestRetryInterval(1000) + .setLiveConfig(new SharedStoreMasterPolicyConfiguration()) + .setBackupConfig(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000) + .setScaleDownConfiguration(new ScaleDownConfiguration()))) + .addConnectorConfiguration(liveConnector0.getName(), liveConnector0) + .addConnectorConfiguration(liveConnector1.getName(), liveConnector1) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName())); + liveServer = createColocatedTestableServer(liveConfig, nodeManager0, nodeManager1, 0); } @@ -334,6 +337,12 @@ public class LiveToLiveFailoverTest extends FailoverTest } @Override + public void testFailoverMultipleSessionsWithConsumers() throws Exception + { + // + } + + @Override public void testTimeoutOnFailover() throws Exception { } @@ -388,6 +397,11 @@ public class LiveToLiveFailoverTest extends FailoverTest { } + @Override + public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception + { + } + /*@Override public void testCommitDidNotOccurUnblockedAndResend() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java index 614e273..a6da777 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java @@ -17,14 +17,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.NodeManager; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer; import org.hornetq.tests.integration.cluster.util.TestableServer; @@ -151,87 +155,62 @@ public class MultipleLivesMultipleBackupsFailoverTest extends MultipleBackupsFai int[] otherBackupNodes, int... otherClusterNodes) throws Exception { - Configuration config1 = super.createDefaultConfig(); - config1.getAcceptorConfigurations().clear(); - config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), - true, - generateParams(nodeid, isNetty()))); - config1.setSecurityEnabled(false); - - if (sharedStore) - config1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - else - config1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); + Configuration config1 = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()) + .setBindingsDirectory(HornetQDefaultConfiguration.getDefaultBindingsDirectory() + "_" + liveNode) + .setJournalDirectory(HornetQDefaultConfiguration.getDefaultJournalDir() + "_" + liveNode) + .setPagingDirectory(HornetQDefaultConfiguration.getDefaultPagingDir() + "_" + liveNode) + .setLargeMessagesDirectory(HornetQDefaultConfiguration.getDefaultLargeMessagesDir() + "_" + liveNode); - List<String> staticConnectors = new ArrayList<String>(); for (int node : otherBackupNodes) { - TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), - false, - generateParams(node, isNetty())); - config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - staticConnectors.add(liveConnector.getName()); + TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); + config1.addConnectorConfiguration(liveConnector.getName(), liveConnector); } - TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), - false, - generateParams(nodeid, isNetty())); - config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); + + TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty())); + config1.addConnectorConfiguration(backupConnector.getName(), backupConnector); List<String> clusterNodes = new ArrayList<String>(); for (int node : otherClusterNodes) { - TransportConfiguration connector = createTransportConfiguration(isNetty(), - false, - generateParams(node, isNetty())); - config1.getConnectorConfigurations().put(connector.getName(), connector); + TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); + config1.addConnectorConfiguration(connector.getName(), connector); clusterNodes.add(connector.getName()); } - basicClusterConnectionConfig(config1, backupConnector.getName(), clusterNodes); - config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode); - config1.setJournalDirectory(config1.getJournalDirectory() + "_" + liveNode); - config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode); - config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode); + config1.addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), clusterNodes)); servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode))); } protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes) throws Exception { - TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), - false, - generateParams(liveNode, isNetty())); - Configuration config0 = super.createDefaultConfig(); - config0.getAcceptorConfigurations().clear(); - config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), - true, - generateParams(liveNode, isNetty()))); - config0.setSecurityEnabled(false); - - if (sharedStore) - config0.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - else - config0.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); + TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false,generateParams(liveNode, isNetty())); + + Configuration config0 = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()) + .setBindingsDirectory(HornetQDefaultConfiguration.getDefaultBindingsDirectory() + "_" + liveNode) + .setJournalDirectory(HornetQDefaultConfiguration.getDefaultJournalDir() + "_" + liveNode) + .setPagingDirectory(HornetQDefaultConfiguration.getDefaultPagingDir() + "_" + liveNode) + .setLargeMessagesDirectory(HornetQDefaultConfiguration.getDefaultLargeMessagesDir() + "_" + liveNode) + .addConnectorConfiguration(liveConnector.getName(), liveConnector); List<String> pairs = new ArrayList<String>(); for (int node : otherLiveNodes) { - TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), - false, - generateParams(node, isNetty())); - config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector); + TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); + config0.addConnectorConfiguration(otherLiveConnector.getName(), otherLiveConnector); pairs.add(otherLiveConnector.getName()); - } - basicClusterConnectionConfig(config0, liveConnector.getName(), pairs); - config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - - config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" + liveNode); - config0.setJournalDirectory(config0.getJournalDirectory() + "_" + liveNode); - config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode); - config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode); + config0.addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), pairs)); - servers.put(liveNode, - new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode))); + servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode))); } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java index c538faa..05f593e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java @@ -24,11 +24,15 @@ import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.HAPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.NodeManager; import org.hornetq.core.server.Queue; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer; import org.hornetq.tests.integration.cluster.util.TestableServer; import org.hornetq.tests.util.ServiceTestBase; @@ -66,10 +70,6 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase public abstract boolean isSharedStore(); - public abstract void createLiveClusterConfiguration(int server, Configuration configuration, int servers); - - public abstract void createBackupClusterConfiguration(int server, Configuration configuration, int servers); - public abstract String getNodeGroupName(); @Override @@ -81,16 +81,29 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase backupServers = new ArrayList<TestableServer>(); backupConfigs = new ArrayList<Configuration>(); liveConfigs = new ArrayList<Configuration>(); + for (int i = 0; i < getLiveServerCount(); i++) { - Configuration configuration = createDefaultConfig(useNetty()); - configuration.getAcceptorConfigurations().clear(); - configuration.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true, i)); + HAPolicyConfiguration haPolicyConfiguration = null; if (isSharedStore()) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); + { + haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); + ((SharedStoreMasterPolicyConfiguration)haPolicyConfiguration).setFailbackDelay(1000); + } else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); + { + haPolicyConfiguration = new ReplicatedPolicyConfiguration(); + if (getNodeGroupName() != null) + { + ((ReplicatedPolicyConfiguration)haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); + } + } + + Configuration configuration = createDefaultConfig(useNetty()) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true, i)) + .setHAPolicyConfiguration(haPolicyConfiguration); if (!isSharedStore()) { @@ -103,12 +116,21 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase { //todo } - configuration.getHAPolicy().setFailbackDelay(1000); - if (getNodeGroupName() != null) + + TransportConfiguration livetc = getConnectorTransportConfiguration(true, i); + configuration.addConnectorConfiguration(livetc.getName(), livetc); + List<String> connectors = new ArrayList<String>(); + for (int j = 0; j < getLiveServerCount(); j++) { - configuration.getHAPolicy().setBackupGroupName(getNodeGroupName() + "-" + i); + if (j != i) + { + TransportConfiguration staticTc = getConnectorTransportConfiguration(true, j); + configuration.getConnectorConfigurations().put(staticTc.getName(), staticTc); + connectors.add(staticTc.getName()); + } } - createLiveClusterConfiguration(i, configuration, getLiveServerCount()); + + configuration.addClusterConfiguration(basicClusterConnectionConfig(livetc.getName(), connectors)); liveConfigs.add(configuration); HornetQServer server = createServer(true, configuration); TestableServer hornetQServer = new SameProcessHornetQServer(server); @@ -117,14 +139,27 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase } for (int i = 0; i < getBackupServerCount(); i++) { - Configuration configuration = createDefaultConfig(useNetty()); - configuration.getAcceptorConfigurations().clear(); - configuration.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false, i)); + HAPolicyConfiguration haPolicyConfiguration = null; if (isSharedStore()) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); + { + haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration(); + ((SharedStoreSlavePolicyConfiguration)haPolicyConfiguration).setFailbackDelay(1000); + } else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); + { + haPolicyConfiguration = new ReplicaPolicyConfiguration(); + ((ReplicaPolicyConfiguration)haPolicyConfiguration).setFailbackDelay(1000); + if (getNodeGroupName() != null) + { + ((ReplicaPolicyConfiguration)haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); + } + } + + Configuration configuration = createDefaultConfig(useNetty()) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false, i)) + .setHAPolicyConfiguration(haPolicyConfiguration); if (!isSharedStore()) { @@ -138,12 +173,25 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase //todo } - configuration.getHAPolicy().setFailbackDelay(1000); - if (getNodeGroupName() != null) + TransportConfiguration backuptc = getConnectorTransportConfiguration(false, i); + configuration.addConnectorConfiguration(backuptc.getName(), backuptc); + List<String> connectors = new ArrayList<String>(); + for (int j = 0; j < getBackupServerCount(); j++) + { + TransportConfiguration staticTc = getConnectorTransportConfiguration(true, j); + configuration.addConnectorConfiguration(staticTc.getName(), staticTc); + connectors.add(staticTc.getName()); + } + for (int j = 0; j < getBackupServerCount(); j++) { - configuration.getHAPolicy().setBackupGroupName(getNodeGroupName() + "-" + i); + if (j != i) + { + TransportConfiguration staticTc = getConnectorTransportConfiguration(false, j); + configuration.getConnectorConfigurations().put(staticTc.getName(), staticTc); + connectors.add(staticTc.getName()); + } } - createBackupClusterConfiguration(i, configuration, getBackupServerCount()); + configuration.addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), connectors)); backupConfigs.add(configuration); HornetQServer server = createServer(true, configuration); TestableServer testableServer = new SameProcessHornetQServer(server); @@ -276,7 +324,7 @@ public abstract class MultipleServerFailoverTestBase extends ServiceTestBase do { - if (q.getMessageCount() >= messageCount) + if (getMessageCount(q) >= messageCount) { return; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java index 354b9d8..a7d6941 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java @@ -131,7 +131,7 @@ public class PagingFailoverTest extends FailoverTestBase if (failBeforeConsume) { crash(session); - waitForBackup(null, 30); + waitForBackup(null, 5); } session.close(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java index c8b5a0c..d9c1f6b 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java @@ -20,6 +20,8 @@ import org.hornetq.api.core.Pair; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClusterTopologyListener; import org.hornetq.api.core.client.TopologyMember; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; import org.hornetq.core.protocol.core.impl.PacketImpl; import org.hornetq.tests.integration.cluster.util.BackupSyncDelay; import org.junit.Test; @@ -31,12 +33,12 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest { super.setupServers(); //we need to know who is connected to who - servers[0].getConfiguration().getHAPolicy().setBackupGroupName("group0"); - servers[1].getConfiguration().getHAPolicy().setBackupGroupName("group1"); - servers[2].getConfiguration().getHAPolicy().setBackupGroupName("group2"); - servers[3].getConfiguration().getHAPolicy().setBackupGroupName("group0"); - servers[4].getConfiguration().getHAPolicy().setBackupGroupName("group1"); - servers[5].getConfiguration().getHAPolicy().setBackupGroupName("group2"); + ((ReplicatedPolicyConfiguration)servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0"); + ((ReplicatedPolicyConfiguration)servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicatedPolicyConfiguration)servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); + ((ReplicaPolicyConfiguration)servers[3].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0"); + ((ReplicaPolicyConfiguration)servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicaPolicyConfiguration)servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); } @Test @@ -75,8 +77,8 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest locators[0].addClusterTopologyListener(liveTopologyListener); - assertTrue("we assume 3 is a backup", servers[3].getConfiguration().getHAPolicy().isBackup()); - assertFalse("no shared storage", servers[3].getConfiguration().getHAPolicy().isSharedStore()); + assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup()); + assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore()); failNode(0); @@ -86,10 +88,10 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); assertTrue(servers[3].waitForActivation(2, TimeUnit.SECONDS)); - assertFalse("3 should have failed over ", servers[3].getConfiguration().getHAPolicy().isBackup()); + assertFalse("3 should have failed over ", servers[3].getHAPolicy().isBackup()); failNode(1); - assertFalse("4 should have failed over ", servers[4].getConfiguration().getHAPolicy().isBackup()); + assertFalse("4 should have failed over ", servers[4].getHAPolicy().isBackup()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java index 4255707..f75ce79 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java @@ -31,7 +31,7 @@ public class QuorumVoteServerConnectTest extends UnitTestCase private final int size; private final int trueVotes; - @Parameterized.Parameters + @Parameterized.Parameters(name = "size={0} trueVotes={1}") public static Collection primeNumbers() { return Arrays.asList(new Object[][] http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java index 5acae76..f1b394d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java @@ -190,8 +190,8 @@ public class ReplicatedDistributionTest extends ClusterTestBase { super.setUp(); - setupLiveServer(1, true, isSharedStore(), true); - setupLiveServer(3, true, isSharedStore(), true); + setupLiveServer(1, true, isSharedStore(), true, false); + setupLiveServer(3, true, isSharedStore(), true, false); setupBackupServer(2, 3, true, isSharedStore(), true); final String address = ReplicatedDistributionTest.ADDRESS.toString(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java index d50d3ed..88c087d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java @@ -13,11 +13,32 @@ package org.hornetq.tests.integration.cluster.failover; import org.hornetq.api.core.client.ClientSession; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.server.cluster.ha.ReplicatedPolicy; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; public class ReplicatedFailoverTest extends FailoverTest { + boolean isReplicatedFailbackTest = false; + @Rule + public TestRule watcher = new TestWatcher() + { + @Override + protected void starting(Description description) + { + isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback"); + } + }; + + protected void beforeWaitForRemoteBackupSynchronization() + { + } @Test /* * default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this @@ -28,8 +49,10 @@ public class ReplicatedFailoverTest extends FailoverTest { try { - backupServer.getServer().getConfiguration().getHAPolicy().setFailbackDelay(2000); - backupServer.getServer().getConfiguration().setMaxSavedReplicatedJournalSize(2); + beforeWaitForRemoteBackupSynchronization(); + + waitForRemoteBackupSynchronization(backupServer.getServer()); + createSessionFactory(); ClientSession session = createSession(sf, true, true); @@ -38,7 +61,9 @@ public class ReplicatedFailoverTest extends FailoverTest crash(session); - liveServer.getServer().getConfiguration().setCheckForLiveServer(true); + ReplicatedPolicy haPolicy = (ReplicatedPolicy) liveServer.getServer().getHAPolicy(); + + haPolicy.setCheckForLiveServer(true); liveServer.start(); @@ -80,11 +105,14 @@ public class ReplicatedFailoverTest extends FailoverTest assertFalse(backupServer.getServer().isStarted()); //the server wouldnt have reset to backup - assertFalse(backupServer.getServer().getConfiguration().getHAPolicy().isBackup()); + assertFalse(backupServer.getServer().getHAPolicy().isBackup()); } finally { - sf.close(); + if (sf != null) + { + sf.close(); + } } } @@ -95,6 +123,24 @@ public class ReplicatedFailoverTest extends FailoverTest } @Override + protected void setupHAPolicyConfiguration() + { + if (isReplicatedFailbackTest) + { + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()) + .setCheckForLiveServer(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()) + .setMaxSavedReplicatedJournalsSize(2) + .setAllowFailBack(true) + .setFailbackDelay(2000); + } + else + { + super.setupHAPolicyConfiguration(); + } + } + + @Override protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception { if (sessions.length > 0) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverUsingNodeGroupNameTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverUsingNodeGroupNameTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverUsingNodeGroupNameTest.java index bb7ce39..11a6577 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverUsingNodeGroupNameTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverUsingNodeGroupNameTest.java @@ -12,13 +12,24 @@ */ package org.hornetq.tests.integration.cluster.failover; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; + public class ReplicatedFailoverUsingNodeGroupNameTest extends ReplicatedFailoverTest { @Override protected void createReplicatedConfigs() throws Exception { super.createReplicatedConfigs(); - liveConfig.getHAPolicy().setBackupGroupName("liveNodeGroup1"); - backupConfig.getHAPolicy().setBackupGroupName("liveNodeGroup1"); + ((ReplicatedPolicyConfiguration)liveConfig.getHAPolicyConfiguration()).setGroupName("liveNodeGroup1"); + ((ReplicaPolicyConfiguration)backupConfig.getHAPolicyConfiguration()).setGroupName("liveNodeGroup1"); + } + + @Override + protected void setupHAPolicyConfiguration() + { + super.setupHAPolicyConfiguration(); + ((ReplicatedPolicyConfiguration)liveConfig.getHAPolicyConfiguration()).setGroupName("liveNodeGroup1"); + ((ReplicaPolicyConfiguration)backupConfig.getHAPolicyConfiguration()).setGroupName("liveNodeGroup1"); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverExtraBackupsTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverExtraBackupsTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverExtraBackupsTest.java index 711a8aa..edb40e8 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverExtraBackupsTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverExtraBackupsTest.java @@ -22,6 +22,7 @@ import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; import org.hornetq.tests.integration.cluster.util.TestableServer; import org.junit.Assert; import org.junit.Test; @@ -35,12 +36,13 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated @Test public void testStartLiveFirst() throws Exception { - backupServers.get(2).getServer().getConfiguration().getHAPolicy().setBackupGroupName(getNodeGroupName() + "-0"); - backupServers.get(3).getServer().getConfiguration().getHAPolicy().setBackupGroupName(getNodeGroupName() + "-1"); + ((ReplicaPolicyConfiguration)backupServers.get(2).getServer().getConfiguration().getHAPolicyConfiguration()).setGroupName(getNodeGroupName() + "-0"); + ((ReplicaPolicyConfiguration)backupServers.get(3).getServer().getConfiguration().getHAPolicyConfiguration()).setGroupName(getNodeGroupName() + "-1"); startServers(liveServers); startServers(backupServers); - waitForBackups(); + waitForRemoteBackupSynchronization(backupServers.get(0).getServer()); + waitForRemoteBackupSynchronization(backupServers.get(1).getServer()); sendCrashReceive(); waitForTopology(backupServers.get(0).getServer(), liveServers.size(), 2); @@ -67,8 +69,8 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated @Test public void testStartBackupFirst() throws Exception { - backupServers.get(2).getServer().getConfiguration().getHAPolicy().setBackupGroupName(getNodeGroupName() + "-0"); - backupServers.get(3).getServer().getConfiguration().getHAPolicy().setBackupGroupName(getNodeGroupName() + "-1"); + ((ReplicaPolicyConfiguration)backupServers.get(2).getServer().getConfiguration().getHAPolicyConfiguration()).setGroupName(getNodeGroupName() + "-0"); + ((ReplicaPolicyConfiguration)backupServers.get(3).getServer().getConfiguration().getHAPolicyConfiguration()).setGroupName(getNodeGroupName() + "-1"); startServers(backupServers); startServers(liveServers); @@ -110,7 +112,7 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated List<TestableServer> toCrash = new ArrayList<TestableServer>(); for (TestableServer backupServer : backupServers) { - if (!backupServer.getServer().getConfiguration().getHAPolicy().isBackup()) + if (!backupServer.getServer().getHAPolicy().isBackup()) { toCrash.add(backupServer); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java index 790880b..dcecced 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java @@ -13,17 +13,12 @@ package org.hornetq.tests.integration.cluster.failover; -import java.util.ArrayList; -import java.util.List; - -import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientConsumer; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.core.config.Configuration; import org.hornetq.tests.integration.cluster.util.TestableServer; import org.junit.Assert; import org.junit.Test; @@ -185,53 +180,4 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover { return "nodeGroup"; } - - @Override - /* - * for this test the 2 live connect to each other - * */ - public void createLiveClusterConfiguration(int server, Configuration configuration, int servers) - { - TransportConfiguration livetc = getConnectorTransportConfiguration(true, server); - configuration.getConnectorConfigurations().put(livetc.getName(), livetc); - List<String> connectors = new ArrayList<String>(); - for (int i = 0; i < servers; i++) - { - if (i != server) - { - TransportConfiguration staticTc = getConnectorTransportConfiguration(true, i); - configuration.getConnectorConfigurations().put(staticTc.getName(), staticTc); - connectors.add(staticTc.getName()); - } - } - basicClusterConnectionConfig(configuration, livetc.getName(), connectors); - - } - - @Override - /* - * for this test the backups will connect to their own live server - * */ - public void createBackupClusterConfiguration(int server, Configuration configuration, int servers) - { - TransportConfiguration backuptc = getConnectorTransportConfiguration(false, server); - configuration.getConnectorConfigurations().put(backuptc.getName(), backuptc); - List<String> connectors = new ArrayList<String>(); - for (int i = 0; i < servers; i++) - { - TransportConfiguration staticTc = getConnectorTransportConfiguration(true, i); - configuration.getConnectorConfigurations().put(staticTc.getName(), staticTc); - connectors.add(staticTc.getName()); - } - for (int i = 0; i < servers; i++) - { - if (i != server) - { - TransportConfiguration staticTc = getConnectorTransportConfiguration(false, i); - configuration.getConnectorConfigurations().put(staticTc.getName(), staticTc); - connectors.add(staticTc.getName()); - } - } - basicClusterConnectionConfig(configuration, backuptc.getName(), connectors); - } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java index 7150f08..2096fb9 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java @@ -36,6 +36,12 @@ public class ReplicatedWithDelayFailoverTest extends ReplicatedFailoverTest } @Override + protected void beforeWaitForRemoteBackupSynchronization() + { + syncDelay.deliverUpToDateMsg(); + } + + @Override protected void crash(ClientSession... sessions) throws Exception { syncDelay.deliverUpToDateMsg(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java index 2d45524..b9374f6 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java @@ -18,8 +18,9 @@ import java.util.Set; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.security.Role; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.spi.core.security.HornetQSecurityManager; import org.hornetq.tests.integration.cluster.util.TestableServer; @@ -93,34 +94,32 @@ public class SecurityFailoverTest extends FailoverTest protected void createConfigs() throws Exception { nodeManager = new InVMNodeManager(false); - - backupConfig = super.createDefaultConfig(); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); - backupConfig.setSecurityEnabled(true); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - backupConfig.getHAPolicy().setFailbackDelay(1000); - TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); - backupServer = createTestableServer(backupConfig); - HornetQSecurityManager securityManager = installSecurity(backupServer); + backupConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) + .setSecurityEnabled(true) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupServer = createTestableServer(backupConfig); + HornetQSecurityManager securityManager = installSecurity(backupServer); securityManager.setDefaultUser(null); - liveConfig = super.createDefaultConfig(); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); - liveConfig.setSecurityEnabled(true); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - basicClusterConnectionConfig(liveConfig, liveConnector.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - liveServer = createTestableServer(liveConfig); + liveConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) + .setSecurityEnabled(true) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())) + .addConnectorConfiguration(liveConnector.getName(), liveConnector); + liveServer = createTestableServer(liveConfig); installSecurity(liveServer); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java index d5f5b8a..8bedcca 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java @@ -17,14 +17,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSession; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorImpl; import org.hornetq.core.client.impl.Topology; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.NodeManager; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.tests.integration.IntegrationTestLogger; import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer; @@ -127,61 +130,47 @@ public class SingleLiveMultipleBackupsFailoverTest extends MultipleBackupsFailov protected void createBackupConfig(int liveNode, int nodeid, int... nodes) throws Exception { - Configuration config1 = super.createDefaultConfig(); - config1.getAcceptorConfigurations().clear(); - config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, - generateParams(nodeid, isNetty()))); - config1.setSecurityEnabled(false); - - if (sharedStore) - config1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - else - config1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); + TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty())); + + Configuration config1 = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration()) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .setBindingsDirectory(HornetQDefaultConfiguration.getDefaultBindingsDirectory() + "_" + liveNode) + .setJournalDirectory(HornetQDefaultConfiguration.getDefaultJournalDir() + "_" + liveNode) + .setPagingDirectory(HornetQDefaultConfiguration.getDefaultPagingDir() + "_" + liveNode) + .setLargeMessagesDirectory(HornetQDefaultConfiguration.getDefaultLargeMessagesDir() + "_" + liveNode); List<String> staticConnectors = new ArrayList<String>(); for (int node : nodes) { - TransportConfiguration liveConnector = - createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); - config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); + TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); + config1.addConnectorConfiguration(liveConnector.getName(), liveConnector); staticConnectors.add(liveConnector.getName()); } - TransportConfiguration backupConnector = - createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty())); - basicClusterConnectionConfig(config1, backupConnector.getName(), staticConnectors); - config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - - config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode); - config1.setJournalDirectory(config1.getJournalDirectory() + "_" + liveNode); - config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode); - config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode); + config1.addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), staticConnectors)); servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, nodeid))); } protected void createLiveConfig(int liveNode) throws Exception { - TransportConfiguration liveConnector = - createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty())); - Configuration config0 = super.createDefaultConfig(); - config0.getAcceptorConfigurations().clear(); - config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, - generateParams(liveNode, isNetty()))); - config0.setSecurityEnabled(false); - - if (sharedStore) - config0.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - else - config0.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - - basicClusterConnectionConfig(config0, liveConnector.getName()); - config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - - config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" + liveNode); - config0.setJournalDirectory(config0.getJournalDirectory() + "_" + liveNode); - config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode); - config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode); + TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty())); + + Configuration config0 = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .setBindingsDirectory(HornetQDefaultConfiguration.getDefaultBindingsDirectory() + "_" + liveNode) + .setJournalDirectory(HornetQDefaultConfiguration.getDefaultJournalDir() + "_" + liveNode) + .setPagingDirectory(HornetQDefaultConfiguration.getDefaultPagingDir() + "_" + liveNode) + .setLargeMessagesDirectory(HornetQDefaultConfiguration.getDefaultLargeMessagesDir() + "_" + liveNode); servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode))); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java index 6e3f7f2..3b9a0b4 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java @@ -86,8 +86,8 @@ public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailov setupBackupServer(5, 2, isFileStorage(), isSharedStorage(), isNetty()); // The lives - setupLiveServer(0, isFileStorage(), isSharedStorage(), isNetty()); - setupLiveServer(1, isFileStorage(), isSharedStorage(), isNetty()); - setupLiveServer(2, isFileStorage(), isSharedStorage(), isNetty()); + setupLiveServer(0, isFileStorage(), isSharedStorage(), isNetty(), false); + setupLiveServer(1, isFileStorage(), isSharedStorage(), isNetty(), false); + setupLiveServer(2, isFileStorage(), isSharedStorage(), isNetty(), false); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ha/HAAutomaticBackupSharedStore.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ha/HAAutomaticBackupSharedStore.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ha/HAAutomaticBackupSharedStore.java index 4d582d8..bb805a8 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ha/HAAutomaticBackupSharedStore.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/ha/HAAutomaticBackupSharedStore.java @@ -12,8 +12,11 @@ */ package org.hornetq.tests.integration.cluster.ha; +import org.hornetq.core.config.HAPolicyConfiguration; +import org.hornetq.core.config.ha.ColocatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.cluster.ha.HAPolicyTemplate; import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase; import org.junit.Before; import org.junit.Test; @@ -53,16 +56,22 @@ public class HAAutomaticBackupSharedStore extends ClusterTestBase protected void setupServers() throws Exception { // The lives - setupLiveServer(0, isFileStorage(), true, isNetty()); - setupLiveServer(1, isFileStorage(), true, isNetty()); - setupLiveServer(2, isFileStorage(), true, isNetty()); + setupLiveServer(0, isFileStorage(), true, isNetty(), false); + setupLiveServer(1, isFileStorage(), true, isNetty(), false); + setupLiveServer(2, isFileStorage(), true, isNetty(), false); } private void setUpHAPolicy(int node) { HornetQServer server = getServer(node); - server.getConfiguration().setHAPolicy(HAPolicyTemplate.COLOCATED_SHARED_STORE.getHaPolicy()); + ColocatedPolicyConfiguration haPolicyConfiguration = new ColocatedPolicyConfiguration(); + HAPolicyConfiguration liveConfiguration = new SharedStoreMasterPolicyConfiguration(); + haPolicyConfiguration.setLiveConfig(liveConfiguration); + + HAPolicyConfiguration backupConfiguration = new SharedStoreSlavePolicyConfiguration(); + haPolicyConfiguration.setBackupConfig(backupConfiguration); + server.getConfiguration().setHAPolicyConfiguration(haPolicyConfiguration); } public boolean isNetty() http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java index dc372f8..de7006f 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java @@ -29,9 +29,9 @@ public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTest @Override protected void start() throws Exception { - Configuration liveConf = createDefaultConfig(); - liveConf.setSecurityEnabled(false); - liveConf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration liveConf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); liveServer = createServer(false, liveConf); liveServer.start(); waitForServer(liveServer); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java index e3df2de..c83743f 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java @@ -30,11 +30,11 @@ public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattac @Override protected void start() throws Exception { - Configuration liveConf = createDefaultConfig(); - liveConf.setJMXManagementEnabled(false); - liveConf.setSecurityEnabled(false); - liveConf.getAcceptorConfigurations().clear(); - liveConf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); + Configuration liveConf = createDefaultConfig() + .setJMXManagementEnabled(false) + .setSecurityEnabled(false) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); liveServer = createServer(false, liveConf); liveServer.start(); waitForServer(liveServer); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java index 743c3d4..099b04c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java @@ -1495,10 +1495,9 @@ public class RandomReattachTest extends UnitTestCase private void start() throws Exception { - Configuration liveConf = createDefaultConfig(); - liveConf.setSecurityEnabled(false); - liveConf.getAcceptorConfigurations() - .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory")); + Configuration liveConf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory")); liveService = HornetQServers.newHornetQServer(liveConf, false); liveService.start(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java index 8b03c25..36dd7d6 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java @@ -60,9 +60,10 @@ public class HAClientTopologyWithDiscoveryTest extends TopologyClusterTestBase @Override protected ServerLocator createHAServerLocator() { - ServerLocator locator = HornetQClient.createServerLocatorWithHA( - new DiscoveryGroupConfiguration(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, - new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1))); + ServerLocator locator = HornetQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration() + .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort))); locator.setBlockOnNonDurableSend(true); locator.setBlockOnDurableSend(true); addServerLocator(locator);
