http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f34dcb4..5576fc4 100644 --- a/pom.xml +++ b/pom.xml @@ -7,6 +7,7 @@ <version>2.5.0-SNAPSHOT</version> <modules> <module>hornetq-protocols</module> + <module>hornetq-dto</module> </modules> <name>HornetQ Parent</name> @@ -45,7 +46,8 @@ ${hornetq.version.majorVersion}.${hornetq.version.minorVersion}.${hornetq.version.microVersion}.${hornetq.version.versionSuffix} (${hornetq.version.versionName}, ${hornetq.version.incrementingVersion}) </HornetQ-Version> - <resteasy.version>3.0.4.Final</resteasy.version> + <resteasy.version>3.0.9.Final</resteasy.version> + <jackson-databind.version>2.3.1</jackson-databind.version> <skipUnitTests>true</skipUnitTests> <skipJmsTests>true</skipJmsTests> <skipBytemanTests>true</skipBytemanTests> @@ -209,10 +211,9 @@ </dependency> <dependency> <groupId>org.jboss.spec.javax.transaction</groupId> - <artifactId>jboss-transaction-api_1.1_spec</artifactId> + <artifactId>jboss-transaction-api_1.2_spec</artifactId> <version>1.0.0.Final</version> </dependency> - <!--this specifically for the JMS Bridge--> <dependency> <groupId>org.jboss</groupId> @@ -231,6 +232,12 @@ <artifactId>jbossjts-jacorb</artifactId> <version>4.17.13.Final</version> </dependency> + <!-- this for Ironjacamar SPI XAResourceWrapper implementation --> + <dependency> + <groupId>org.jboss.ironjacamar</groupId> + <artifactId>ironjacamar-core-api</artifactId> + <version>1.2.0.Beta2</version> + </dependency> <!--needed to compile security--> <dependency> <groupId>org.jboss.security</groupId> @@ -244,58 +251,15 @@ </dependency> <!--needed to compile the bootstrap jar--> <dependency> - <groupId>org.jboss.microcontainer</groupId> - <artifactId>jboss-kernel</artifactId> - <version>2.0.6.GA</version> - <exclusions> - <exclusion> - <groupId>org.jboss.logging</groupId> - <artifactId>jboss-logging-spi</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.jboss</groupId> - <artifactId>jboss-common-core</artifactId> - <version>2.2.14.GA</version> - <exclusions> - <exclusion> - <groupId>org.jboss.logging</groupId> - <artifactId>jboss-logging-spi</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.jgroups</groupId> <artifactId>jgroups</artifactId> <version>3.3.4.Final</version> </dependency> - <!--needed for microntainer deps in distro--> - <dependency> - <groupId>org.jboss.microcontainer</groupId> - <artifactId>jboss-dependency</artifactId> - <version>2.0.6.GA</version> - </dependency> - <dependency> - <groupId>org.jboss</groupId> - <artifactId>jboss-reflect</artifactId> - <version>2.0.2.GA</version> - </dependency> - <dependency> - <groupId>org.jboss</groupId> - <artifactId>jboss-mdr</artifactId> - <version>2.0.1.GA</version> - </dependency> - <dependency> - <groupId>org.jboss</groupId> - <artifactId>jbossxb</artifactId> - <version>2.0.1.GA</version> - </dependency> - <dependency> - <groupId>sun-jaxb</groupId> - <artifactId>jaxb-api</artifactId> - <version>2.1.9</version> - </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>airline</artifactId> + <version>0.6</version> + </dependency> <!--needed to compile transport jar--> <dependency> <groupId>io.netty</groupId> @@ -324,19 +288,25 @@ <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>proton-api</artifactId> - <version>0.5</version> + <artifactId>proton-j</artifactId> + <version>0.8</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>proton-j-impl</artifactId> - <version>0.5</version> + <artifactId>proton-jms</artifactId> + <version>0.8</version> </dependency> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - <version>0.5</version> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>5.10.0</version> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.5</version> + </dependency> + <!--needed to compile the spring support--> <dependency> <groupId>org.springframework</groupId> @@ -397,7 +367,6 @@ <artifactId>jboss-jaspi-api</artifactId> <version>1.0.0.GA</version> </dependency> - <!--needed to run the jms tests --> <dependency> <groupId>org.jboss.naming</groupId> @@ -432,6 +401,12 @@ <groupId>org.jboss.javaee</groupId> <artifactId>jboss-jca-api</artifactId> <version>1.5.0.GA</version> + <exclusions> + <exclusion> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-spi</artifactId> + </exclusion> + </exclusions> </dependency> <!-- needed for javadoc graphics--> @@ -457,6 +432,25 @@ <name>JBoss releases</name> <url>https://repository.jboss.org/nexus/content/groups/public/</url> </repository> + <!-- + This is a repository for intermediate releases from Proton. + In case there's an API broken, we will release an intermediate release here, on that case we will uncomment this and + use this repository again + + <repository> + <snapshots> + <enabled>false</enabled> + <updatePolicy>never</updatePolicy> + </snapshots> + <releases> + <enabled>true</enabled> + <updatePolicy>interval:10080</updatePolicy> + </releases> + <id>fuse.release</id> + <name>Fuse releases</name> + <url>https://repository.jboss.org/nexus/content/repositories/fs-releases/</url> + </repository> + --> </repositories> <pluginRepositories> <pluginRepository> @@ -482,6 +476,7 @@ <activeByDefault>true</activeByDefault> </activation> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -506,6 +501,7 @@ <profile> <id>maven-release</id> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -530,6 +526,7 @@ <profile> <id>release</id> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -556,6 +553,7 @@ <profile> <id>hudson-tests</id> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -593,6 +591,7 @@ <profile> <id>jenkins-fast-tests</id> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -626,6 +625,7 @@ <profile> <id>examples</id> <modules> + <module>hornetq-dto</module> <module>hornetq-bootstrap</module> <module>hornetq-commons</module> <module>hornetq-selector</module> @@ -847,6 +847,15 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <version>2.12</version> + <dependencies> + <!-- This was initially done to enforce name on Parameter annotation + I've developed a customized check and I needed this jar to deploy the specialized checkstyle --> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-checkstyle-checks</artifactId> + <version>0.2</version> + </dependency> + </dependencies> <configuration> <skip>${skipStyleCheck}</skip> <configLocation>${hornetq.basedir}/etc/checkstyle.xml</configLocation>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/pom.xml b/tests/byteman-tests/pom.xml index bac33d0..cd3bffa 100644 --- a/tests/byteman-tests/pom.xml +++ b/tests/byteman-tests/pom.xml @@ -13,7 +13,7 @@ <properties> <tools.jar>${java.home}/../lib/tools.jar</tools.jar> - <byteman.version>2.1.4.1</byteman.version> + <byteman.version>2.2.0</byteman.version> <hornetq.basedir>${project.basedir}/../..</hornetq.basedir> </properties> @@ -132,7 +132,7 @@ </dependency> <dependency> <groupId>org.jboss.spec.javax.transaction</groupId> - <artifactId>jboss-transaction-api_1.1_spec</artifactId> + <artifactId>jboss-transaction-api_1.2_spec</artifactId> </dependency> <!--this specifically for the JMS Bridge --> <dependency> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java index 4182c8c..4724891 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java @@ -19,6 +19,7 @@ import javax.transaction.xa.Xid; import org.hornetq.api.core.HornetQTransactionOutcomeUnknownException; import org.hornetq.api.core.HornetQTransactionRolledBackException; +import org.hornetq.api.core.HornetQUnBlockedException; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientConsumer; @@ -27,6 +28,7 @@ import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.client.impl.ClientMessageImpl; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ClientSessionInternal; @@ -35,11 +37,13 @@ import org.hornetq.core.server.Queue; import org.hornetq.core.transaction.impl.XidImpl; import org.hornetq.tests.integration.cluster.failover.FailoverTestBase; import org.hornetq.tests.integration.cluster.util.TestableServer; +import org.hornetq.tests.util.RandomUtil; import org.hornetq.utils.UUIDGenerator; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,6 +65,7 @@ public class BMFailoverTest extends FailoverTestBase public void setUp() throws Exception { super.setUp(); + stopped = false; locator = getServerLocator(); } @@ -71,6 +76,122 @@ public class BMFailoverTest extends FailoverTestBase super.tearDown(); } + private static boolean stopped = false; + public static void stopAndThrow() throws HornetQUnBlockedException + { + if (!stopped) + { + try + { + serverToStop.getServer().stop(true); + } + catch (Exception e) + { + e.printStackTrace(); + } + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + stopped = true; + throw HornetQClientMessageBundle.BUNDLE.unblockingACall(null); + } + } + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "trace HornetQSessionContext xaEnd", + targetClass = "org.hornetq.core.protocol.core.impl.HornetQSessionContext", + targetMethod = "xaEnd", + targetLocation = "AT EXIT", + action = "org.hornetq.byteman.tests.BMFailoverTest.stopAndThrow()" + ) + } + ) + //https://bugzilla.redhat.com/show_bug.cgi?id=1152410 + public void testFailOnEndAndRetry() throws Exception + { + serverToStop = liveServer; + + createSessionFactory(); + + ClientSession session = createSession(sf, true, false, false); + + session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + + ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); + + for (int i = 0; i < 100; i++) + { + producer.send(createMessage(session, i, true)); + } + + ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); + + Xid xid = RandomUtil.randomXid(); + + session.start(xid, XAResource.TMNOFLAGS); + session.start(); + // Receive MSGs but don't ack! + for (int i = 0; i < 100; i++) + { + ClientMessage message = consumer.receive(1000); + + Assert.assertNotNull(message); + + assertMessageBody(i, message); + + Assert.assertEquals(i, message.getIntProperty("counter").intValue()); + } + try + { + //top level prepare + session.end(xid, XAResource.TMSUCCESS); + } + catch (XAException e) + { + try + { + //top level abort + session.end(xid, XAResource.TMFAIL); + } + catch (XAException e1) + { + try + { + //rollback + session.rollback(xid); + } + catch (XAException e2) + { + } + } + } + xid = RandomUtil.randomXid(); + session.start(xid, XAResource.TMNOFLAGS); + + for (int i = 0; i < 50; i++) + { + ClientMessage message = consumer.receive(1000); + + Assert.assertNotNull(message); + + assertMessageBody(i, message); + + Assert.assertEquals(i, message.getIntProperty("counter").intValue()); + } + session.end(xid, XAResource.TMSUCCESS); + session.commit(xid, true); + } + @Test @BMRules ( @@ -170,7 +291,7 @@ public class BMFailoverTest extends FailoverTestBase //let's close the consumer so anything pending is handled consumer.close(); - assertTrue("actual message count=" + inQ.getMessageCount(), inQ.getMessageCount() == 1); + assertEquals(1, getMessageCount(inQ)); } @@ -212,7 +333,7 @@ public class BMFailoverTest extends FailoverTestBase sendMessages(session, producer, 10); session.commit(); Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable(); - assertTrue(bindable.getMessageCount() == 10); + assertEquals(10, getMessageCount(bindable)); } @Test @@ -266,7 +387,8 @@ public class BMFailoverTest extends FailoverTestBase //pass } Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable(); - assertTrue("messager count = " + bindable.getMessageCount(), bindable.getMessageCount() == 10); + assertEquals(10, getMessageCount(bindable)); + } @Override @@ -303,6 +425,13 @@ public class BMFailoverTest extends FailoverTestBase return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks)); } + protected ClientSession + createSession(ClientSessionFactory sf1, boolean xa, boolean autoCommitSends, boolean autoCommitAcks) throws Exception + { + return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks)); + } + + private void createSessionFactory() throws Exception { locator.setBlockOnNonDurableSend(true); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java index 7b28da0..c30b7b9 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java @@ -17,9 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.BridgeConfiguration; import org.hornetq.core.config.CoreQueueConfiguration; @@ -98,24 +96,31 @@ public class BridgeServerLocatorConfigurationTest extends ServiceTestBase ArrayList<String> staticConnectors = new ArrayList<String>(); staticConnectors.add(server1tc.getName()); - BridgeConfiguration bridgeConfiguration = - new BridgeConfiguration(BRIDGE_NAME, queueName0, forwardAddress, null, null, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, BRIDGE_TTL, 1000, - HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, 1d, -1, 0, 0, true, 1024, - staticConnectors, false, HornetQDefaultConfiguration.getDefaultClusterUser(), - HornetQDefaultConfiguration.getDefaultClusterPassword()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration() + .setName(BRIDGE_NAME) + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setConnectionTTL(BRIDGE_TTL) + .setRetryInterval(1000) + .setReconnectAttempts(0) + .setReconnectAttemptsOnSameNode(0) + .setConfirmationWindowSize(1024) + .setStaticConnectors(staticConnectors); List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>(); bridgeConfigs.add(bridgeConfiguration); serverWithBridge.getConfiguration().setBridgeConfigurations(bridgeConfigs); - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true); + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName0); List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>(); queueConfigs0.add(queueConfig0); serverWithBridge.getConfiguration().setQueueConfigurations(queueConfigs0); - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName1); List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>(); queueConfigs1.add(queueConfig1); server1.getConfiguration().setQueueConfigurations(queueConfigs1); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java index 19852f9..cd27a49 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java @@ -19,7 +19,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException; import org.hornetq.api.core.Message; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration; import org.hornetq.core.server.group.impl.Response; @@ -419,7 +419,7 @@ public class ClusteredGroupingTest extends ClusterTestBase public static void pause2(Notification notification) { - if (notification.getType() == NotificationType.BINDING_REMOVED) + if (notification.getType() == CoreNotificationType.BINDING_REMOVED) { SimpleString clusterName = notification.getProperties() .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java index 178643f..e800d79 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java @@ -96,6 +96,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase HornetQActivationSpec spec = new HornetQActivationSpec(); spec.setMaxSession(1); + spec.setCallTimeout(1000L); spec.setResourceAdapter(qResourceAdapter); spec.setUseJNDI(false); spec.setDestinationType("javax.jms.Queue"); @@ -131,7 +132,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase qResourceAdapter.stop(); Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED)); - assertEquals(1, ((Queue) binding.getBindable()).getMessageCount()); + assertEquals(1, getMessageCount(((Queue) binding.getBindable()))); server.stop(); server.start(); @@ -175,6 +176,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase HornetQActivationSpec spec = new HornetQActivationSpec(); spec.setMaxSession(1); + spec.setCallTimeout(1000L); spec.setResourceAdapter(qResourceAdapter); spec.setUseJNDI(false); spec.setDestinationType("javax.jms.Queue"); @@ -210,7 +212,8 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase qResourceAdapter.stop(); Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED)); - assertEquals(1, ((Queue) binding.getBindable()).getMessageCount()); + assertEquals(1, getMessageCount(((Queue) binding.getBindable()))); + server.stop(); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java new file mode 100644 index 0000000..e0db2e3 --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.byteman.tests; + +import org.hornetq.core.client.impl.ClientProducerCredits; +import org.hornetq.core.message.impl.MessageInternal; +import org.hornetq.core.protocol.core.Packet; +import org.hornetq.core.protocol.core.impl.PacketImpl; +import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.hornetq.jms.bridge.ConnectionFactoryFactory; +import org.hornetq.jms.bridge.QualityOfServiceMode; +import org.hornetq.jms.bridge.impl.JMSBridgeImpl; +import org.hornetq.jms.server.JMSServerManager; +import org.hornetq.tests.integration.jms.bridge.BridgeTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@RunWith(BMUnitRunner.class) +public class JMSBridgeReconnectionTest extends BridgeTestBase +{ + + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "trace clientsessionimpl send", + targetClass = "org.hornetq.core.protocol.core.impl.ChannelImpl", + targetMethod = "send", + targetLocation = "ENTRY", + action = "org.hornetq.byteman.tests.JMSBridgeReconnectionTest.pause($1);" + ), + @BMRule + ( + name = "trace sendRegularMessage", + targetClass = "org.hornetq.core.client.impl.ClientProducerImpl", + targetMethod = "sendRegularMessage", + targetLocation = "ENTRY", + action = "org.hornetq.byteman.tests.JMSBridgeReconnectionTest.pause2($1,$2,$3);" + ) + } + ) + public void performCrashDestinationStopBridge() throws Exception + { + hornetQServer = jmsServer1; + ConnectionFactoryFactory factInUse0 = cff0; + ConnectionFactoryFactory factInUse1 = cff1; + final JMSBridgeImpl bridge = + new JMSBridgeImpl(factInUse0, + factInUse1, + sourceQueueFactory, + targetQueueFactory, + null, + null, + null, + null, + null, + 1000, + -1, + QualityOfServiceMode.DUPLICATES_OK, + 10, + -1, + null, + null, + false); + + addHornetQComponent(bridge); + bridge.setTransactionManager(newTransactionManager()); + bridge.start(); + final CountDownLatch latch = new CountDownLatch(20); + Thread clientThread = new Thread(new Runnable() + { + @Override + public void run() + { + while (bridge.isStarted()) + { + try + { + sendMessages(cf0, sourceQueue, 0, 1, false, false); + latch.countDown(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + }); + + clientThread.start(); + + stopLatch.await(10000, TimeUnit.MILLISECONDS); + + bridge.stop(); + + clientThread.join(5000); + + assertTrue(!clientThread.isAlive()); + } + + public static void pause(Packet packet) + { + if (packet.getType() == PacketImpl.SESS_SEND) + { + SessionSendMessage sendMessage = (SessionSendMessage) packet; + if (sendMessage.getMessage().containsProperty("__HQ_CID") && count < 0 && !stopped) + { + try + { + hornetQServer.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + stopped = true; + try + { + Thread.sleep(5000); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + stopLatch.countDown(); + } + } + } + + static JMSServerManager hornetQServer; + static boolean stopped = false; + static int count = 20; + static CountDownLatch stopLatch = new CountDownLatch(1); + public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits) + { + if (msgI.containsProperty("__HQ_CID")) + { + count--; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java new file mode 100644 index 0000000..6a278fe --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java @@ -0,0 +1,298 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.byteman.tests; + + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.core.client.impl.ClientSessionFactoryImpl; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.Queue; +import org.hornetq.tests.util.ServiceTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * @author Clebert Suconic + */ +@RunWith(BMUnitRunner.class) +public class OrphanedConsumerTest extends ServiceTestBase +{ + + private static boolean conditionActive = true; + + public static final boolean isConditionActive() + { + return conditionActive; + } + + + public static final void setConditionActive(boolean _conditionActive) + { + conditionActive = _conditionActive; + } + + + public static void throwException() throws Exception + { + throw new InterruptedException("nice.. I interrupted this!"); + } + + private HornetQServer server; + + private ServerLocator locator; + + static HornetQServer staticServer; + + /** + * {@link #leavingCloseOnTestCountersWhileClosing()} will set this in case of any issues. + * the test must then validate for this being null + */ + static AssertionError verification; + + /** + * This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()} + * */ + public static void leavingCloseOnTestCountersWhileClosing() + { + if (staticServer.getConnectionCount() == 0) + { + verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!"); + } + + if (staticServer.getSessions().size() == 0) + { + verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!"); + } + } + + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + setConditionActive(true); + /** I'm using the internal method here because closing + * this locator on tear down would hang. + * as we are tweaking with the internal state and making it fail */ + locator = internalCreateNonHALocator(true); + } + + + @Override + @After + public void tearDown() throws Exception + { + super.tearDown(); + setConditionActive(false); + + staticServer = null; + } + + + /** + * This is like being two tests in one: + * I - validating that any exception during the close wouldn't stop connection from being closed + * II - validating that the connection is only removed at the end of the process and you wouldn't see + * inconsistencies on management + * @throws Exception + */ + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "closeExit", + targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl", + targetMethod = "close", + targetLocation = "AT EXIT", + condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()", + action = "System.out.println(\"throwing stuff\");throw new InterruptedException()" + ), + @BMRule + ( + name = "closeEnter", + targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl", + targetMethod = "close", + targetLocation = "ENTRY", + condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()", + action = "org.hornetq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()" + ) + + } + ) + public void testOrphanedConsumers() throws Exception + { + internalTestOrphanedConsumers(false); + } + + + /** + * This is like being two tests in one: + * I - validating that any exception during the close wouldn't stop connection from being closed + * II - validating that the connection is only removed at the end of the process and you wouldn't see + * inconsistencies on management + * @throws Exception + */ + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "closeExit", + targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl", + targetMethod = "close", + targetLocation = "AT EXIT", + condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()", + action = "System.out.println(\"throwing stuff\");throw new InterruptedException()" + ), + @BMRule + ( + name = "closeEnter", + targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl", + targetMethod = "close", + targetLocation = "ENTRY", + condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()", + action = "org.hornetq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()" + ) + + } + ) + public void testOrphanedConsumersByManagement() throws Exception + { + internalTestOrphanedConsumers(true); + } + + /** + * + * @param useManagement true = it will use a management operation to make the connection failure, false through ping + * @throws Exception + */ + private void internalTestOrphanedConsumers(boolean useManagement) throws Exception + { + final int NUMBER_OF_MESSAGES = 2; + server = createServer(true, true); + server.start(); + staticServer = server; + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(false); + locator.setBlockOnAcknowledge(true); + locator.setConnectionTTL(1000); + locator.setClientFailureCheckPeriod(100); + locator.setReconnectAttempts(0); + // We are not interested on consumer-window-size on this test + // We want that every message is delivered + // as we asserting for number of consumers available and round-robin on delivery + locator.setConsumerWindowSize(-1); + + ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)createSessionFactory(locator); + + ClientSession session = sf.createSession(true, true, 0); + + session.createQueue("queue", "queue1", true); + session.createQueue("queue", "queue2", true); + + ClientProducer prod = session.createProducer("queue"); + + ClientConsumer consumer = session.createConsumer("queue1"); + ClientConsumer consumer2 = session.createConsumer("queue2"); + + + Queue queue1 = server.locateQueue(new SimpleString("queue1")); + + Queue queue2 = server.locateQueue(new SimpleString("queue2")); + + session.start(); + + + if (!useManagement) + { + sf.stopPingingAfterOne(); + + for (long timeout = System.currentTimeMillis() + 6000; timeout > System.currentTimeMillis() && server.getConnectionCount() != 0; ) + { + Thread.sleep(100); + } + + // an extra second to avoid races of something closing the session while we are asserting it + Thread.sleep(1000); + } + else + { + server.getHornetQServerControl().closeConnectionsForAddress("127.0.0.1"); + } + + if (verification != null) + { + throw verification; + } + + assertEquals(0, queue1.getConsumerCount()); + assertEquals(0, queue2.getConsumerCount()); + + setConditionActive(false); + + locator = internalCreateNonHALocator(true); + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(false); + locator.setBlockOnAcknowledge(true); + locator.setReconnectAttempts(0); + locator.setConsumerWindowSize(-1); + + sf = (ClientSessionFactoryImpl)locator.createSessionFactory(); + session = sf.createSession(true, true, 0); + + + session.start(); + + + prod = session.createProducer("queue"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) + { + ClientMessage message = session.createMessage(true); + message.putIntProperty("i", i); + prod.send(message); + } + + consumer = session.createConsumer("queue1"); + consumer2 = session.createConsumer("queue2"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) + { + assertNotNull(consumer.receive(5000)); + assertNotNull(consumer2.receive(5000)); + } + + session.close(); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java new file mode 100644 index 0000000..654de4c --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java @@ -0,0 +1,270 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.byteman.tests; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.paging.cursor.impl.PagePositionImpl; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServers; +import org.hornetq.core.settings.impl.AddressFullMessagePolicy; +import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.tests.util.ServiceTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class PagingLeakTest extends ServiceTestBase +{ + + private static final AtomicInteger pagePosInstances = new AtomicInteger(0); + + public static void newPosition() + { + pagePosInstances.incrementAndGet(); + } + + public static void deletePosition() + { + pagePosInstances.decrementAndGet(); + } + + @Before + public void setup() + { + pagePosInstances.set(0); + } + + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "newPosition", + targetClass = "org.hornetq.core.paging.cursor.impl.PagePositionImpl", + targetMethod = "<init>()", + targetLocation = "ENTRY", + action = "org.hornetq.byteman.tests.PagingLeakTest.newPosition()" + ), + @BMRule + ( + name = "finalPosition", + targetClass = "org.hornetq.core.paging.cursor.impl.PagePositionImpl", + targetMethod = "finalize", + targetLocation = "ENTRY", + action = "org.hornetq.byteman.tests.PagingLeakTest.deletePosition()" + ) + } + ) + public void testValidateLeak() throws Throwable + { + + List<PagePositionImpl> positions = new ArrayList<PagePositionImpl>(); + + for (int i = 0; i < 300; i++) + { + positions.add(new PagePositionImpl(3, 3)); + } + + long timeout = System.currentTimeMillis() + 5000; + while (pagePosInstances.get() != 300 && timeout > System.currentTimeMillis()) + { + forceGC(); + } + + // This is just to validate the rules are correctly applied on byteman + assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 300, pagePosInstances.get()); + + positions.clear(); + + timeout = System.currentTimeMillis() + 5000; + while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) + { + forceGC(); + } + + // This is just to validate the rules are correctly applied on byteman + assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get()); + + final ArrayList<Exception> errors = new ArrayList<Exception>(); + // A backup that will be waiting to be activated + Configuration conf = createDefaultConfig(true) + .setSecurityEnabled(false) + .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + final HornetQServer server = HornetQServers.newHornetQServer(conf, true); + addServer(server); + + + server.start(); + + + AddressSettings settings = new AddressSettings(); + settings.setPageSizeBytes(20 * 1024); + settings.setMaxSizeBytes(200 * 1024); + settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + + + server.getAddressSettingsRepository().addMatch("#", settings); + + + final SimpleString address = new SimpleString("pgdAddress"); + + class Consumer extends Thread + { + final ServerLocator locator; + final ClientSessionFactory sf; + final ClientSession session; + final ClientConsumer consumer; + + final int sleepTime; + final int maxConsumed; + + Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception + { + + server.createQueue(address, address.concat(suffix), null, true, false); + + this.sleepTime = sleepTime; + locator = createInVMLocator(0); + sf = locator.createSessionFactory(); + session = sf.createSession(true, true); + consumer = session.createConsumer(address.concat(suffix)); + + this.maxConsumed = maxConsumed; + } + + public void run() + { + try + { + session.start(); + + long lastTime = System.currentTimeMillis(); + + for (long i = 0; i < maxConsumed; i++) + { + ClientMessage msg = consumer.receive(5000); + + if (msg == null) + { + errors.add(new Exception("didn't receive a message")); + return; + } + + msg.acknowledge(); + + + if (sleepTime > 0) + { + + Thread.sleep(sleepTime); + } + + if (i % 1000 == 0) + { + System.out.println("Consumed " + i + " events in " + (System.currentTimeMillis() - lastTime)); + lastTime = System.currentTimeMillis(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + + int numberOfMessages = 10000; + + Consumer consumer1 = new Consumer(100, "-1", 150); + Consumer consumer2 = new Consumer(0, "-2", numberOfMessages); + + final ServerLocator locator = createInVMLocator(0); + final ClientSessionFactory sf = locator.createSessionFactory(); + final ClientSession session = sf.createSession(true, true); + final ClientProducer producer = session.createProducer(address); + + + byte[] b = new byte[1024]; + + + for (long i = 0; i < numberOfMessages; i++) + { + ClientMessage msg = session.createMessage(true); + msg.getBodyBuffer().writeBytes(b); + producer.send(msg); + + if (i == 1000) + { + System.out.println("Starting consumers!!!"); + consumer1.start(); + consumer2.start(); + } + + if (i % 1000 == 0) + { + validateInstances(); + } + + } + + + consumer1.join(); + consumer2.join(); + + validateInstances(); + Throwable elast = null; + + for (Throwable e : errors) + { + e.printStackTrace(); + elast = e; + } + + if (elast != null) + { + throw elast; + } + + } + + private void validateInstances() + { + forceGC(); + int count2 = pagePosInstances.get(); + Assert.assertTrue("There is a leak, you shouldn't have this many instances (" + count2 + ")", count2 < 5000); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java index 22a39da..05c7245 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java @@ -14,6 +14,8 @@ package org.hornetq.byteman.tests; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.core.config.ScaleDownConfiguration; +import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.core.server.HornetQServer; import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase; @@ -37,23 +39,30 @@ public class ScaleDownFailoverTest extends ClusterTestBase { super.setUp(); stopCount = 0; - setupServer(0, isFileStorage(), isNetty()); - setupServer(1, isFileStorage(), isNetty()); - setupServer(2, isFileStorage(), isNetty()); + setupLiveServer(0, isFileStorage(), false, isNetty(), true); + setupLiveServer(1, isFileStorage(), false, isNetty(), true); + setupLiveServer(2, isFileStorage(), false, isNetty(), true); + ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration(); + ScaleDownConfiguration scaleDownConfiguration2 = new ScaleDownConfiguration(); + scaleDownConfiguration2.setEnabled(false); + ScaleDownConfiguration scaleDownConfiguration3 = new ScaleDownConfiguration(); + scaleDownConfiguration3.setEnabled(false); + ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration); + ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration2); + ((LiveOnlyPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration3); if (isGrouped()) { - servers[0].getConfiguration().getHAPolicy().setScaleDownGroupName("bill"); - servers[1].getConfiguration().getHAPolicy().setScaleDownGroupName("bill"); - servers[2].getConfiguration().getHAPolicy().setScaleDownGroupName("bill"); + scaleDownConfiguration.setGroupName("bill"); + scaleDownConfiguration2.setGroupName("bill"); + scaleDownConfiguration3.setGroupName("bill"); } - servers[0].getConfiguration().getHAPolicy().setScaleDown(true); staticServers = servers; setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); - servers[0].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); - servers[1].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); - servers[2].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); + scaleDownConfiguration.getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); + scaleDownConfiguration2.getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); + scaleDownConfiguration3.getConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); startServers(0, 1, 2); setupSessionFactory(0, isNetty()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java index 1a7b254..44cd4df 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java @@ -13,6 +13,8 @@ package org.hornetq.byteman.tests; import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.core.config.ScaleDownConfiguration; +import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration; import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -30,14 +32,15 @@ public class ScaleDownFailureTest extends ClusterTestBase public void setUp() throws Exception { super.setUp(); - setupServer(0, isFileStorage(), isNetty()); - setupServer(1, isFileStorage(), isNetty()); + setupLiveServer(0, isFileStorage(), false, isNetty(), true); + setupLiveServer(1, isFileStorage(), false, isNetty(), true); if (isGrouped()) { - servers[0].getConfiguration().getHAPolicy().setScaleDownGroupName("bill"); - servers[1].getConfiguration().getHAPolicy().setScaleDownGroupName("bill"); + ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration(); + scaleDownConfiguration.setGroupName("bill"); + ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration); + ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration); } - servers[0].getConfiguration().getHAPolicy().setScaleDown(true); setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1); setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0); startServers(0, 1); @@ -62,7 +65,7 @@ public class ScaleDownFailureTest extends ClusterTestBase closeAllConsumers(); closeAllSessionFactories(); closeAllServerLocatorsFactories(); - servers[0].getConfiguration().getHAPolicy().setScaleDown(false); + ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(null); stopServers(0, 1); super.tearDown(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java index 28bd89c..a813bfb 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java @@ -17,9 +17,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServers; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.jms.server.impl.JMSServerManagerImpl; import org.hornetq.tests.unit.util.InVMNamingContext; import org.hornetq.tests.util.ServiceTestBase; @@ -76,20 +77,20 @@ public class StartStopDeadlockTest extends ServiceTestBase { // A live server that will always be crashed - Configuration confLive = createDefaultConfig(true); - confLive.setSecurityEnabled(false); - confLive.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - confLive.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + Configuration confLive = createDefaultConfig(true) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()) + .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); final HornetQServer serverLive = HornetQServers.newHornetQServer(confLive); serverLive.start(); addServer(serverLive); // A backup that will be waiting to be activated - Configuration conf = createDefaultConfig(true); - conf.setSecurityEnabled(false); - conf.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + Configuration conf = createDefaultConfig(true) + .setSecurityEnabled(false) + .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()) + .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); final HornetQServer server = HornetQServers.newHornetQServer(conf, true); addServer(server); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java ---------------------------------------------------------------------- diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java index b446f13..7bde640 100644 --- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java @@ -20,7 +20,7 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.config.Configuration; import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory; import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; @@ -88,12 +88,12 @@ public class StompInternalStateTest extends ServiceTestBase @Override protected Configuration createDefaultConfig(final boolean netty) throws Exception { - Configuration config = super.createDefaultConfig(netty); - config.setSecurityEnabled(false); - config.setPersistenceEnabled(false); + Configuration config = super.createDefaultConfig(netty) + .setSecurityEnabled(false) + .setPersistenceEnabled(false); Map<String, Object> params = new HashMap<String, Object>(); - params.put(TransportConstants.PROTOCOL_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); @@ -107,14 +107,14 @@ public class StompInternalStateTest extends ServiceTestBase public void verifyBindingAddRemove(Notification noti, Object obj) { Set<String> destinations = (Set<String>)obj; - if (noti.getType() == NotificationType.BINDING_ADDED) + if (noti.getType() == CoreNotificationType.BINDING_ADDED) { if (!destinations.contains(STOMP_QUEUE_NAME)) { resultTestStompProtocolManagerLeak += "didn't save the queue when binding added " + destinations; } } - else if (noti.getType() == NotificationType.BINDING_REMOVED) + else if (noti.getType() == CoreNotificationType.BINDING_REMOVED) { if (destinations.contains(STOMP_QUEUE_NAME)) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/config/logging.properties.trace ---------------------------------------------------------------------- diff --git a/tests/config/logging.properties.trace b/tests/config/logging.properties.trace new file mode 100644 index 0000000..5bafa56 --- /dev/null +++ b/tests/config/logging.properties.trace @@ -0,0 +1,68 @@ +# +# JBoss, Home of Professional Open Source. +# Copyright 2010, Red Hat, Inc., and individual contributors +# as indicated by the @author tags. See the copyright.txt file in the +# distribution for a full listing of individual contributors. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of +# the License, or (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free +# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +# 02110-1301 USA, or see the FSF site: http://www.fsf.org. +# + +# this is an example of a logging configuration where you could enable tracing at the testsuite + +# Additional logger names to configure (root logger is always configured) +# Root logger option +loggers=org.jboss.logging,org.hornetq.core.server,org.hornetq.utils,org.hornetq.journal,org.hornetq.jms,org.hornetq.ra,org.hornetq.tests.unit,org.hornetq.tests.integration,org.hornetq.jms.tests + +# Root logger level +logger.level=INFO +# HornetQ logger levels +logger.org.hornetq.core.server.level=TRACE +logger.org.hornetq.journal.level=INFO +logger.org.hornetq.utils.level=INFO +logger.org.hornetq.jms.level=INFO +logger.org.hornetq.ra.level=INFO +logger.org.hornetq.tests.unit.level=INFO +logger.org.hornetq.tests.integration.level=INFO +logger.org.hornetq.jms.tests.level=INFO + +# Root logger handlers +logger.handlers=CONSOLE,TEST +#logger.handlers=CONSOLE,FILE + +# Console handler configuration +handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler +handler.CONSOLE.properties=autoFlush +handler.CONSOLE.level=FINE +handler.CONSOLE.autoFlush=true +handler.CONSOLE.formatter=PATTERN + +# File handler configuration +handler.FILE=org.jboss.logmanager.handlers.FileHandler +handler.FILE.level=FINE +handler.FILE.properties=autoFlush,fileName +handler.FILE.autoFlush=true +handler.FILE.fileName=target/hornetq.log +handler.FILE.formatter=PATTERN + +# Console handler configuration +handler.TEST=org.hornetq.tests.logging.AssertionLoggerHandler +handler.TEST.level=TRACE +handler.TEST.formatter=PATTERN + +# Formatter pattern configuration +formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter +formatter.PATTERN.properties=pattern +formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 05fc719..f25fddc 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -13,8 +13,8 @@ <properties> <hornetq.basedir>${project.basedir}/../..</hornetq.basedir> - <vertx.version>2.1RC1</vertx.version> - <vertx.testtools.version>2.0.2-final</vertx.testtools.version> + <vertx.version>2.1.2</vertx.version> + <vertx.testtools.version>2.0.3-final</vertx.testtools.version> </properties> <dependencies> @@ -66,6 +66,11 @@ </dependency> <dependency> <groupId>org.hornetq</groupId> + <artifactId>hornetq-tools</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> <artifactId>hornetq-twitter-integration</artifactId> <version>${project.version}</version> </dependency> @@ -96,12 +101,12 @@ </dependency> <dependency> <groupId>org.hornetq</groupId> - <artifactId>hornetq-aerogear-integration</artifactId> + <artifactId>hornetq-openwire-protocol</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.hornetq</groupId> - <artifactId>hornetq-tools</artifactId> + <artifactId>hornetq-aerogear-integration</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -121,11 +126,6 @@ <artifactId>twitter4j-core</artifactId> </dependency> <dependency> - <groupId>com.hazelcast</groupId> - <artifactId>hazelcast</artifactId> - <version>2.6.6</version> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> @@ -147,9 +147,13 @@ <version>0.24</version> </dependency> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - </dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-jms</artifactId> + </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-client</artifactId> @@ -205,6 +209,22 @@ <version>${vertx.testtools.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-j2ee-management_1.1_spec</artifactId> + <version>1.0.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jboss.ironjacamar</groupId> + <artifactId>ironjacamar-core-api</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -234,7 +254,6 @@ <skipTests>${skipIntegrationTests}</skipTests> <excludes> <exclude>**/ReplicatedJMSFailoverTest.java</exclude> - <exclude>**/Colocated*Test.java</exclude> <exclude>org/hornetq/tests/util/*.java</exclude> </excludes> <argLine>-Djgroups.bind_addr=::1 ${hornetq-surefire-argline}</argLine> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java index fc66974..e1a859e 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java @@ -29,6 +29,7 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.Configuration; +import org.hornetq.core.message.impl.MessageImpl; import org.hornetq.core.postoffice.impl.PostOfficeImpl; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.transaction.impl.XidImpl; @@ -169,6 +170,8 @@ public class DuplicateDetectionTest extends ServiceTestBase ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY)); + locator.setBlockOnNonDurableSend(true); + ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true); @@ -209,10 +212,23 @@ public class DuplicateDetectionTest extends ServiceTestBase message2 = consumer.receiveImmediate(); Assert.assertNull(message2); + message = createMessage(session, 3); + message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + producer.send(message); + message2 = consumer.receive(1000); + Assert.assertEquals(3, message2.getObjectProperty(propKey)); + + message = createMessage(session, 4); + message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + producer.send(message); + message2 = consumer.receiveImmediate(); + Assert.assertNull(message2); + producer.close(); consumer.close(); - Assert.assertEquals(1, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); + // there will be 2 ID caches, one for messages using "_HQ_DUPL_ID" and one for "_HQ_BRIDGE_DUP" + Assert.assertEquals(2, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); session.deleteQueue(queueName); Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); } @@ -1275,9 +1291,8 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize); HornetQServer messagingService2 = createServer(conf); @@ -1361,11 +1376,10 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - final int theCacheSize = 5; - conf.setIDCacheSize(theCacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(theCacheSize); HornetQServer messagingService2 = createServer(conf); @@ -1443,12 +1457,11 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - final int initialCacheSize = 10; final int subsequentCacheSize = 5; - conf.setIDCacheSize(initialCacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(initialCacheSize); HornetQServer messagingService2 = createServer(conf); @@ -1537,12 +1550,11 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - final int initialCacheSize = 10; final int subsequentCacheSize = 5; - conf.setIDCacheSize(initialCacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(initialCacheSize); HornetQServer messagingService2 = createServer(conf); @@ -1641,11 +1653,9 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); - - conf.setPersistIDCache(false); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize) + .setPersistIDCache(false); HornetQServer messagingService2 = createServer(conf); @@ -1729,11 +1739,9 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); - - conf.setPersistIDCache(false); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize) + .setPersistIDCache(false); HornetQServer messagingService2 = createServer(conf); @@ -1821,9 +1829,8 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize); HornetQServer messagingService2 = createServer(conf); @@ -1941,11 +1948,9 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); - - conf.setPersistIDCache(false); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize) + .setPersistIDCache(false); HornetQServer messagingService2 = createServer(conf); @@ -2047,9 +2052,8 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize); HornetQServer messagingService2 = createServer(conf); @@ -2149,9 +2153,8 @@ public class DuplicateDetectionTest extends ServiceTestBase { messagingService.stop(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize); HornetQServer messagingService2 = createServer(conf); @@ -2263,9 +2266,8 @@ public class DuplicateDetectionTest extends ServiceTestBase { super.setUp(); - Configuration conf = createDefaultConfig(); - - conf.setIDCacheSize(cacheSize); + Configuration conf = createDefaultConfig() + .setIDCacheSize(cacheSize); messagingService = createServer(true, conf); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java index b14d985..5ef7f85 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java @@ -207,9 +207,8 @@ public class String64KLimitTest extends UnitTestCase { super.setUp(); - Configuration config = createBasicConfig(); - config.setSecurityEnabled(false); - config.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); + Configuration config = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); server = addServer(HornetQServers.newHornetQServer(config, false)); server.start(); locator = createInVMNonHALocator(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java index ae4223c..41819bb 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java @@ -73,7 +73,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase connector0.setHost("localhost"); jetty.addConnector(connector0); jetty.start(); - Configuration configuration = createDefaultConfig(); HashMap<String, Object> params = new HashMap(); params.put(AeroGearConstants.QUEUE_NAME, "testQueue"); params.put(AeroGearConstants.ENDPOINT_NAME, "http://localhost:8080"); @@ -86,10 +85,15 @@ public class AeroGearBasicServerTest extends ServiceTestBase params.put(AeroGearConstants.DEVICE_TYPE_NAME, "android,ipad"); params.put(AeroGearConstants.SOUND_NAME, "sound1"); params.put(AeroGearConstants.VARIANTS_NAME, "variant1,variant2"); - configuration.getConnectorServiceConfigurations().add( - new ConnectorServiceConfiguration(AeroGearConnectorServiceFactory.class.getName(), params, "TestAeroGearService")); - - configuration.getQueueConfigurations().add(new CoreQueueConfiguration("testQueue", "testQueue", null, true)); + Configuration configuration = createDefaultConfig() + .addConnectorServiceConfiguration( + new ConnectorServiceConfiguration() + .setFactoryClassName(AeroGearConnectorServiceFactory.class.getName()) + .setParams(params) + .setName("TestAeroGearService")) + .addQueueConfiguration(new CoreQueueConfiguration() + .setAddress("testQueue") + .setName("testQueue")); server = createServer(configuration); server.start(); @@ -151,9 +155,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase String badge = body.getString("badge"); assertNotNull(badge); assertEquals(badge, "99"); - Integer ttl = body.getInt("ttl"); - assertNotNull(ttl); - assertEquals(ttl.intValue(), 3600); JSONArray jsonArray = (JSONArray) aeroGearHandler.jsonObject.get("variants"); assertNotNull(jsonArray); assertEquals(jsonArray.getString(0), "variant1"); @@ -167,6 +168,9 @@ public class AeroGearBasicServerTest extends ServiceTestBase assertNotNull(jsonArray); assertEquals(jsonArray.getString(0), "android"); assertEquals(jsonArray.getString(1), "ipad"); + Integer ttl = (Integer) aeroGearHandler.jsonObject.get("ttl"); + assertNotNull(ttl); + assertEquals(ttl.intValue(), 3600); latch = new CountDownLatch(1); aeroGearHandler.resetLatch(latch); @@ -194,9 +198,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase badge = body.getString("badge"); assertNotNull(badge); assertEquals(badge, "111"); - ttl = body.getInt("ttl"); - assertNotNull(ttl); - assertEquals(ttl.intValue(), 10000); jsonArray = (JSONArray) aeroGearHandler.jsonObject.get("variants"); assertNotNull(jsonArray); assertEquals(jsonArray.getString(0), "v1"); @@ -209,6 +210,9 @@ public class AeroGearBasicServerTest extends ServiceTestBase assertNotNull(jsonArray); assertEquals(jsonArray.getString(0), "dev1"); assertEquals(jsonArray.getString(1), "dev2"); + ttl = (Integer) aeroGearHandler.jsonObject.get("ttl"); + assertNotNull(ttl); + assertEquals(ttl.intValue(), 10000); session.start(); ClientMessage message = session.createConsumer("testQueue").receiveImmediate(); assertNull(message); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java index d5b7426..5d0362c 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java @@ -30,8 +30,10 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.MessageHandler; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionInternal; +import org.hornetq.core.protocol.core.impl.HornetQConsumerContext; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.Queue; +import org.hornetq.spi.core.remoting.ConsumerContext; import org.hornetq.tests.integration.IntegrationTestLogger; import org.hornetq.tests.util.ServiceTestBase; import org.hornetq.utils.UUID; @@ -333,9 +335,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public Object getId() + public ConsumerContext getConsumerContext() { - return id; + return new HornetQConsumerContext(this.id); } @Override @@ -363,9 +365,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setMessageHandler(MessageHandler handler) throws HornetQException + public FakeConsumerWithID setMessageHandler(MessageHandler handler) throws HornetQException { - + return this; } @Override @@ -410,9 +412,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setUserID(UUID userID) + public FakeMessageWithID setUserID(UUID userID) { - + return this; } @Override @@ -440,9 +442,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setDurable(boolean durable) + public FakeMessageWithID setDurable(boolean durable) { - + return this; } @Override @@ -458,9 +460,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setExpiration(long expiration) + public FakeMessageWithID setExpiration(long expiration) { - + return this; } @Override @@ -470,9 +472,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setTimestamp(long timestamp) + public FakeMessageWithID setTimestamp(long timestamp) { - + return this; } @Override @@ -482,9 +484,9 @@ public class AcknowledgeTest extends ServiceTestBase } @Override - public void setPriority(byte priority) + public FakeMessageWithID setPriority(byte priority) { - + return this; } @Override @@ -810,5 +812,17 @@ public class AcknowledgeTest extends ServiceTestBase { return null; } + + @Override + public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) + { + return this; + } + + @Override + public FakeMessageWithID writeBodyBufferString(String string) + { + return this; + } } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java index d4e88b9..ce687ac 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java @@ -156,9 +156,9 @@ public class CommitRollbackTest extends ServiceTestBase cc2.close(); session.rollback(); Assert.assertEquals(0, q2.getDeliveringCount()); - Assert.assertEquals(numMessages, q.getMessageCount()); + Assert.assertEquals(numMessages, getMessageCount(q)); Assert.assertEquals(0, q2.getDeliveringCount()); - Assert.assertEquals(numMessages, q.getMessageCount()); + Assert.assertEquals(numMessages, getMessageCount(q)); sendSession.close(); session.close(); } @@ -209,10 +209,10 @@ public class CommitRollbackTest extends ServiceTestBase Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); Assert.assertEquals(numMessages, q.getDeliveringCount()); - Assert.assertEquals(numMessages, q.getMessageCount()); + Assert.assertEquals(numMessages, getMessageCount(q)); session.commit(); Assert.assertEquals(0, q.getDeliveringCount()); - Assert.assertEquals(0, q.getMessageCount()); + Assert.assertEquals(0, getMessageCount(q)); sendSession.close(); session.close(); @@ -243,11 +243,11 @@ public class CommitRollbackTest extends ServiceTestBase Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); Assert.assertEquals(numMessages, q.getDeliveringCount()); - Assert.assertEquals(numMessages, q.getMessageCount()); + Assert.assertEquals(numMessages, getMessageCount(q)); session.stop(); session.rollback(); Assert.assertEquals(0, q.getDeliveringCount()); - Assert.assertEquals(numMessages, q.getMessageCount()); + Assert.assertEquals(numMessages, getMessageCount(q)); latch = new CountDownLatch(numMessages); cc.setMessageHandler(new ackHandler(session, latch)); session.start();
