http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java index 5b99685..ebbe331 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java @@ -106,11 +106,6 @@ public class IsolatedTopologyTest extends ServiceTestBase private HornetQServer createServer1() throws Exception { - // Server1 with two acceptors, each acceptor on a different cluster connection - // talking to a different connector. - // i.e. two cluster connections isolated on the same node - Configuration config1 = createBasicConfig(0); - Map<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.CLUSTER_CONNECTION, "cc1"); params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, "1"); @@ -118,13 +113,6 @@ public class IsolatedTopologyTest extends ServiceTestBase TransportConfiguration acceptor1VM1 = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1"); - config1.getAcceptorConfigurations().add(acceptor1VM1); - - config1.getConnectorConfigurations().put("local-cc1", createInVMTransportConnectorConfig(1, "local-cc1")); - config1.getConnectorConfigurations().put("local-cc2", createInVMTransportConnectorConfig(2, "local-cc2")); - - config1.getConnectorConfigurations().put("other-cc1", createInVMTransportConnectorConfig(3, "other-cc1")); - config1.getConnectorConfigurations().put("other-cc2", createInVMTransportConnectorConfig(4, "other-cc2")); params = new HashMap<String, Object>(); params.put(TransportConstants.CLUSTER_CONNECTION, "cc2"); @@ -133,46 +121,47 @@ public class IsolatedTopologyTest extends ServiceTestBase TransportConfiguration acceptor2VM1 = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2"); - config1.getAcceptorConfigurations().add(acceptor2VM1); List<String> connectTo = new ArrayList<String>(); connectTo.add("other-cc1"); - ClusterConnectionConfiguration server1CC1 = - new ClusterConnectionConfiguration("cc1", "jms", "local-cc1", - 250, - true, - false, - 1, - 1024, - connectTo, - false); - - config1.getClusterConfigurations().add(server1CC1); + ClusterConnectionConfiguration server1CC1 = new ClusterConnectionConfiguration() + .setName("cc1") + .setAddress("jms") + .setConnectorName("local-cc1") + .setRetryInterval(250) + .setConfirmationWindowSize(1024) + .setStaticConnectors(connectTo); ArrayList<String> connectTo2 = new ArrayList<String>(); connectTo2.add("other-cc2"); - ClusterConnectionConfiguration server1CC2 = - new ClusterConnectionConfiguration("cc2", "jms", "local-cc2", 250, - true, - false, - 1, - 1024, - connectTo2, - false); + ClusterConnectionConfiguration server1CC2 = new ClusterConnectionConfiguration() + .setName("cc2") + .setAddress("jms") + .setConnectorName("local-cc2") + .setRetryInterval(250) + .setConfirmationWindowSize(1024) + .setStaticConnectors(connectTo2); - config1.getClusterConfigurations().add(server1CC2); + // Server1 with two acceptors, each acceptor on a different cluster connection + // talking to a different connector. + // i.e. two cluster connections isolated on the same node + Configuration config1 = createBasicConfig(0) + .addConnectorConfiguration("local-cc1", createInVMTransportConnectorConfig(1, "local-cc1")) + .addConnectorConfiguration("local-cc2", createInVMTransportConnectorConfig(2, "local-cc2")) + .addConnectorConfiguration("other-cc1", createInVMTransportConnectorConfig(3, "other-cc1")) + .addConnectorConfiguration("other-cc2", createInVMTransportConnectorConfig(4, "other-cc2")) + .addAcceptorConfiguration(acceptor1VM1) + .addAcceptorConfiguration(acceptor2VM1) + .addClusterConfiguration(server1CC1) + .addClusterConfiguration(server1CC2); return createServer(false, config1); } private HornetQServer createServer2() throws Exception { - // Server1 with two acceptors, each acceptor on a different cluster connection - // talking to a different connector. - // i.e. two cluster connections isolated on the same node - Configuration config1 = createBasicConfig(3); Map<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.CLUSTER_CONNECTION, "cc1"); @@ -181,13 +170,6 @@ public class IsolatedTopologyTest extends ServiceTestBase TransportConfiguration acceptor1VM1 = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1"); - config1.getAcceptorConfigurations().add(acceptor1VM1); - - config1.getConnectorConfigurations().put("local-cc1", createInVMTransportConnectorConfig(3, "local-cc1")); - config1.getConnectorConfigurations().put("local-cc2", createInVMTransportConnectorConfig(4, "local-cc2")); - - config1.getConnectorConfigurations().put("other-cc1", createInVMTransportConnectorConfig(1, "other-cc1")); - config1.getConnectorConfigurations().put("other-cc2", createInVMTransportConnectorConfig(2, "other-cc2")); params = new HashMap<String, Object>(); params.put(TransportConstants.CLUSTER_CONNECTION, "cc2"); @@ -196,25 +178,41 @@ public class IsolatedTopologyTest extends ServiceTestBase TransportConfiguration acceptor2VM1 = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2"); - config1.getAcceptorConfigurations().add(acceptor2VM1); List<String> connectTo = new ArrayList<String>(); connectTo.add("other-cc1"); - ClusterConnectionConfiguration server1CC1 = - new ClusterConnectionConfiguration("cc1", "jms", "local-cc1", 250, true, false, 1, 1024, connectTo, - false); + ClusterConnectionConfiguration server1CC1 = new ClusterConnectionConfiguration() + .setName("cc1") + .setAddress("jms") + .setConnectorName("local-cc1") + .setRetryInterval(250) + .setConfirmationWindowSize(1024) + .setStaticConnectors(connectTo); - config1.getClusterConfigurations().add(server1CC1); - - ArrayList<String> connectTo2 = new ArrayList<String>(); + List<String> connectTo2 = new ArrayList<String>(); connectTo2.add("other-cc2"); - ClusterConnectionConfiguration server1CC2 = - new ClusterConnectionConfiguration("cc2", "jms", "local-cc2", 250, true, false, 1, 1024, connectTo2, - false); + ClusterConnectionConfiguration server1CC2 = new ClusterConnectionConfiguration() + .setName("cc2") + .setAddress("jms") + .setConnectorName("local-cc2") + .setRetryInterval(250) + .setConfirmationWindowSize(1024) + .setStaticConnectors(connectTo2); - config1.getClusterConfigurations().add(server1CC2); + // Server1 with two acceptors, each acceptor on a different cluster connection + // talking to a different connector. + // i.e. two cluster connections isolated on the same node + Configuration config1 = createBasicConfig(3) + .addAcceptorConfiguration(acceptor1VM1) + .addAcceptorConfiguration(acceptor2VM1) + .addConnectorConfiguration("local-cc1", createInVMTransportConnectorConfig(3, "local-cc1")) + .addConnectorConfiguration("local-cc2", createInVMTransportConnectorConfig(4, "local-cc2")) + .addConnectorConfiguration("other-cc1", createInVMTransportConnectorConfig(1, "other-cc1")) + .addConnectorConfiguration("other-cc2", createInVMTransportConnectorConfig(2, "other-cc2")) + .addClusterConfiguration(server1CC1) + .addClusterConfiguration(server1CC2); return createServer(false, config1); }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/NonHATopologyTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/NonHATopologyTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/NonHATopologyTest.java index 41beedb..57acb40 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/NonHATopologyTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/topology/NonHATopologyTest.java @@ -76,9 +76,14 @@ public class NonHATopologyTest extends ServiceTestBase ArrayList<String> list = new ArrayList<String>(); list.add("netty"); Configuration config = server.getConfiguration(); - config.getClusterConfigurations().add(new ClusterConnectionConfiguration("tst", "jms", "netty", 1000, true, - false, 1, 1000, list, true)); - + config.getClusterConfigurations().add(new ClusterConnectionConfiguration() + .setName("tst") + .setAddress("jms") + .setConnectorName("netty") + .setRetryInterval(1000) + .setConfirmationWindowSize(1000) + .setStaticConnectors(list) + .setAllowDirectConnectionsOnly(true)); } server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java index 60e8cb0..d3f51f9 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java @@ -29,6 +29,7 @@ import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.hornetq.core.replication.ReplicationEndpoint; import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.impl.SharedNothingBackupActivation; import org.hornetq.spi.core.protocol.RemotingConnection; /** @@ -75,8 +76,6 @@ public class BackupSyncDelay implements Interceptor */ public BackupSyncDelay(HornetQServer backup, HornetQServer live, byte packetCode) { - assert backup.getConfiguration().getHAPolicy().isBackup(); - assert !live.getConfiguration().getHAPolicy().isBackup(); this.backup = backup; this.live = live; live.getRemotingService().addIncomingInterceptor(this); @@ -99,7 +98,8 @@ public class BackupSyncDelay implements Interceptor { try { - ReplicationEndpoint repEnd = backup.getReplicationEndpoint(); + SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backup.getActivation(); + ReplicationEndpoint repEnd = activation.getReplicationEndpoint(); handler.addSubHandler(repEnd); Channel repChannel = repEnd.getChannel(); repChannel.setHandler(handler); @@ -313,6 +313,12 @@ public class BackupSyncDelay implements Interceptor } @Override + public void returnBlocking(Throwable cause) + { + throw new UnsupportedOperationException(); + } + + @Override public Lock getLock() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/MultiServerTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/MultiServerTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/MultiServerTestBase.java index 7cd0595..8b2ee1d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/MultiServerTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/MultiServerTestBase.java @@ -11,7 +11,10 @@ * permissions and limitations under the License. */ package org.hornetq.tests.integration.cluster.util; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.junit.Before; import java.util.ArrayList; @@ -175,33 +178,21 @@ public class MultiServerTestBase extends ServiceTestBase final boolean sharedStorage) throws Exception { NodeManager nodeManager = null; + TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, generateParams(node, useNetty())); + TransportConfiguration thisConnector = createTransportConfiguration(useNetty(), false, generateParams(node, useNetty())); if (sharedStorage) { nodeManager = new InVMNodeManager(false); } - Configuration configuration = createBasicConfig(node); - - configuration.setJournalMaxIO_AIO(1000); - - if (sharedStorage) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - - configuration.setThreadPoolMaxSize(10); - - configuration.getAcceptorConfigurations().clear(); - - TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, - generateParams(node, useNetty())); - configuration.getAcceptorConfigurations().add(serverConfigAcceptor); - - - TransportConfiguration thisConnector = createTransportConfiguration(useNetty(), false, generateParams(node, useNetty())); - - configuration.getConnectorConfigurations().put("thisConnector", thisConnector); + Configuration configuration = createBasicConfig(node) + .setJournalMaxIO_AIO(1000) + .setThreadPoolMaxSize(10) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(serverConfigAcceptor) + .addConnectorConfiguration("thisConnector", thisConnector) + .setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()); List<String> targetServersOnConnection = new ArrayList<String>(); @@ -224,15 +215,14 @@ public class MultiServerTestBase extends ServiceTestBase configuration.getConnectorConfigurations().put(backupConnectorName, backupConnector); } - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration("localCluster" + node, "cluster-queues", "thisConnector", - 100, - true, - false, - 1, - 1024, - targetServersOnConnection, - false); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName("localCluster" + node) + .setAddress("cluster-queues") + .setConnectorName("thisConnector") + .setRetryInterval(100) + .setConfirmationWindowSize(1024) + .setStaticConnectors(targetServersOnConnection); + configuration.getClusterConfigurations().add(clusterConf); HornetQServer server; @@ -258,24 +248,14 @@ public class MultiServerTestBase extends ServiceTestBase final int liveNode, final NodeManager nodeManager) throws Exception { - Configuration configuration = createBasicConfig(useSharedStorage() ? liveNode : node); - - if (useSharedStorage()) - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - else - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - - configuration.getAcceptorConfigurations().clear(); - - TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, - generateParams(node, useNetty())); - configuration.getAcceptorConfigurations().add(serverConfigAcceptor); - - - + TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, generateParams(node, useNetty())); TransportConfiguration thisConnector = createTransportConfiguration(useNetty(), false, generateParams(node, useNetty())); - configuration.getConnectorConfigurations().put("thisConnector", thisConnector); + Configuration configuration = createBasicConfig(useSharedStorage() ? liveNode : node) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(serverConfigAcceptor) + .addConnectorConfiguration("thisConnector", thisConnector) + .setHAPolicyConfiguration(useSharedStorage() ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()); List<String> targetServersOnConnection = new ArrayList<String>(); @@ -288,19 +268,18 @@ public class MultiServerTestBase extends ServiceTestBase // } String targetConnectorName = "targetConnector-" + targetNode; TransportConfiguration targetServer = createTransportConfiguration(useNetty(), false, generateParams(targetNode, useNetty())); - configuration.getConnectorConfigurations().put(targetConnectorName, targetServer); + configuration.addConnectorConfiguration(targetConnectorName, targetServer); targetServersOnConnection.add(targetConnectorName); } - ClusterConnectionConfiguration clusterConf = - new ClusterConnectionConfiguration("localCluster" + node, "cluster-queues", "thisConnector", - 100, - true, - false, - 1, - 1024, - targetServersOnConnection, - false); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName("localCluster" + node) + .setAddress("cluster-queues") + .setConnectorName("thisConnector") + .setRetryInterval(100) + .setConfirmationWindowSize(1024) + .setStaticConnectors(targetServersOnConnection); + configuration.getClusterConfigurations().add(clusterConf); HornetQServer server; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/DiscoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/DiscoveryTest.java index 0111ee7..2b10d2c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/DiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/DiscoveryTest.java @@ -12,8 +12,12 @@ */ package org.hornetq.tests.integration.discovery; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.NetworkInterface; import java.util.ArrayList; @@ -33,7 +37,7 @@ import org.hornetq.api.core.JGroupsBroadcastGroupConfiguration; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.UDPBroadcastGroupConfiguration; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.cluster.DiscoveryEntry; import org.hornetq.core.cluster.DiscoveryGroup; import org.hornetq.core.cluster.DiscoveryListener; @@ -114,7 +118,7 @@ public class DiscoveryTest extends UnitTestCase bg = new BroadcastGroupImpl(new FakeNodeManager(nodeID), RandomUtil.randomString(), - 0, null, new UDPBroadcastGroupConfiguration(address1, groupPort, null, -1).createBroadcastEndpointFactory()); + 0, null, new UDPBroadcastGroupConfiguration().setGroupAddress(address1).setGroupPort(groupPort).createBroadcastEndpointFactory()); bg.start(); @@ -1158,7 +1162,7 @@ public class DiscoveryTest extends UnitTestCase Assert.assertEquals(1, notifListener.getNotifications().size()); Notification notif = notifListener.getNotifications().get(0); - Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, notif.getType()); + Assert.assertEquals(CoreNotificationType.DISCOVERY_GROUP_STARTED, notif.getType()); Assert.assertEquals(dg.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); @@ -1167,7 +1171,7 @@ public class DiscoveryTest extends UnitTestCase Assert.assertEquals(2, notifListener.getNotifications().size()); notif = notifListener.getNotifications().get(1); - Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, notif.getType()); + Assert.assertEquals(CoreNotificationType.DISCOVERY_GROUP_STOPPED, notif.getType()); Assert.assertEquals(dg.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); @@ -1198,7 +1202,7 @@ public class DiscoveryTest extends UnitTestCase Assert.assertEquals(1, notifListener.getNotifications().size()); Notification notif = notifListener.getNotifications().get(0); - Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, notif.getType()); + Assert.assertEquals(CoreNotificationType.BROADCAST_GROUP_STARTED, notif.getType()); Assert.assertEquals(bg.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); @@ -1207,12 +1211,34 @@ public class DiscoveryTest extends UnitTestCase Assert.assertEquals(2, notifListener.getNotifications().size()); notif = notifListener.getNotifications().get(1); - Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, notif.getType()); + Assert.assertEquals(CoreNotificationType.BROADCAST_GROUP_STOPPED, notif.getType()); Assert.assertEquals(bg.getName(), notif.getProperties() .getSimpleStringProperty(new SimpleString("name")) .toString()); } + /** + * https://issues.jboss.org/browse/HORNETQ-1389 + * @throws Exception + */ + @Test + public void testJGroupsBroadcastGroupConfigurationSerializable() throws Exception + { + JGroupsBroadcastGroupConfiguration jgroupsConfig = + new JGroupsBroadcastGroupConfiguration(TEST_JGROUPS_CONF_FILE, "somChannel"); + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + ObjectOutputStream objectOut = new ObjectOutputStream(byteOut); + objectOut.writeObject(jgroupsConfig); + + byte[] serializedData = byteOut.toByteArray(); + ByteArrayInputStream byteIn = new ByteArrayInputStream(serializedData); + ObjectInputStream objectIn = new ObjectInputStream(byteIn); + + Object object = objectIn.readObject(); + assertNotNull(object); + assertTrue(object instanceof JGroupsBroadcastGroupConfiguration); + } + private TransportConfiguration generateTC(String debug) { String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + ""; @@ -1305,9 +1331,12 @@ public class DiscoveryTest extends UnitTestCase final InetAddress groupAddress, final int groupPort) throws Exception { - return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, - new UDPBroadcastGroupConfiguration(groupAddress.getHostAddress(), groupPort, - localAddress != null ? localAddress.getHostAddress() : null, localPort).createBroadcastEndpointFactory()); + return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress.getHostAddress()) + .setGroupPort(groupPort) + .setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null) + .setLocalBindPort(localPort) + .createBroadcastEndpointFactory()); } private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, @@ -1319,9 +1348,11 @@ public class DiscoveryTest extends UnitTestCase private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception { - return new DiscoveryGroup(nodeID, name, timeout, - new UDPBroadcastGroupConfiguration(groupAddress.getHostAddress(), groupPort, - localBindAddress != null ? localBindAddress.getHostAddress() : null, -1).createBroadcastEndpointFactory(), notif); + return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress.getHostAddress()) + .setGroupPort(groupPort) + .setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null) + .createBroadcastEndpointFactory(), notif); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/DivertTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/DivertTest.java index 2b0d61e..6bae7f1 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/DivertTest.java @@ -24,8 +24,10 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.DivertConfiguration; +import org.hornetq.core.message.impl.MessageImpl; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; +import org.hornetq.core.settings.impl.AddressSettings; import org.hornetq.tests.util.ServiceTestBase; import org.junit.Assert; import org.junit.Test; @@ -49,13 +51,11 @@ public class DivertTest extends ServiceTestBase final String forwardAddress = "forwardAddress"; - DivertConfiguration divertConf = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress, - false, - null, - null); + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -135,6 +135,145 @@ public class DivertTest extends ServiceTestBase messagingService.stop(); } + + @Test + public void testSingleDivertWithExpiry() throws Exception + { + Configuration conf = createDefaultConfig(); + final String testAddress = "testAddress"; + + final String forwardAddress = "forwardAddress"; + + final String expiryAddress = "expiryAddress"; + + conf.getAddressesSettings().clear(); + + AddressSettings expirySettings = new AddressSettings(); + expirySettings.setExpiryAddress(new SimpleString(expiryAddress)); + + conf.getAddressesSettings().put("#", expirySettings); + + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress); + + List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); + + divertConfs.add(divertConf); + + conf.setDivertConfigurations(divertConfs); + + HornetQServer messagingService = addServer(HornetQServers.newHornetQServer(conf, true)); + + messagingService.start(); + + + ServerLocator locator = createInVMNonHALocator(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + final SimpleString queueName1 = new SimpleString("queue1"); + + final SimpleString queueName2 = new SimpleString("queue2"); + + session.createQueue(new SimpleString(forwardAddress), queueName1, null, true); + + session.createQueue(new SimpleString(testAddress), queueName2, null, true); + + session.createQueue(new SimpleString(expiryAddress), new SimpleString(expiryAddress), null, true); + + session.start(); + + ClientProducer producer = session.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session.createConsumer(queueName1); + + ClientConsumer consumer2 = session.createConsumer(queueName2); + + final int numMessages = 1; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) + { + ClientMessage message = session.createMessage(true); + + message.putIntProperty(propKey, i); + + message.setExpiration(System.currentTimeMillis() + 1000); + + producer.send(message); + } + session.commit(); + + + // this context is validating if these messages are routed correctly + { + int count1 = 0; + ClientMessage message = null; + while ((message = consumer1.receiveImmediate()) != null) + { + message.acknowledge(); + count1++; + } + + int count2 = 0; + message = null; + while ((message = consumer2.receiveImmediate()) != null) + { + message.acknowledge(); + count2++; + } + + assertEquals(1, count1); + assertEquals(1, count2); + session.rollback(); + } + Thread.sleep(2000); + + // it must been expired by now + assertNull(consumer1.receiveImmediate()); + // it must been expired by now + assertNull(consumer2.receiveImmediate()); + + int countOriginal1 = 0; + int countOriginal2 = 0; + ClientConsumer consumerExpiry = session.createConsumer(expiryAddress); + + for (int i = 0; i < numMessages * 2; i++) + { + ClientMessage message = consumerExpiry.receive(5000); + System.out.println("Received message " + message); + assertNotNull(message); + + if (message.getStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE).equals("queue1")) + { + countOriginal1++; + } + else if (message.getStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE).equals("queue2")) + { + countOriginal2++; + } + else + { + System.out.println("message not part of any expired queue" + message); + } + } + + assertEquals(numMessages, countOriginal1); + assertEquals(numMessages, countOriginal2); + + session.close(); + + sf.close(); + + messagingService.stop(); + } + @Test public void testSingleNonExclusiveDivert2() throws Exception { @@ -143,13 +282,11 @@ public class DivertTest extends ServiceTestBase final String forwardAddress = "forwardAddress"; - DivertConfiguration divertConf = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress, - false, - null, - null); + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -275,13 +412,11 @@ public class DivertTest extends ServiceTestBase final String forwardAddress = "forwardAddress"; - DivertConfiguration divertConf = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress, - false, - null, - null); + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -349,13 +484,12 @@ public class DivertTest extends ServiceTestBase final String forwardAddress = "forwardAddress"; - DivertConfiguration divertConf = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress, - true, - null, - null); + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress) + .setExclusive(true); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -448,29 +582,23 @@ public class DivertTest extends ServiceTestBase final String forwardAddress2 = "forwardAddress2"; final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - false, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -599,29 +727,26 @@ public class DivertTest extends ServiceTestBase final String forwardAddress2 = "forwardAddress2"; final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - true, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - true, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - true, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1) + .setExclusive(true); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2) + .setExclusive(true); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3) + .setExclusive(true); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -740,29 +865,25 @@ public class DivertTest extends ServiceTestBase final String forwardAddress2 = "forwardAddress2"; final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - true, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - true, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1) + .setExclusive(true); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2) + .setExclusive(true); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -872,29 +993,25 @@ public class DivertTest extends ServiceTestBase final String filter = "animal='antelope'"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - true, - filter, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1) + .setExclusive(true) + .setFilterString(filter); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -1055,29 +1172,23 @@ public class DivertTest extends ServiceTestBase final String forwardAddress2 = "forwardAddress2"; final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "thename", - testAddress, - forwardAddress1, - false, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "thename", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "thename", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("thename") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("thename") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("thename") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -1215,29 +1326,23 @@ public class DivertTest extends ServiceTestBase final String forwardAddress2 = "forwardAddress2"; final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "thename1", - testAddress, - forwardAddress1, - false, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert1", - "thename2", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert2", - "thename3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("thename1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("thename2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("thename3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/PersistentDivertTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/PersistentDivertTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/PersistentDivertTest.java index 91b0954..95264f7 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/PersistentDivertTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/divert/PersistentDivertTest.java @@ -71,29 +71,23 @@ public class PersistentDivertTest extends ServiceTestBase final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - false, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); @@ -274,29 +268,23 @@ public class PersistentDivertTest extends ServiceTestBase final String forwardAddress3 = "forwardAddress3"; - DivertConfiguration divertConf1 = new DivertConfiguration("divert1", - "divert1", - testAddress, - forwardAddress1, - false, - null, - null); - - DivertConfiguration divertConf2 = new DivertConfiguration("divert2", - "divert2", - testAddress, - forwardAddress2, - false, - null, - null); - - DivertConfiguration divertConf3 = new DivertConfiguration("divert3", - "divert3", - testAddress, - forwardAddress3, - false, - null, - null); + DivertConfiguration divertConf1 = new DivertConfiguration() + .setName("divert1") + .setRoutingName("divert1") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress1); + + DivertConfiguration divertConf2 = new DivertConfiguration() + .setName("divert2") + .setRoutingName("divert2") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress2); + + DivertConfiguration divertConf3 = new DivertConfiguration() + .setName("divert3") + .setRoutingName("divert3") + .setAddress(testAddress) + .setForwardingAddress(forwardAddress3); List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/embedded/ValidateAIOTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/embedded/ValidateAIOTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/embedded/ValidateAIOTest.java index 9d782a0..3f9b5d5 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/embedded/ValidateAIOTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/embedded/ValidateAIOTest.java @@ -31,9 +31,9 @@ public class ValidateAIOTest extends ServiceTestBase @Test public void testValidateAIO() throws Exception { - Configuration config = createDefaultConfig(false); - // This will force AsyncIO - config.setJournalType(JournalType.ASYNCIO); + Configuration config = createDefaultConfig(false) + // This will force AsyncIO + .setJournalType(JournalType.ASYNCIO); HornetQServer server = HornetQServers.newHornetQServer(config, true); try { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java index 964616a..7c08c77 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java @@ -51,12 +51,12 @@ public class CoreClientOverHttpTest extends UnitTestCase public void setUp() throws Exception { super.setUp(); - conf = createDefaultConfig(); - - conf.setSecurityEnabled(false); HashMap<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); + + conf = createDefaultConfig() + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); server = addServer(HornetQServers.newHornetQServer(conf, false)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/FloodServerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/FloodServerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/FloodServerTest.java index 03b2460..1550d0b 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/FloodServerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/FloodServerTest.java @@ -84,10 +84,8 @@ public class FloodServerTest extends UnitTestCase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName())); + Configuration conf = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName())); server = HornetQServers.newHornetQServer(conf, false); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java index 4205942..1e2def9 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java @@ -11,7 +11,7 @@ * permissions and limitations under the License. */ package org.hornetq.tests.integration.jms; -import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.junit.Before; import org.junit.Test; @@ -164,8 +164,10 @@ public class HornetQConnectionFactoryTest extends UnitTestCase @Test public void testDiscoveryConstructor() throws Exception { - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, - new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1)); + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration() + .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort)); HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.CF); assertFactoryParams(cf, null, @@ -707,27 +709,31 @@ public class HornetQConnectionFactoryTest extends UnitTestCase private void startServer() throws Exception { - Configuration liveConf = createBasicConfig(); - liveConf.setSecurityEnabled(false); liveTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY); - liveConf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>(); connectors.put(liveTC.getName(), liveTC); - liveConf.setConnectorConfigurations(connectors); - liveConf.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); List<String> connectorNames = new ArrayList<String>(); connectorNames.add(liveTC.getName()); + Configuration liveConf = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .setConnectorConfigurations(connectors) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()); + final long broadcastPeriod = 250; final String bcGroupName = "bc1"; final int localBindPort = 5432; - BroadcastGroupConfiguration bcConfig1 = new BroadcastGroupConfiguration(bcGroupName, - broadcastPeriod, - connectorNames, - new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, localBindPort)); + BroadcastGroupConfiguration bcConfig1 = new BroadcastGroupConfiguration() + .setName(bcGroupName) + .setBroadcastPeriod(broadcastPeriod) + .setConnectorInfos(connectorNames) + .setEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort) + .setLocalBindPort(localBindPort)); List<BroadcastGroupConfiguration> bcConfigs1 = new ArrayList<BroadcastGroupConfiguration>(); bcConfigs1.add(bcConfig1); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java index 90c71e5..1f35454 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java @@ -148,25 +148,25 @@ public class ManualReconnectionToSingleServerTest extends ServiceTestBase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(false); - conf.setJMXManagementEnabled(true); - conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); + context = new InVMNamingContext(); + + Configuration conf = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY)); + server = createServer(false, conf); JMSConfiguration configuration = new JMSConfigurationImpl(); - context = new InVMNamingContext(); configuration.setContext(context); - configuration.getQueueConfigurations().add(new JMSQueueConfigurationImpl(QUEUE_NAME, null, true, QUEUE_NAME)); + configuration.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(QUEUE_NAME).setBindings(QUEUE_NAME)); ArrayList<TransportConfiguration> configs = new ArrayList<TransportConfiguration>(); configs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); - ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl("cf", - false, - registerConnectors(server, configs), "/cf"); - cfConfig.setRetryInterval(1000); - cfConfig.setRetryIntervalMultiplier(1.0); - cfConfig.setReconnectAttempts(-1); + ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl() + .setName("cf") + .setConnectorNames(registerConnectors(server, configs)) + .setBindings("/cf") + .setRetryInterval(1000) + .setReconnectAttempts(-1); configuration.getConnectionFactoryConfigurations().add(cfConfig); serverManager = new JMSServerManagerImpl(server, configuration); serverManager.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java index a4e491b..11811e6 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java @@ -98,7 +98,7 @@ public abstract class BridgeTestBase extends UnitTestCase protected InVMNamingContext context1; - private HashMap<String, Object> params1; + protected HashMap<String, Object> params1; protected ConnectionFactoryFactory cff0LowProducerWindow; @@ -109,11 +109,11 @@ public abstract class BridgeTestBase extends UnitTestCase super.setUp(); // Start the servers - Configuration conf0 = createBasicConfig(); - conf0.setJournalDirectory(getJournalDir(0, false)); - conf0.setBindingsDirectory(getBindingsDir(0, false)); - conf0.setSecurityEnabled(false); - conf0.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration conf0 = createBasicConfig() + .setJournalDirectory(getJournalDir(0, false)) + .setBindingsDirectory(getBindingsDir(0, false)) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + server0 = addServer(HornetQServers.newHornetQServer(conf0, false)); context0 = new InVMNamingContext(); @@ -121,13 +121,13 @@ public abstract class BridgeTestBase extends UnitTestCase jmsServer0.setContext(context0); jmsServer0.start(); - Configuration conf1 = createBasicConfig(); - conf1.setSecurityEnabled(false); - conf1.setJournalDirectory(getJournalDir(1, false)); - conf1.setBindingsDirectory(getBindingsDir(1, false)); params1 = new HashMap<String, Object>(); params1.put(TransportConstants.SERVER_ID_PROP_NAME, 1); - conf1.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params1)); + + Configuration conf1 = createBasicConfig() + .setJournalDirectory(getJournalDir(1, false)) + .setBindingsDirectory(getBindingsDir(1, false)) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params1)); server1 = addServer(HornetQServers.newHornetQServer(conf1, false)); @@ -562,11 +562,16 @@ public abstract class BridgeTestBase extends UnitTestCase } JMSQueueControl queueControl = (JMSQueueControl) managementService.getResource(ResourceNames.JMS_QUEUE + queue.getQueueName()); - Long messageCount = queueControl.getMessageCount(); - - if (messageCount > 0) + //server may be closed + if (queueControl != null) { - queueControl.removeMessages(null); + queueControl.flushExecutor(); + Long messageCount = queueControl.getMessageCount(); + + if (messageCount > 0) + { + queueControl.removeMessages(null); + } } return true; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java index 451e2ed..cc75b20 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java @@ -37,10 +37,11 @@ import org.hornetq.api.core.client.ServerLocator; import org.hornetq.api.jms.HornetQJMSClient; import org.hornetq.api.jms.JMSFactoryType; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; import org.hornetq.core.remoting.impl.invm.TransportConstants; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.jms.bridge.ConnectionFactoryFactory; import org.hornetq.jms.bridge.DestinationFactory; import org.hornetq.jms.client.HornetQConnectionFactory; @@ -151,14 +152,13 @@ public abstract class ClusteredBridgeTestBase extends ServiceTestBase backupConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params, "in-vm-backup"); //live - Configuration conf0 = createBasicConfig(); - conf0.setJournalDirectory(getJournalDir(id, false)); - conf0.setBindingsDirectory(getBindingsDir(id, false)); - conf0.setSecurityEnabled(false); - conf0.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0)); - conf0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - conf0.getHAPolicy().setFailoverOnServerShutdown(true); - basicClusterConnectionConfig(conf0, liveConnector.getName()); + Configuration conf0 = createBasicConfig() + .setJournalDirectory(getJournalDir(id, false)) + .setBindingsDirectory(getBindingsDir(id, false)) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0)) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .setHAPolicyConfiguration(new ReplicatedPolicyConfiguration()) + .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())); HornetQServer server0 = addServer(HornetQServers.newHornetQServer(conf0, true)); @@ -167,17 +167,14 @@ public abstract class ClusteredBridgeTestBase extends ServiceTestBase liveNode.setContext(liveContext); //backup - Configuration conf = createBasicConfig(); - conf.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - conf.setJournalDirectory(getJournalDir(id, true)); - conf.setBindingsDirectory(getBindingsDir(id, true)); - conf.setSecurityEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params)); - - conf.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); - conf.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); - conf.getHAPolicy().setFailoverOnServerShutdown(true); - basicClusterConnectionConfig(conf, backupConnector.getName(), liveConnector.getName()); + Configuration conf = createBasicConfig() + .setJournalDirectory(getJournalDir(id, true)) + .setBindingsDirectory(getBindingsDir(id, true)) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params)) + .addConnectorConfiguration(backupConnector.getName(), backupConnector) + .addConnectorConfiguration(liveConnector.getName(), liveConnector) + .setHAPolicyConfiguration(new ReplicaPolicyConfiguration()) + .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); HornetQServer backup = addServer(HornetQServers.newHornetQServer(conf, true)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java index 2a94222..c20c48d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java @@ -12,13 +12,27 @@ */ package org.hornetq.tests.integration.jms.bridge; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.jms.HornetQJMSClient; +import org.hornetq.api.jms.JMSFactoryType; import org.hornetq.jms.bridge.ConnectionFactoryFactory; import org.hornetq.jms.bridge.QualityOfServiceMode; import org.hornetq.jms.bridge.impl.JMSBridgeImpl; +import org.hornetq.jms.client.HornetQXAConnectionFactory; import org.hornetq.tests.integration.IntegrationTestLogger; +import org.hornetq.tests.integration.ra.DummyTransactionManager; import org.junit.Assert; import org.junit.Test; +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.RollbackException; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.xa.XAResource; + /** * @author <a href="mailto:[email protected]">Tim Fox</a> */ @@ -259,9 +273,221 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase JMSBridgeReconnectionTest.log.info("Sent messages"); - checkMessagesReceived(cf1, targetQueue, qosMode, NUM_MESSAGES, false, largeMessage); + jmsServer1.stop(); + + bridge.stop(); + + System.out.println("JMSBridgeReconnectionTest.performCrashAndReconnectDestBasic"); + } + + @Test + public void performCrashDestinationStopBridge() throws Exception + { + ConnectionFactoryFactory factInUse0 = cff0; + ConnectionFactoryFactory factInUse1 = cff1; + final JMSBridgeImpl bridge = + new JMSBridgeImpl(factInUse0, + factInUse1, + sourceQueueFactory, + targetQueueFactory, + null, + null, + null, + null, + null, + 1000, + -1, + QualityOfServiceMode.DUPLICATES_OK, + 10, + -1, + null, + null, + false); + + + addHornetQComponent(bridge); + bridge.setTransactionManager(newTransactionManager()); + bridge.start(); + + Thread clientThread = new Thread(new Runnable() + { + @Override + public void run() + { + while (bridge.isStarted()) + { + try + { + sendMessages(cf0, sourceQueue, 0, 1, false, false); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + }); + + clientThread.start(); + + checkAllMessageReceivedInOrder(cf1, targetQueue, 0, 1, false); + + JMSBridgeReconnectionTest.log.info("About to crash server"); + + jmsServer1.stop(); + + // Wait a while before starting up to simulate the dest being down for a while + JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up"); + Thread.sleep(TIME_WAIT); + JMSBridgeReconnectionTest.log.info("Done wait"); + + bridge.stop(); + + clientThread.join(5000); + + assertTrue(!clientThread.isAlive()); + } + + @Test + public void performCrashAndReconnect() throws Exception + { + performCrashAndReconnect(true); + } + + @Test + public void performCrashAndNoReconnect() throws Exception + { + performCrashAndReconnect(false); + } + + + private void performCrashAndReconnect(boolean restart) throws Exception + { + cff1xa = new ConnectionFactoryFactory() + { + public Object createConnectionFactory() throws Exception + { + HornetQXAConnectionFactory cf = (HornetQXAConnectionFactory) HornetQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, + new TransportConfiguration( + INVM_CONNECTOR_FACTORY, + params1)); + + // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection + cf.setReconnectAttempts(-1); + cf.setBlockOnNonDurableSend(true); + cf.setBlockOnDurableSend(true); + cf.setCacheLargeMessagesClient(true); + + return cf; + } + + }; + + DummyTransactionManager tm = new DummyTransactionManager(); + DummyTransaction tx = new DummyTransaction(); + tm.tx = tx; + + JMSBridgeImpl bridge = + new JMSBridgeImpl(cff0xa, + cff1xa, + sourceQueueFactory, + targetQueueFactory, + null, + null, + null, + null, + null, + 1000, + -1, + QualityOfServiceMode.ONCE_AND_ONLY_ONCE, + 10, + 5000, + null, + null, + false); + addHornetQComponent(bridge); + bridge.setTransactionManager(tm); + + bridge.start(); + + // Now crash the dest server + + JMSBridgeReconnectionTest.log.info("About to crash server"); + + jmsServer1.stop(); + + if (restart) + { + jmsServer1.start(); + } + // Wait a while before starting up to simulate the dest being down for a while + JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up"); + Thread.sleep(TIME_WAIT); + JMSBridgeReconnectionTest.log.info("Done wait"); + + bridge.stop(); + + if (restart) + { + assertTrue(tx.rolledback); + assertTrue(tx.targetConnected); + } + else + { + assertTrue(tx.rolledback); + assertFalse(tx.targetConnected); + } } + private class DummyTransaction implements Transaction + { + boolean rolledback = false; + ClientSession targetSession; + boolean targetConnected = false; + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException + { + + } + + @Override + public void rollback() throws IllegalStateException, SystemException + { + rolledback = true; + targetConnected = !targetSession.isClosed(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException + { + + } + + @Override + public int getStatus() throws SystemException + { + return 0; + } + + @Override + public boolean enlistResource(XAResource xaResource) throws RollbackException, IllegalStateException, SystemException + { + targetSession = (ClientSession) xaResource; + return false; + } + + @Override + public boolean delistResource(XAResource xaResource, int i) throws IllegalStateException, SystemException + { + return false; + } + + @Override + public void registerSynchronization(Synchronization synchronization) throws RollbackException, IllegalStateException, SystemException + { + + } + } /* * Send some messages * Crash the destination server http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ExpiryMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ExpiryMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ExpiryMessageTest.java index 1de3f14..bb9cc94 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ExpiryMessageTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ExpiryMessageTest.java @@ -44,9 +44,8 @@ public class ExpiryMessageTest extends JMSTestBase @Override protected Configuration createDefaultConfig(boolean netty) throws Exception { - Configuration conf = super.createDefaultConfig(netty); - - conf.setMessageExpiryScanPeriod(1000); + Configuration conf = super.createDefaultConfig(netty) + .setMessageExpiryScanPeriod(1000); return conf; } @@ -58,8 +57,6 @@ public class ExpiryMessageTest extends JMSTestBase Topic topic = createTopic("test-topic"); TopicControl control = ManagementControlHelper.createTopicControl(topic, mbeanServer); - System.out.println("size = " + control.getMessageCount()); - Connection conn2 = cf.createConnection(); conn2.setClientID("client1");
