http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverTest.java deleted file mode 100644 index de4a7cd..0000000 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ColocatedFailoverTest.java +++ /dev/null @@ -1,1403 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.tests.integration.cluster.failover; - - -import org.hornetq.api.core.Message; -import org.hornetq.api.core.SimpleString; -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.ClientConsumer; -import org.hornetq.api.core.client.ClientMessage; -import org.hornetq.api.core.client.ClientProducer; -import org.hornetq.api.core.client.ClientSession; -import org.hornetq.api.core.client.ClientSessionFactory; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.client.SendAcknowledgementHandler; -import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.core.config.BackupStrategy; -import org.hornetq.core.config.Configuration; -import org.hornetq.core.config.CoreQueueConfiguration; -import org.hornetq.core.postoffice.Binding; -import org.hornetq.core.postoffice.Bindings; -import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.NodeManager; -import org.hornetq.core.server.Queue; -import org.hornetq.core.server.cluster.ClusterConnection; -import org.hornetq.core.server.cluster.ha.HAPolicy; -import org.hornetq.core.server.impl.HornetQServerImpl; -import org.hornetq.core.server.impl.InVMNodeManager; -import org.hornetq.core.server.impl.QueueImpl; -import org.hornetq.core.settings.impl.AddressFullMessagePolicy; -import org.hornetq.core.settings.impl.AddressSettings; -import org.hornetq.core.transaction.impl.XidImpl; -import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer; -import org.hornetq.tests.integration.cluster.util.TestableServer; -import org.hornetq.tests.util.ColocatedHornetQServer; -import org.hornetq.tests.util.ServiceTestBase; -import org.hornetq.tests.util.UnitTestCase; -import org.hornetq.utils.UUIDGenerator; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class ColocatedFailoverTest extends ServiceTestBase -{ - private TestableServer liveServer1; - - private TestableServer liveServer2; - - protected NodeManager nodeManagerLive1; - - protected NodeManager nodeManagerLive2; - - private ServerLocator locator; - - private SimpleString queue; - - private SimpleString topic; - - private ClientSessionFactory factory1; - - private ClientSessionFactory factory2; - - private ClientSession session1; - - private ClientSession session2; - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - deleteDirectory(new File(getTestDir() + "")); - createConfigs(); - - liveServer1.start(); - liveServer2.start(); - waitForServer(liveServer1.getServer()); - waitForServer(liveServer2.getServer()); - - HashMap<String, Object> params = new HashMap<>(); - params.put("server-id", "1"); - TransportConfiguration transportConfiguration1 = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params); - locator = HornetQClient.createServerLocator(true, transportConfiguration1); - locator.setReconnectAttempts(-1); - locator.setConfirmationWindowSize(0); - factory1 = locator.createSessionFactory(transportConfiguration1); - - HashMap<String, Object> params2 = new HashMap<>(); - params2.put("server-id", "2"); - - TransportConfiguration transportConfiguration2 = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params2); - factory2 = locator.createSessionFactory(transportConfiguration2); - - session1 = factory1.createSession(false, true, true); - session2 = factory2.createSession(false, true, true); - queue = new SimpleString("jms.queue.testQueue"); - topic = new SimpleString("jms.topic.testTopic"); - } - - @Override - @After - public void tearDown() throws Exception - { - if (locator != null) - locator.close(); - liveServer1.stop(); - liveServer2.stop(); - super.tearDown(); - } - - @Test - public void testSend() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - ClientConsumer consumer = session2.createConsumer(queue); - session2.start(); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - } - latch.await(10, TimeUnit.SECONDS); - System.out.println(locator.getTopology().describe()); - liveServer1.crash(true, session1); - for (int i = 0; i < numMessages; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - //NB we dont test for order as they will be round robined and out of order - } - } - - @Test - public void testSendPagingOnReload() throws Exception - { - AddressSettings addressSettings = new AddressSettings(); - addressSettings.setMaxSizeBytes(5000); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); - addressSettings.setPageSizeBytes(1000); - String pagedQueue = "jms.queue.pagedQueue"; - SimpleString ssPagedQueue = new SimpleString(pagedQueue); - liveServer2.getServer().getAddressSettingsRepository().addMatch(pagedQueue, addressSettings); - session1.createQueue(pagedQueue, pagedQueue, true); - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(2); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - - latch.countDown(); - } - }); - byte[] bytes = new byte[10000]; - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeBytes(bytes); - producer.send(ssPagedQueue, message); - message = session1.createMessage(true); - message.getBodyBuffer().writeBytes(bytes); - producer.send(ssPagedQueue, message); - latch.await(10, TimeUnit.SECONDS); - liveServer1.crash(true, session1); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - } - - @Test - public void testLargeMessage() throws Exception - { - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(2); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - - latch.countDown(); - } - }); - ClientMessage message = session1.createMessage(true); - - message.setBodyInputStream(UnitTestCase.createFakeLargeStream(10000)); - producer.send(queue, message); - ClientConsumer consumer = session1.createConsumer(queue); - session1.start(); - message = consumer.receive(5000); - assertNotNull(message); - assertTrue(message.isLargeMessage()); - message.getBodyBuffer().readBytes(10000); - session1.close(); - liveServer1.crash(true, session1); - consumer = session2.createConsumer(queue); - session2.start(); - message = consumer.receive(5000); - assertNotNull(message); - assertTrue(message.isLargeMessage()); - message.getBodyBuffer().readBytes(10000); - } - - @Test - public void testSendScheduled() throws Exception - { - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(1); - ClientConsumer consumer = session2.createConsumer(queue); - session2.start(); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - - latch.countDown(); - } - }); - ClientMessage message = session1.createMessage(true); - long time = System.currentTimeMillis() + 10000; - message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - latch.await(10, TimeUnit.SECONDS); - - liveServer1.crash(true, session1); - ClientMessage cMessage = consumer.receive(10000); - assertNotNull(cMessage); - assertTrue(System.currentTimeMillis() >= time); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - - - @Test - public void testSendQueueNotExistOnLive() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - SimpleString dur1 = new SimpleString("myDurSub1"); - session1.createQueue(topic, dur1, true); - SimpleString dur2 = new SimpleString("myDurSub2"); - session2.createQueue(topic, dur2, true); - waitForBindings(liveServer1.getServer(), topic.toString(), false, 2, 0, 5000L); - waitForBindings(liveServer2.getServer(), topic.toString(), false, 2, 0, 5000L); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - latch.await(10, TimeUnit.SECONDS); - liveServer1.crash(true, session1); - - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - ClientConsumer consumer = session2.createConsumer(dur1); - ClientConsumer consumer2 = session2.createConsumer(dur2); - session2.start(); - for (int i = 0; i < numMessages; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - for (int i = 0; i < numMessages; i++) - { - ClientMessage cMessage = consumer2.receive(5000); - String s = cMessage.getBodyBuffer().readString(); - System.out.println("s = " + s); - assertNotNull(s); - } - } - - @Test - public void testSendQueueMessageStillInForwardQueue() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - SimpleString dur1 = new SimpleString("myDurSub1"); - session2.createQueue(topic, dur1, true); - waitForBindings(liveServer1.getServer(), topic.toString(), false, 2, 0, 5000L); - waitForBindings(liveServer2.getServer(), topic.toString(), false, 1, 0, 5000L); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - latch.await(10, TimeUnit.SECONDS); - //now disconnect the bridge but readd the binding so we can make sure some messages get marooned - Bindings bindingsForAddress = liveServer1.getServer().getPostOffice().getBindingsForAddress(topic); - Collection<Binding> bindings = bindingsForAddress.getBindings(); - Binding binding = null; - for (Binding thebinding : bindings) - { - if (thebinding.getRoutingName().equals(dur1)) - { - binding = thebinding; - break; - } - } - - assertNotNull(binding); - Set<ClusterConnection> clusterConnections = liveServer1.getServer().getClusterManager().getClusterConnections(); - for (ClusterConnection clusterConnection : clusterConnections) - { - clusterConnection.stop(); - } - liveServer1.getServer().getPostOffice().addBinding(binding); - for (int i = 100; i < numMessages + 100; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - liveServer1.crash(true, session1); - - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - ClientConsumer consumer = session2.createConsumer(dur1); - session2.start(); - for (int i = 0; i < numMessages * 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - } - - @Test - public void testSendQueueMessageStillInForwardQueue2Queues() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - SimpleString dur1 = new SimpleString("myDurSub1"); - session2.createQueue(topic, dur1, true); - SimpleString dur2 = new SimpleString("myDurSub2"); - session1.createQueue(topic, dur2, true); - waitForBindings(liveServer1.getServer(), topic.toString(), false, 2, 0, 5000L); - waitForBindings(liveServer2.getServer(), topic.toString(), false, 2, 0, 5000L); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - latch.await(10, TimeUnit.SECONDS); - //now disconnect the bridge but readd the binding so we can make sure some messages get marooned - Bindings bindingsForAddress = liveServer1.getServer().getPostOffice().getBindingsForAddress(topic); - Collection<Binding> bindings = bindingsForAddress.getBindings(); - Binding binding = null; - for (Binding thebinding : bindings) - { - if (thebinding.getRoutingName().equals(dur1)) - { - binding = thebinding; - break; - } - } - - assertNotNull(binding); - Set<ClusterConnection> clusterConnections = liveServer1.getServer().getClusterManager().getClusterConnections(); - for (ClusterConnection clusterConnection : clusterConnections) - { - clusterConnection.stop(); - } - liveServer1.getServer().getPostOffice().addBinding(binding); - for (int i = 100; i < numMessages + 100; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - liveServer1.crash(true, session1); - - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - ClientConsumer consumer = session2.createConsumer(dur1); - session2.start(); - for (int i = 0; i < numMessages * 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - String s = cMessage.getBodyBuffer().readString(); - System.out.println("s = " + s); - assertNotNull(s); - } - consumer = session2.createConsumer(dur2); - for (int i = 0; i < numMessages * 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - String s = cMessage.getBodyBuffer().readString(); - System.out.println("s = " + s); - assertNotNull(s); - } - } - - @Test - public void testSendQueueMessageStillInForwardQueue3Servers() throws Exception - { - TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(1); - TransportConfiguration liveConnector3 = getConnectorTransportConfiguration(3); - Configuration liveConfiguration3 = super.createDefaultConfig(); - liveConfiguration3.getAcceptorConfigurations().clear(); - liveConfiguration3.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(3)); - liveConfiguration3.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfiguration3.getHAPolicy().setFailbackDelay(1000); - liveConfiguration3.setJournalDirectory(getTestDir() + "/journal3"); - liveConfiguration3.setBindingsDirectory(getTestDir() + "/bindings3"); - liveConfiguration3.setLargeMessagesDirectory(getTestDir() + "/largemessage3"); - liveConfiguration3.setPagingDirectory(getTestDir() + "/paging3"); - liveConfiguration3.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - liveConfiguration3.getQueueConfigurations().add(new CoreQueueConfiguration("jms.topic.testTopic", "jms.topic.testTopic", HornetQServerImpl.GENERIC_IGNORED_FILTER, true)); - - basicClusterConnectionConfig(liveConfiguration3, liveConnector3.getName(), liveConnector1.getName()); - liveConfiguration3.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - liveConfiguration3.getConnectorConfigurations().put(liveConnector3.getName(), liveConnector3); - - HornetQServer server3 = new HornetQServerImpl(liveConfiguration3); - server3.setIdentity("server3"); - try - { - server3.start(); - waitForServer(server3); - - ClientSessionFactory sessionFactory3 = locator.createSessionFactory(liveConnector3); - ClientSession session3 = sessionFactory3.createSession(); - SimpleString dursub3 = new SimpleString("dursub3"); - session3.createQueue(topic, dursub3, true); - - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - SimpleString dur1 = new SimpleString("myDurSub1"); - session2.createQueue(topic, dur1, true); - waitForBindings(liveServer1.getServer(), topic.toString(), false, 4, 0, 5000L); - waitForBindings(liveServer2.getServer(), topic.toString(), false, 3, 0, 5000L); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - latch.await(10, TimeUnit.SECONDS); - //now disconnect the bridge but readd the binding so we can make sure some messages get marooned - Bindings bindingsForAddress = liveServer1.getServer().getPostOffice().getBindingsForAddress(topic); - Collection<Binding> bindings = bindingsForAddress.getBindings(); - Binding binding = null; - for (Binding thebinding : bindings) - { - if (thebinding.getRoutingName().equals(dur1)) - { - binding = thebinding; - break; - } - } - - assertNotNull(binding); - - Binding binding2 = null; - for (Binding thebinding : bindings) - { - if (thebinding.getRoutingName().equals(dursub3)) - { - binding2 = thebinding; - break; - } - } - - assertNotNull(binding2); - - Set<ClusterConnection> clusterConnections = liveServer1.getServer().getClusterManager().getClusterConnections(); - for (ClusterConnection clusterConnection : clusterConnections) - { - clusterConnection.stop(); - } - liveServer1.getServer().getPostOffice().addBinding(binding); - liveServer1.getServer().getPostOffice().addBinding(binding2); - for (int i = 100; i < numMessages + 100; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(topic, message); - } - liveServer1.crash(true, session1); - - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - ClientConsumer consumer = session2.createConsumer(dur1); - session2.start(); - for (int i = 0; i < numMessages * 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - consumer = session3.createConsumer(dursub3); - session3.start(); - for (int i = 0; i < numMessages * 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - } - finally - { - server3.stop(); - } - } - - @Test - public void testSendTransacted() throws Exception - { - int numMessages = 100; - try - ( - ClientSession session = factory1.createSession(true, false, false) - ) - { - ClientProducer producer = session.createProducer(); - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - System.out.println("i = " + i); - } - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, true, true) - ) - { - session2.getXAResource().commit(xid, false); - } - } - ClientConsumer consumer = session2.createConsumer(queue); - session2.start(); - for (int i = 0; i < numMessages; i++) - { - ClientMessage cMessage = consumer.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - //NB we dont test for order as they will be round robined and out of order - } - } - - @Test - public void testReceiveTransacted() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - System.out.println("i = " + i); - } - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - for (int i = 0; i < numMessages / 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - } - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, true, true) - ) - { - session2.getXAResource().commit(xid, false); - } - } - - @Test - public void testSendAndReceiveSameQueueTransacted() throws Exception - { - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().commit(xid, false); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(q.getMessageCount(), 0); - assertEquals(q.getMessagesAdded(), 1); - } - - @Test - public void testSendAndReceiveSameQueueTransactedRollback() throws Exception - { - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(q.getMessageCount(), 1); - assertEquals(q.getMessagesAdded(), 1); - ClientConsumer consumer1 = session2.createConsumer(queue); - session2.start(); - ClientMessage clientMessage = consumer1.receiveImmediate(); - assertNotNull(clientMessage); - String s = clientMessage.getBodyBuffer().readString(); - assertEquals(s, "message:1"); - } - - @Test - public void testSendAndReceiveSameQueueTransactedRollbackRestart() throws Exception - { - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(q.getMessageCount(), 1); - assertEquals(q.getMessagesAdded(), 1); - liveServer2.stop(); - liveServer2.start(); - ClientConsumer consumer1 = session2.createConsumer(queue); - session2.start(); - ClientMessage clientMessage = consumer1.receiveImmediate(); - assertNotNull(clientMessage); - String s = clientMessage.getBodyBuffer().readString(); - assertEquals(s, "message:1"); - } - - @Test - public void testSendAndReceiveSameQueueTransactedManyMessages() throws Exception - { - int numMessage = 1000; - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < numMessage; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - - } - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - for (int i = 0; i < numMessage / 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - } - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().commit(xid, false); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(500, q.getMessageCount()); - assertEquals(1000, q.getMessagesAdded()); - } - - @Test - public void testSendAndReceiveSameQueueTransactedManyMessagesRollback() throws Exception - { - int numMessage = 1000; - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < numMessage; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - - } - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - for (int i = 0; i < numMessage / 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - } - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1000, q.getMessageCount()); - assertEquals(1000, q.getMessagesAdded()); - } - - @Test - public void testSendAndReceiveSameQueueTransactedManyMessagesRollbackRestart() throws Exception - { - int numMessage = 1000; - ClientProducer producer = session1.createProducer(); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < numMessage; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - - } - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - for (int i = 0; i < numMessage / 2; i++) - { - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - } - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - liveServer2.stop(); - liveServer2.start(); - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1000, q.getMessageCount()); - assertEquals(1000, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1acked() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().commit(xid, false); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(0, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1ackedRollback() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1ackedRollbackRestart() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - liveServer2.stop(); - liveServer2.start(); - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1ackedAndExtraSend() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - producer = session.createProducer(); - ClientMessage m = session.createMessage(true); - m.getBodyBuffer().writeString("message:2"); - producer.send(queue, m); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().commit(xid, false); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(2, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(2, q.getMessageCount()); - assertEquals(2, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1ackedAndExtraSendRollback() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - producer = session.createProducer(); - ClientMessage m = session.createMessage(true); - m.getBodyBuffer().writeString("message:2"); - producer.send(queue, m); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - } - - @Test - public void testReceiveTransacted2Queues1ackedAndExtraSendRollbackRestart() throws Exception - { - ClientProducer producer = session1.createProducer(); - SimpleString queue2 = new SimpleString("jms.queue.testQueue2"); - session1.createQueue(queue, queue2, true); - ClientSession session = factory1.createSession(true, false, false); - ClientConsumer consumer = session.createConsumer(queue); - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - producer.close(); - - Xid xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - session.start(xid, XAResource.TMNOFLAGS); - session.start(); - ClientMessage cMessage = consumer.receive(5000); - cMessage.acknowledge(); - producer = session.createProducer(); - ClientMessage m = session.createMessage(true); - m.getBodyBuffer().writeString("message:2"); - producer.send(queue, m); - session.end(xid, XAResource.TMSUCCESS); - session.prepare(xid); - session.close(); - liveServer1.crash(true, session); - ColocatedHornetQServer qServer = (ColocatedHornetQServer) liveServer2.getServer(); - qServer.backupServer.waitForActivation(5, TimeUnit.SECONDS); - try - ( - ClientSession session2 = factory2.createSession(true, false, false) - ) - { - session2.getXAResource().rollback(xid); - } - liveServer2.stop(); - liveServer2.start(); - Queue q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - q = (Queue) liveServer2.getServer().getPostOffice().getBinding(queue2).getBindable(); - assertEquals(1, q.getMessageCount()); - assertEquals(1, q.getMessagesAdded()); - } - - @Test - public void testReceive() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - producer.send(queue, message); - } - latch.await(10, TimeUnit.SECONDS); - - ClientConsumer consumer1 = session1.createConsumer(queue); - session1.start(); - for (int i = 0; i < numMessages; i += 2) - { - ClientMessage cMessage = consumer1.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - } - session1.close(); - liveServer1.crash(); - ClientConsumer consumer = session2.createConsumer(queue); - session2.start(); - for (int i = 50; i < numMessages; i++) - { - ClientMessage cMessage = consumer.receive(5000); - String s = cMessage.getBodyBuffer().readString(); - System.out.println("s = " + s); - assertNotNull(s); - } - } - - @Test - public void testReceiveRedelivery() throws Exception - { - ClientSession session = factory1.createTransactedSession(); - ClientProducer producer = session1.createProducer(); - final CountDownLatch latch = new CountDownLatch(1); - session.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - ClientMessage message = session.createMessage(true); - message.getBodyBuffer().writeString("message:1"); - producer.send(queue, message); - latch.await(10, TimeUnit.SECONDS); - - ClientConsumer consumer1 = session.createConsumer(queue); - session.start(); - ClientMessage cMessage = consumer1.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - cMessage.acknowledge(); - assertEquals(1, cMessage.getDeliveryCount()); - session.rollback(); - cMessage = consumer1.receive(5000); - assertNotNull(cMessage.getBodyBuffer().readString()); - cMessage.acknowledge(); - assertEquals(2, cMessage.getDeliveryCount()); - session.rollback(); - session.close(); - liveServer1.crash(); - ClientConsumer consumer = session2.createConsumer(queue); - session2.start(); - cMessage = consumer.receive(5000); - String s = cMessage.getBodyBuffer().readString(); - System.out.println("s = " + s); - assertNotNull(s); - assertEquals(3, cMessage.getDeliveryCount()); - } - - @Test - public void testSendDuplicateIDs() throws Exception - { - int numMessages = 100; - ClientProducer producer = session1.createProducer(); - ClientProducer producer2 = session2.createProducer(); - final CountDownLatch latch = new CountDownLatch(numMessages * 2); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - - session2.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch.countDown(); - } - }); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "prod1" + ":" + i); - producer.send(queue, message); - - message = session2.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "prod2" + ":" + i); - producer2.send(queue, message); - } - latch.await(10, TimeUnit.SECONDS); - - liveServer1.crash(); - - final CountDownLatch latch2 = new CountDownLatch(numMessages * 2); - session1.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch2.countDown(); - } - }); - session2.setSendAcknowledgementHandler(new SendAcknowledgementHandler() - { - @Override - public void sendAcknowledged(org.hornetq.api.core.Message message) - { - latch2.countDown(); - } - }); - - Thread.sleep(5000); - for (int i = 0; i < numMessages; i++) - { - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "prod1" + ":" + i); - producer.send(queue, message); - - message = session2.createMessage(true); - message.getBodyBuffer().writeString("message:" + i); - message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "prod2" + ":" + i); - producer2.send(queue, message); - } - latch2.await(10, TimeUnit.SECONDS); - Binding binding = liveServer2.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.testQueue")); - QueueImpl q = (QueueImpl) binding.getBindable(); - assertEquals(numMessages * 2, q.getMessageCount()); - } - - - - protected void createConfigs() throws Exception - { - nodeManagerLive1 = new InVMNodeManager(false); - nodeManagerLive2 = new InVMNodeManager(false); - - TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(1); - Configuration liveConfiguration1 = super.createDefaultConfig(); - liveConfiguration1.getAcceptorConfigurations().clear(); - liveConfiguration1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(1)); - liveConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfiguration1.getHAPolicy().setFailbackDelay(1000); - liveConfiguration1.setJournalDirectory(getTestDir() + "/journal1"); - liveConfiguration1.setBindingsDirectory(getTestDir() + "/bindings1"); - liveConfiguration1.setLargeMessagesDirectory(getTestDir() + "/largemessage1"); - liveConfiguration1.setPagingDirectory(getTestDir() + "/paging1"); - liveConfiguration1.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - liveConfiguration1.getQueueConfigurations().add(new CoreQueueConfiguration("jms.topic.testTopic", "jms.topic.testTopic", HornetQServerImpl.GENERIC_IGNORED_FILTER, true)); - - TransportConfiguration liveConnector2 = getConnectorTransportConfiguration(2); - basicClusterConnectionConfig(liveConfiguration1, liveConnector1.getName(), liveConnector2.getName()); - liveConfiguration1.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - liveConfiguration1.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2); - - Configuration backupConfiguration1 = liveConfiguration1.copy(); - - backupConfiguration1.setJournalDirectory(getTestDir() + "/journal2"); - backupConfiguration1.setBindingsDirectory(getTestDir() + "/bindings2"); - backupConfiguration1.setLargeMessagesDirectory(getTestDir() + "/largemessage2"); - backupConfiguration1.setPagingDirectory(getTestDir() + "/paging2"); - backupConfiguration1.setBackupStrategy(BackupStrategy.SCALE_DOWN); - backupConfiguration1.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - HAPolicy haPolicy = new HAPolicy(); - ArrayList<String> scaleDownConnectors = new ArrayList<>(); - scaleDownConnectors.add(liveConnector1.getName()); - haPolicy.setScaleDownConnectors(scaleDownConnectors); - backupConfiguration1.setHAPolicy(haPolicy); - liveConfiguration1.getBackupServerConfigurations().add(backupConfiguration1); - - liveServer1 = createTestableServer(liveConfiguration1, nodeManagerLive1, nodeManagerLive2, 1); - - Configuration liveConfiguration2 = super.createDefaultConfig(); - liveConfiguration2.getAcceptorConfigurations().clear(); - liveConfiguration2.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(2)); - liveConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfiguration2.getHAPolicy().setFailbackDelay(1000); - liveConfiguration2.setJournalDirectory(getTestDir() + "/journal2"); - liveConfiguration2.setBindingsDirectory(getTestDir() + "/bindings2"); - liveConfiguration2.setLargeMessagesDirectory(getTestDir() + "/largemessage2"); - liveConfiguration2.setPagingDirectory(getTestDir() + "/paging2"); - liveConfiguration2.getQueueConfigurations().add(new CoreQueueConfiguration("jms.queue.testQueue", "jms.queue.testQueue", null, true)); - liveConfiguration2.getQueueConfigurations().add(new CoreQueueConfiguration("jms.topic.testTopic", "jms.topic.testTopic", HornetQServerImpl.GENERIC_IGNORED_FILTER, true)); - - basicClusterConnectionConfig(liveConfiguration2, liveConnector2.getName(), liveConnector1.getName()); - liveConfiguration2.getConnectorConfigurations().put(liveConnector1.getName(), liveConnector1); - liveConfiguration2.getConnectorConfigurations().put(liveConnector2.getName(), liveConnector2); - - Configuration backupConfiguration2 = liveConfiguration2.copy(); - backupConfiguration2.setJournalDirectory(getTestDir() + "/journal1"); - backupConfiguration2.setBindingsDirectory(getTestDir() + "/bindings1"); - backupConfiguration2.setLargeMessagesDirectory(getTestDir() + "/largemessage1"); - backupConfiguration2.setPagingDirectory(getTestDir() + "/paging1"); - backupConfiguration2.setBackupStrategy(BackupStrategy.SCALE_DOWN); - backupConfiguration2.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - HAPolicy haPolicy2 = new HAPolicy(); - ArrayList<String> scaleDownConnectors2 = new ArrayList<>(); - scaleDownConnectors2.add(liveConnector2.getName()); - haPolicy2.setScaleDownConnectors(scaleDownConnectors2); - backupConfiguration2.setHAPolicy(haPolicy2); - liveConfiguration2.getBackupServerConfigurations().add(backupConfiguration2); - - liveServer2 = createTestableServer(liveConfiguration2, nodeManagerLive2, nodeManagerLive1, 2); - } - - private TransportConfiguration getAcceptorTransportConfiguration(int node) - { - HashMap<String, Object> params = new HashMap<>(); - params.put("server-id", "" + node); - return new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params); - } - - private TransportConfiguration getConnectorTransportConfiguration(int node) - { - HashMap<String, Object> params = new HashMap<>(); - params.put("server-id", "" + node); - return new TransportConfiguration(INVM_CONNECTOR_FACTORY, params); - } - - protected TestableServer createTestableServer(Configuration config, NodeManager liveNodeManager, NodeManager backupNodeManager, int id) - { - return new SameProcessHornetQServer( - createColocatedInVMFailoverServer(true, config, liveNodeManager, backupNodeManager, id)); - } -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java index 9debe77..22caa1a 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java @@ -25,7 +25,9 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.hornetq.core.server.cluster.ha.SharedStoreSlavePolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.IntegrationTestLogger; @@ -56,6 +58,7 @@ public class FailBackAutoTest extends FailoverTestBase @Test public void testAutoFailback() throws Exception { + ((SharedStoreSlavePolicy)backupServer.getServer().getHAPolicy()).setRestartBackup(false); createSessionFactory(); final CountDownLatch latch = new CountDownLatch(1); @@ -163,7 +166,6 @@ public class FailBackAutoTest extends FailoverTestBase listener = new CountDownSessionFailureListener(session); session.addFailureListener(listener); - log.info("restarting live node now"); liveServer.start(); @@ -202,6 +204,7 @@ public class FailBackAutoTest extends FailoverTestBase @Test public void testFailBack() throws Exception { + ((SharedStoreSlavePolicy)backupServer.getServer().getHAPolicy()).setRestartBackup(false); createSessionFactory(); ClientSession session = sendAndConsume(sf, true); @@ -218,8 +221,8 @@ public class FailBackAutoTest extends FailoverTestBase producer = session.createProducer(FailoverTestBase.ADDRESS); sendMessages(session, producer, 2 * NUM_MESSAGES); session.commit(); - assertFalse("must NOT be a backup", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); - adaptLiveConfigForReplicatedFailBack(liveServer.getServer().getConfiguration()); + assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup()); + adaptLiveConfigForReplicatedFailBack(liveServer); CountDownSessionFailureListener listener = new CountDownSessionFailureListener(session); session.addFailureListener(listener); @@ -254,28 +257,32 @@ public class FailBackAutoTest extends FailoverTestBase { nodeManager = new InVMNodeManager(false); - backupConfig = super.createDefaultConfig(); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); - backupConfig.setSecurityEnabled(false); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - backupConfig.getHAPolicy().setFailbackDelay(1000); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); + + backupConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000) + .setRestartBackup(true)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultConfig(); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); - liveConfig.setSecurityEnabled(false); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfig.getHAPolicy().setFailbackDelay(1000); - basicClusterConnectionConfig(liveConfig, liveConnector.getName(), backupConnector.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); + liveConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration() + .setFailbackDelay(100)) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector); + liveServer = createTestableServer(liveConfig); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java index 7575208..366c2ad 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java @@ -11,15 +11,9 @@ * permissions and limitations under the License. */ package org.hornetq.tests.integration.cluster.failover; -import org.hornetq.core.server.cluster.ha.HAPolicy; -import org.junit.Before; - -import org.junit.Test; import java.util.concurrent.TimeUnit; -import org.junit.Assert; - import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientConsumer; @@ -29,11 +23,16 @@ import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.cluster.util.TestableServer; import org.hornetq.tests.util.CountDownSessionFailureListener; import org.hornetq.tests.util.TransportConfigurationUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; /** * @author <a href="mailto:[email protected]">Andy Taylor</a> @@ -73,7 +72,8 @@ public class FailBackManualTest extends FailoverTestBase backupServer.start(); - assertTrue(listener.getLatch().await(5, TimeUnit.SECONDS)); + assertTrue(listener.getLatch() + .await(5, TimeUnit.SECONDS)); ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); @@ -85,8 +85,6 @@ public class FailBackManualTest extends FailoverTestBase session.removeFailureListener(listener); - liveConfig.setAllowAutoFailBack(false); - Thread t = new Thread(new ServerStarter(liveServer)); t.start(); @@ -113,29 +111,30 @@ public class FailBackManualTest extends FailoverTestBase protected void createConfigs() throws Exception { nodeManager = new InVMNodeManager(false); - - backupConfig = super.createDefaultConfig(); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); - backupConfig.setSecurityEnabled(false); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); - backupConfig.getHAPolicy().setAllowAutoFailBack(false); + + backupConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration() + .setAllowFailBack(false)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultConfig(); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); - liveConfig.setSecurityEnabled(false); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - basicClusterConnectionConfig(liveConfig, liveConnector.getName(), backupConnector.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - liveConfig.setAllowAutoFailBack(false); + liveConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())); + liveServer = createTestableServer(liveConfig); } @@ -168,12 +167,13 @@ public class FailBackManualTest extends FailoverTestBase for (int i = 0; i < numMessages; i++) { ClientMessage message = session.createMessage(HornetQTextMessage.TYPE, - false, - 0, - System.currentTimeMillis(), - (byte) 1); + false, + 0, + System.currentTimeMillis(), + (byte) 1); message.putIntProperty(new SimpleString("count"), i); - message.getBodyBuffer().writeString("aardvarks"); + message.getBodyBuffer() + .writeString("aardvarks"); producer.send(message); } @@ -185,7 +185,8 @@ public class FailBackManualTest extends FailoverTestBase { ClientMessage message2 = consumer.receive(); - Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString()); + Assert.assertEquals("aardvarks", message2.getBodyBuffer() + .readString()); Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count"))); @@ -207,7 +208,8 @@ public class FailBackManualTest extends FailoverTestBase @Override protected void setBody(final int i, final ClientMessage message) { - message.getBodyBuffer().writeString("message" + i); + message.getBodyBuffer() + .writeString("message" + i); } static class ServerStarter implements Runnable http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverListenerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverListenerTest.java index d277dc0..ccc3592 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverListenerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverListenerTest.java @@ -26,7 +26,8 @@ import org.hornetq.api.core.client.FailoverEventType; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.IntegrationTestLogger; @@ -189,29 +190,31 @@ public class FailoverListenerTest extends FailoverTestBase protected void createConfigs() throws Exception { nodeManager = new InVMNodeManager(false); - - backupConfig = super.createDefaultConfig(); - backupConfig.getAcceptorConfigurations().clear(); - backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); - backupConfig.setSecurityEnabled(false); - backupConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - backupConfig.getHAPolicy().setFailbackDelay(1000); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); + + backupConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration() + .setFailbackDelay(1000)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultConfig(); - liveConfig.getAcceptorConfigurations().clear(); - liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); - liveConfig.setSecurityEnabled(false); - liveConfig.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - liveConfig.getHAPolicy().setFailbackDelay(1000); - basicClusterConnectionConfig(liveConfig, liveConnector.getName(), backupConnector.getName()); - liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); + liveConfig = super.createDefaultConfig() + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration() + .setFailbackDelay(1000)) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .addConnectorConfiguration(backupConnector.getName(), backupConnector); + liveServer = createTestableServer(liveConfig); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java index f82725d..c1f262d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java @@ -41,7 +41,12 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.MessageHandler; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; +import org.hornetq.core.server.cluster.ha.BackupPolicy; import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.server.cluster.ha.ReplicaPolicy; +import org.hornetq.core.server.cluster.ha.ReplicatedPolicy; +import org.hornetq.core.server.cluster.ha.SharedStoreMasterPolicy; +import org.hornetq.core.server.cluster.ha.SharedStoreSlavePolicy; import org.hornetq.core.server.impl.InVMNodeManager; import org.hornetq.core.transaction.impl.XidImpl; import org.hornetq.jms.client.HornetQTextMessage; @@ -585,7 +590,12 @@ public class FailoverTest extends FailoverTestBase public void testFailBack() throws Exception { boolean doFailBack = true; - backupServer.getServer().getConfiguration().setMaxSavedReplicatedJournalSize(0); + HAPolicy haPolicy = backupServer.getServer().getHAPolicy(); + if (haPolicy instanceof ReplicaPolicy) + { + ((ReplicaPolicy)haPolicy).setMaxSavedReplicatedJournalsSize(0); + } + simpleReplication(doFailBack); } @@ -618,8 +628,8 @@ public class FailoverTest extends FailoverTestBase Thread.sleep(100); assertFalse("backup is not running", backupServer.isStarted()); - assertFalse("must NOT be a backup", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); - adaptLiveConfigForReplicatedFailBack(liveServer.getServer().getConfiguration()); + assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy() instanceof BackupPolicy); + adaptLiveConfigForReplicatedFailBack(liveServer); beforeRestart(liveServer); liveServer.start(); assertTrue("live initialized...", liveServer.getServer().waitForActivation(15, TimeUnit.SECONDS)); @@ -629,7 +639,7 @@ public class FailoverTest extends FailoverTestBase ClientSession session2 = createSession(sf, false, false); session2.start(); ClientConsumer consumer2 = session2.createConsumer(FailoverTestBase.ADDRESS); - boolean replication = !liveServer.getServer().getConfiguration().getHAPolicy().isSharedStore(); + boolean replication = liveServer.getServer().getHAPolicy() instanceof ReplicatedPolicy; if (replication) receiveMessages(consumer2, 0, NUM_MESSAGES, true); assertNoMoreMessages(consumer2); @@ -658,7 +668,8 @@ public class FailoverTest extends FailoverTestBase backupServer.stop(); // Backup stops! backupServer.start(); - assertTrue(backupServer.getServer().waitForBackupSync(10, TimeUnit.SECONDS)); + + waitForRemoteBackupSynchronization(backupServer.getServer()); session.start(); ClientConsumer consumer = addClientConsumer(session.createConsumer(FailoverTestBase.ADDRESS)); @@ -675,7 +686,7 @@ public class FailoverTest extends FailoverTestBase backupServer.stop(); // Backup stops! beforeRestart(backupServer); backupServer.start(); - assertTrue(backupServer.getServer().waitForBackupSync(30, TimeUnit.SECONDS)); + waitForRemoteBackupSynchronization(backupServer.getServer()); backupServer.stop(); // Backup stops! liveServer.stop(); @@ -723,8 +734,8 @@ public class FailoverTest extends FailoverTestBase assertEquals("backup must be running with the same nodeID", liveId, backupServer.getServer().getNodeID()); if (doFailBack) { - assertFalse("must NOT be a backup", liveServer.getServer().getConfiguration().getHAPolicy().isBackup()); - adaptLiveConfigForReplicatedFailBack(liveServer.getServer().getConfiguration()); + assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup()); + adaptLiveConfigForReplicatedFailBack(liveServer); beforeRestart(liveServer); liveServer.start(); assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS)); @@ -1838,7 +1849,8 @@ public class FailoverTest extends FailoverTestBase receiveMessages(consumer); } - public void _testForceBlockingReturn() throws Exception + @Test + public void testForceBlockingReturn() throws Exception { locator.setBlockOnNonDurableSend(true); locator.setBlockOnDurableSend(true); @@ -1847,8 +1859,7 @@ public class FailoverTest extends FailoverTestBase createClientSessionFactory(); // Add an interceptor to delay the send method so we can get time to cause failover before it returns - - // liveServer.getRemotingService().addIncomingInterceptor(new DelayInterceptor()); + liveServer.getServer().getRemotingService().addIncomingInterceptor(new DelayInterceptor()); final ClientSession session = createSession(sf, true, true, 0); @@ -1888,8 +1899,12 @@ public class FailoverTest extends FailoverTestBase Assert.assertNotNull(sender.e); + Assert.assertNotNull(sender.e.getCause()); + Assert.assertEquals(sender.e.getType(), HornetQExceptionType.UNBLOCKED); + Assert.assertEquals(((HornetQException)sender.e.getCause()).getType(), HornetQExceptionType.DISCONNECTED); + session.close(); } @@ -2159,7 +2174,7 @@ public class FailoverTest extends FailoverTestBase public void testBackupServerNotRemoved() throws Exception { // HORNETQ-720 Disabling test for replicating backups. - if (!backupServer.getServer().getConfiguration().getHAPolicy().isSharedStore()) + if (!(backupServer.getServer().getHAPolicy() instanceof SharedStoreSlavePolicy)) { waitForComponent(backupServer, 1); return; @@ -2297,11 +2312,11 @@ public class FailoverTest extends FailoverTestBase // To reload security or other settings that are read during startup beforeRestart(backupServer); - if (!backupServer.getServer().getConfiguration().getHAPolicy().isSharedStore()) + if (!backupServer.getServer().getHAPolicy().isSharedStore()) { // XXX // this test would not make sense in the remote replication use case, without the following - backupServer.getServer().getConfiguration().getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); + backupServer.getServer().setHAPolicy(new SharedStoreMasterPolicy()); } backupServer.start();
